001package org.apache.commons.jcs.utils.threadpool;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.util.ArrayList;
023import java.util.Properties;
024import java.util.concurrent.BlockingQueue;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.LinkedBlockingQueue;
027import java.util.concurrent.ThreadPoolExecutor;
028import java.util.concurrent.TimeUnit;
029
030import org.apache.commons.jcs.utils.threadpool.PoolConfiguration.WhenBlockedPolicy;
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033
034/**
035 * This manages threadpools for an application
036 * <p>
037 * It is a singleton since threads need to be managed vm wide.
038 * <p>
039 * This manager forces you to use a bounded queue. By default it uses the current thread for
040 * execution when the buffer is full and no free threads can be created.
041 * <p>
042 * You can specify the props file to use or pass in a properties object prior to configuration. By
043 * default it looks for configuration information in thread_pool.properties.
044 * <p>
045 * If set, the Properties object will take precedence.
046 * <p>
047 * If a value is not set for a particular pool, the hard coded defaults will be used.
048 *
049 * <pre>
050 * int boundarySize_DEFAULT = 2000;
051 *
052 * int maximumPoolSize_DEFAULT = 150;
053 *
054 * int minimumPoolSize_DEFAULT = 4;
055 *
056 * int keepAliveTime_DEFAULT = 1000 * 60 * 5;
057 *
058 * boolean abortWhenBlocked = false;
059 *
060 * String whenBlockedPolicy_DEFAULT = IPoolConfiguration.POLICY_RUN;
061 *
062 * int startUpSize_DEFAULT = 4;
063 * </pre>
064 *
065 * You can configure default settings by specifying a default pool in the properties, ie "cache.ccf"
066 * <p>
067 * @author Aaron Smuts
068 */
069public class ThreadPoolManager
070{
071    /** The logger */
072    private static final Log log = LogFactory.getLog( ThreadPoolManager.class );
073
074    /**
075     * DEFAULT SETTINGS, these are not final since they can be set via the properties file or object
076     */
077    private static boolean useBoundary_DEFAULT = true;
078
079    /** Default queue size limit */
080    private static int boundarySize_DEFAULT = 2000;
081
082    /** Default max size */
083    private static int maximumPoolSize_DEFAULT = 150;
084
085    /** Default min */
086    private static int minimumPoolSize_DEFAULT = 4;
087
088    /** Default keep alive */
089    private static int keepAliveTime_DEFAULT = 1000 * 60 * 5;
090
091    /** Default when blocked */
092    private static WhenBlockedPolicy whenBlockedPolicy_DEFAULT = WhenBlockedPolicy.RUN;
093
094    /** Default startup size */
095    private static int startUpSize_DEFAULT = 4;
096
097    /** The default config, created using property defaults if present, else those above. */
098    private static PoolConfiguration defaultConfig;
099
100    /** the root property name */
101    private static final String PROP_NAME_ROOT = "thread_pool";
102
103    /** default property file name */
104    private static final String DEFAULT_PROP_NAME_ROOT = "thread_pool.default";
105
106    /**
107     * You can specify the properties to be used to configure the thread pool. Setting this post
108     * initialization will have no effect.
109     */
110    private static volatile Properties props = null;
111
112    /** singleton instance */
113    private static ThreadPoolManager INSTANCE = null;
114
115    /** Map of names to pools. */
116    private ConcurrentHashMap<String, ThreadPoolExecutor> pools;
117
118    /**
119     * No instances please. This is a singleton.
120     */
121    private ThreadPoolManager()
122    {
123        this.pools = new ConcurrentHashMap<String, ThreadPoolExecutor>();
124        configure();
125    }
126
127    /**
128     * Creates a pool based on the configuration info.
129     * <p>
130     * @param config
131     * @return A ThreadPoll wrapper
132     */
133    private ThreadPoolExecutor createPool( PoolConfiguration config )
134    {
135        BlockingQueue<Runnable> queue = null;
136        if ( config.isUseBoundary() )
137        {
138            if ( log.isDebugEnabled() )
139            {
140                log.debug( "Creating a Bounded Buffer to use for the pool" );
141            }
142
143            queue = new LinkedBlockingQueue<Runnable>(config.getBoundarySize());
144        }
145        else
146        {
147            if ( log.isDebugEnabled() )
148            {
149                log.debug( "Creating a non bounded Linked Queue to use for the pool" );
150            }
151            queue = new LinkedBlockingQueue<Runnable>();
152        }
153
154        ThreadPoolExecutor pool = new ThreadPoolExecutor(
155            config.getStartUpSize(),
156            config.getMaximumPoolSize(),
157            config.getKeepAliveTime(),
158            TimeUnit.MILLISECONDS,
159            queue,
160            new DaemonThreadFactory("JCS-ThreadPoolManager-"));
161
162        // when blocked policy
163        switch (config.getWhenBlockedPolicy())
164        {
165            case ABORT:
166                pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
167                break;
168
169            case RUN:
170                pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
171                break;
172
173            case WAIT:
174                throw new RuntimeException("POLICY_WAIT no longer supported");
175
176            case DISCARDOLDEST:
177                pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
178                break;
179
180            default:
181                break;
182        }
183
184        pool.prestartAllCoreThreads();
185
186        return pool;
187    }
188
189    /**
190     * Returns a configured instance of the ThreadPoolManger To specify a configuration file or
191     * Properties object to use call the appropriate setter prior to calling getInstance.
192     * <p>
193     * @return The single instance of the ThreadPoolManager
194     */
195    public static synchronized ThreadPoolManager getInstance()
196    {
197        if ( INSTANCE == null )
198        {
199            INSTANCE = new ThreadPoolManager();
200        }
201        return INSTANCE;
202    }
203
204    /**
205     * Dispose of the instance of the ThreadPoolManger and shut down all thread pools
206     */
207    public static synchronized void dispose()
208    {
209        if ( INSTANCE != null )
210        {
211            for ( String poolName : INSTANCE.getPoolNames())
212            {
213                try
214                {
215                    INSTANCE.getPool(poolName).shutdownNow();
216                }
217                catch (Throwable t)
218                {
219                    log.warn("Failed to close pool " + poolName, t);
220                }
221            }
222
223            INSTANCE = null;
224        }
225    }
226
227    /**
228     * Returns a pool by name. If a pool by this name does not exist in the configuration file or
229     * properties, one will be created using the default values.
230     * <p>
231     * Pools are lazily created.
232     * <p>
233     * @param name
234     * @return The thread pool configured for the name.
235     */
236    public ThreadPoolExecutor getPool( String name )
237    {
238        ThreadPoolExecutor pool = pools.get( name );
239
240        if ( pool == null )
241        {
242            if ( log.isDebugEnabled() )
243            {
244                log.debug( "Creating pool for name [" + name + "]" );
245            }
246
247            PoolConfiguration config = loadConfig( PROP_NAME_ROOT + "." + name );
248            pool = createPool( config );
249            ThreadPoolExecutor _pool = pools.putIfAbsent( name, pool );
250            if (_pool != null)
251            {
252                pool = _pool;
253            }
254
255            if ( log.isDebugEnabled() )
256            {
257                log.debug( "PoolName = " + getPoolNames() );
258            }
259        }
260
261        return pool;
262    }
263
264    /**
265     * Returns the names of all configured pools.
266     * <p>
267     * @return ArrayList of string names
268     */
269    public ArrayList<String> getPoolNames()
270    {
271        return new ArrayList<String>(pools.keySet());
272    }
273
274    /**
275     * This will be used if it is not null on initialization. Setting this post initialization will
276     * have no effect.
277     * <p>
278     * @param props The props to set.
279     */
280    public static void setProps( Properties props )
281    {
282        ThreadPoolManager.props = props;
283    }
284
285    /**
286     * Initialize the ThreadPoolManager and create all the pools defined in the configuration.
287     */
288    private static void configure()
289    {
290        if ( log.isDebugEnabled() )
291        {
292            log.debug( "Initializing ThreadPoolManager" );
293        }
294
295        if ( props == null )
296        {
297            log.warn( "No configuration settings found.  Using hardcoded default values for all pools." );
298            props = new Properties();
299        }
300
301        // set intial default and then override if new
302        // settings are available
303        defaultConfig = new PoolConfiguration( useBoundary_DEFAULT, boundarySize_DEFAULT, maximumPoolSize_DEFAULT,
304                                               minimumPoolSize_DEFAULT, keepAliveTime_DEFAULT,
305                                               whenBlockedPolicy_DEFAULT, startUpSize_DEFAULT );
306
307        defaultConfig = loadConfig( DEFAULT_PROP_NAME_ROOT );
308    }
309
310    /**
311     * Configures the default PoolConfiguration settings.
312     * <p>
313     * @param root
314     * @return PoolConfiguration
315     */
316    private static PoolConfiguration loadConfig( String root )
317    {
318        PoolConfiguration config = defaultConfig.clone();
319
320        try
321        {
322            config.setUseBoundary( Boolean.parseBoolean( props.getProperty( root + ".useBoundary", "false" ) ) );
323        }
324        catch ( NumberFormatException nfe )
325        {
326            log.error( "useBoundary not a boolean.", nfe );
327        }
328
329        // load default if they exist
330        try
331        {
332            config.setBoundarySize( Integer.parseInt( props.getProperty( root + ".boundarySize", "2000" ) ) );
333        }
334        catch ( NumberFormatException nfe )
335        {
336            log.error( "boundarySize not a number.", nfe );
337        }
338
339        // maximum pool size
340        try
341        {
342            config.setMaximumPoolSize( Integer.parseInt( props.getProperty( root + ".maximumPoolSize", "150" ) ) );
343        }
344        catch ( NumberFormatException nfe )
345        {
346            log.error( "maximumPoolSize not a number.", nfe );
347        }
348
349        // minimum pool size
350        try
351        {
352            config.setMinimumPoolSize( Integer.parseInt( props.getProperty( root + ".minimumPoolSize", "4" ) ) );
353        }
354        catch ( NumberFormatException nfe )
355        {
356            log.error( "minimumPoolSize not a number.", nfe );
357        }
358
359        // keep alive
360        try
361        {
362            config.setKeepAliveTime( Integer.parseInt( props.getProperty( root + ".keepAliveTime", "300000" ) ) );
363        }
364        catch ( NumberFormatException nfe )
365        {
366            log.error( "keepAliveTime not a number.", nfe );
367        }
368
369        // when blocked
370        config.setWhenBlockedPolicy( props.getProperty( root + ".whenBlockedPolicy", "RUN" ) );
371
372        // startupsize
373        try
374        {
375            config.setStartUpSize( Integer.parseInt( props.getProperty( root + ".startUpSize", "4" ) ) );
376        }
377        catch ( NumberFormatException nfe )
378        {
379            log.error( "startUpSize not a number.", nfe );
380        }
381
382        if ( log.isInfoEnabled() )
383        {
384            log.info( root + " PoolConfiguration = " + config );
385        }
386
387        return config;
388    }
389}