1 package org.apache.commons.jcs.utils.threadpool;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Properties;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.ThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29
30 import org.apache.commons.jcs.utils.threadpool.PoolConfiguration.WhenBlockedPolicy;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69 public class ThreadPoolManager
70 {
71
72 private static final Log log = LogFactory.getLog( ThreadPoolManager.class );
73
74
75
76
77 private static boolean useBoundary_DEFAULT = true;
78
79
80 private static int boundarySize_DEFAULT = 2000;
81
82
83 private static int maximumPoolSize_DEFAULT = 150;
84
85
86 private static int minimumPoolSize_DEFAULT = 4;
87
88
89 private static int keepAliveTime_DEFAULT = 1000 * 60 * 5;
90
91
92 private static WhenBlockedPolicy whenBlockedPolicy_DEFAULT = WhenBlockedPolicy.RUN;
93
94
95 private static int startUpSize_DEFAULT = 4;
96
97
98 private static PoolConfiguration defaultConfig;
99
100
101 private static final String PROP_NAME_ROOT = "thread_pool";
102
103
104 private static final String DEFAULT_PROP_NAME_ROOT = "thread_pool.default";
105
106
107
108
109
110 private static volatile Properties props = null;
111
112
113 private static ThreadPoolManager INSTANCE = null;
114
115
116 private ConcurrentHashMap<String, ThreadPoolExecutor> pools;
117
118
119
120
121 private ThreadPoolManager()
122 {
123 this.pools = new ConcurrentHashMap<String, ThreadPoolExecutor>();
124 configure();
125 }
126
127
128
129
130
131
132
133 private ThreadPoolExecutor createPool( PoolConfiguration config )
134 {
135 BlockingQueue<Runnable> queue = null;
136 if ( config.isUseBoundary() )
137 {
138 if ( log.isDebugEnabled() )
139 {
140 log.debug( "Creating a Bounded Buffer to use for the pool" );
141 }
142
143 queue = new LinkedBlockingQueue<Runnable>(config.getBoundarySize());
144 }
145 else
146 {
147 if ( log.isDebugEnabled() )
148 {
149 log.debug( "Creating a non bounded Linked Queue to use for the pool" );
150 }
151 queue = new LinkedBlockingQueue<Runnable>();
152 }
153
154 ThreadPoolExecutor pool = new ThreadPoolExecutor(
155 config.getStartUpSize(),
156 config.getMaximumPoolSize(),
157 config.getKeepAliveTime(),
158 TimeUnit.MILLISECONDS,
159 queue,
160 new DaemonThreadFactory("JCS-ThreadPoolManager-"));
161
162
163 switch (config.getWhenBlockedPolicy())
164 {
165 case ABORT:
166 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
167 break;
168
169 case RUN:
170 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
171 break;
172
173 case WAIT:
174 throw new RuntimeException("POLICY_WAIT no longer supported");
175
176 case DISCARDOLDEST:
177 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
178 break;
179
180 default:
181 break;
182 }
183
184 pool.prestartAllCoreThreads();
185
186 return pool;
187 }
188
189
190
191
192
193
194
195 public static synchronized ThreadPoolManager getInstance()
196 {
197 if ( INSTANCE == null )
198 {
199 INSTANCE = new ThreadPoolManager();
200 }
201 return INSTANCE;
202 }
203
204
205
206
207 public static synchronized void dispose()
208 {
209 if ( INSTANCE != null )
210 {
211 for ( String poolName : INSTANCE.getPoolNames())
212 {
213 try
214 {
215 INSTANCE.getPool(poolName).shutdownNow();
216 }
217 catch (Throwable t)
218 {
219 log.warn("Failed to close pool " + poolName, t);
220 }
221 }
222
223 INSTANCE = null;
224 }
225 }
226
227
228
229
230
231
232
233
234
235
236 public ThreadPoolExecutor getPool( String name )
237 {
238 ThreadPoolExecutor pool = pools.get( name );
239
240 if ( pool == null )
241 {
242 if ( log.isDebugEnabled() )
243 {
244 log.debug( "Creating pool for name [" + name + "]" );
245 }
246
247 PoolConfiguration config = loadConfig( PROP_NAME_ROOT + "." + name );
248 pool = createPool( config );
249 ThreadPoolExecutor _pool = pools.putIfAbsent( name, pool );
250 if (_pool != null)
251 {
252 pool = _pool;
253 }
254
255 if ( log.isDebugEnabled() )
256 {
257 log.debug( "PoolName = " + getPoolNames() );
258 }
259 }
260
261 return pool;
262 }
263
264
265
266
267
268
269 public ArrayList<String> getPoolNames()
270 {
271 return new ArrayList<String>(pools.keySet());
272 }
273
274
275
276
277
278
279
280 public static void setProps( Properties props )
281 {
282 ThreadPoolManager.props = props;
283 }
284
285
286
287
288 private static void configure()
289 {
290 if ( log.isDebugEnabled() )
291 {
292 log.debug( "Initializing ThreadPoolManager" );
293 }
294
295 if ( props == null )
296 {
297 log.warn( "No configuration settings found. Using hardcoded default values for all pools." );
298 props = new Properties();
299 }
300
301
302
303 defaultConfig = new PoolConfiguration( useBoundary_DEFAULT, boundarySize_DEFAULT, maximumPoolSize_DEFAULT,
304 minimumPoolSize_DEFAULT, keepAliveTime_DEFAULT,
305 whenBlockedPolicy_DEFAULT, startUpSize_DEFAULT );
306
307 defaultConfig = loadConfig( DEFAULT_PROP_NAME_ROOT );
308 }
309
310
311
312
313
314
315
316 private static PoolConfiguration loadConfig( String root )
317 {
318 PoolConfiguration config = defaultConfig.clone();
319
320 try
321 {
322 config.setUseBoundary( Boolean.parseBoolean( props.getProperty( root + ".useBoundary", "false" ) ) );
323 }
324 catch ( NumberFormatException nfe )
325 {
326 log.error( "useBoundary not a boolean.", nfe );
327 }
328
329
330 try
331 {
332 config.setBoundarySize( Integer.parseInt( props.getProperty( root + ".boundarySize", "2000" ) ) );
333 }
334 catch ( NumberFormatException nfe )
335 {
336 log.error( "boundarySize not a number.", nfe );
337 }
338
339
340 try
341 {
342 config.setMaximumPoolSize( Integer.parseInt( props.getProperty( root + ".maximumPoolSize", "150" ) ) );
343 }
344 catch ( NumberFormatException nfe )
345 {
346 log.error( "maximumPoolSize not a number.", nfe );
347 }
348
349
350 try
351 {
352 config.setMinimumPoolSize( Integer.parseInt( props.getProperty( root + ".minimumPoolSize", "4" ) ) );
353 }
354 catch ( NumberFormatException nfe )
355 {
356 log.error( "minimumPoolSize not a number.", nfe );
357 }
358
359
360 try
361 {
362 config.setKeepAliveTime( Integer.parseInt( props.getProperty( root + ".keepAliveTime", "300000" ) ) );
363 }
364 catch ( NumberFormatException nfe )
365 {
366 log.error( "keepAliveTime not a number.", nfe );
367 }
368
369
370 config.setWhenBlockedPolicy( props.getProperty( root + ".whenBlockedPolicy", "RUN" ) );
371
372
373 try
374 {
375 config.setStartUpSize( Integer.parseInt( props.getProperty( root + ".startUpSize", "4" ) ) );
376 }
377 catch ( NumberFormatException nfe )
378 {
379 log.error( "startUpSize not a number.", nfe );
380 }
381
382 if ( log.isInfoEnabled() )
383 {
384 log.info( root + " PoolConfiguration = " + config );
385 }
386
387 return config;
388 }
389 }