1 package org.apache.jcs.utils.threadpool;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.Properties;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.ThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.jcs.utils.props.PropertyLoader;
34 import org.apache.jcs.utils.threadpool.PoolConfiguration.WhenBlockedPolicy;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72 public class ThreadPoolManager
73 {
74
75 private static final Log log = LogFactory.getLog( ThreadPoolManager.class );
76
77
78
79
80 private static boolean useBoundary_DEFAULT = true;
81
82
83 private static int boundarySize_DEFAULT = 2000;
84
85
86 private static int maximumPoolSize_DEFAULT = 150;
87
88
89 private static int minimumPoolSize_DEFAULT = 4;
90
91
92 private static int keepAliveTime_DEFAULT = 1000 * 60 * 5;
93
94
95 private static WhenBlockedPolicy whenBlockedPolicy_DEFAULT = WhenBlockedPolicy.RUN;
96
97
98 private static int startUpSize_DEFAULT = 4;
99
100
101 private static PoolConfiguration defaultConfig;
102
103
104 public static final String DEFAULT_PROPS_FILE_NAME = "cache.ccf";
105
106
107 private static String propsFileName = null;
108
109
110 private static String PROP_NAME_ROOT = "thread_pool";
111
112
113 private static String DEFAULT_PROP_NAME_ROOT = "thread_pool.default";
114
115
116
117
118
119 private static volatile Properties props = null;
120
121
122 private static HashMap<String, ThreadPoolExecutor> pools = new HashMap<String, ThreadPoolExecutor>();
123
124
125 private static ThreadPoolManager INSTANCE = null;
126
127
128
129
130 private ThreadPoolManager()
131 {
132 configure();
133 }
134
135
136
137
138
139
140
141 private ThreadPoolExecutor createPool( PoolConfiguration config )
142 {
143 ThreadPoolExecutor pool = null;
144 BlockingQueue<Runnable> queue = null;
145 if ( config.isUseBoundary() )
146 {
147 if ( log.isDebugEnabled() )
148 {
149 log.debug( "Creating a Bounded Buffer to use for the pool" );
150 }
151
152 queue = new LinkedBlockingQueue<Runnable>(config.getBoundarySize());
153 }
154 else
155 {
156 if ( log.isDebugEnabled() )
157 {
158 log.debug( "Creating a non bounded Linked Queue to use for the pool" );
159 }
160 queue = new LinkedBlockingQueue<Runnable>();
161 }
162
163 pool = new ThreadPoolExecutor(config.getStartUpSize(), config.getMaximumPoolSize(),
164 config.getKeepAliveTime(), TimeUnit.MILLISECONDS,
165 queue, new MyThreadFactory());
166
167
168 switch (config.getWhenBlockedPolicy())
169 {
170 case ABORT:
171 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
172 break;
173
174 case RUN:
175 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
176 break;
177
178 case WAIT:
179 throw new RuntimeException("POLICY_WAIT no longer supported");
180
181 case DISCARDOLDEST:
182 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
183 break;
184
185 default:
186 break;
187 }
188
189 pool.prestartAllCoreThreads();
190
191 return pool;
192 }
193
194
195
196
197
198
199
200 public static synchronized ThreadPoolManager getInstance()
201 {
202 if ( INSTANCE == null )
203 {
204 INSTANCE = new ThreadPoolManager();
205 }
206 return INSTANCE;
207 }
208
209
210
211
212
213
214
215
216
217
218 public ThreadPoolExecutor getPool( String name )
219 {
220 ThreadPoolExecutor pool = null;
221
222 synchronized ( pools )
223 {
224 pool = pools.get( name );
225 if ( pool == null )
226 {
227 if ( log.isDebugEnabled() )
228 {
229 log.debug( "Creating pool for name [" + name + "]" );
230 }
231 PoolConfiguration config = this.loadConfig( PROP_NAME_ROOT + "." + name );
232 pool = createPool( config );
233
234 if ( pool != null )
235 {
236 pools.put( name, pool );
237 }
238
239 if ( log.isDebugEnabled() )
240 {
241 log.debug( "PoolName = " + getPoolNames() );
242 }
243 }
244 }
245
246 return pool;
247 }
248
249
250
251
252
253
254 public ArrayList<String> getPoolNames()
255 {
256 ArrayList<String> poolNames = new ArrayList<String>();
257 synchronized ( pools )
258 {
259 poolNames.addAll(pools.keySet());
260 }
261 return poolNames;
262 }
263
264
265
266
267
268
269 public static void setPropsFileName( String propsFileName )
270 {
271 ThreadPoolManager.propsFileName = propsFileName;
272 }
273
274
275
276
277
278
279
280 public static String getPropsFileName()
281 {
282 return propsFileName;
283 }
284
285
286
287
288
289
290
291 public static void setProps( Properties props )
292 {
293 ThreadPoolManager.props = props;
294 }
295
296
297
298
299 public static Properties getProps()
300 {
301 return props;
302 }
303
304
305
306
307 protected void configure()
308 {
309 if ( log.isDebugEnabled() )
310 {
311 log.debug( "Initializing ThreadPoolManager" );
312 }
313
314 if ( props == null )
315 {
316 try
317 {
318 props = PropertyLoader.loadProperties( propsFileName );
319
320 if ( log.isDebugEnabled() )
321 {
322 log.debug( "File contained " + props.size() + " properties" );
323 }
324 }
325 catch ( Exception e )
326 {
327 log.error( "Problem loading properties. propsFileName [" + propsFileName + "]", e );
328 }
329 }
330
331 if ( props == null )
332 {
333 log.warn( "No configuration settings found. Using hardcoded default values for all pools." );
334 props = new Properties();
335 }
336
337
338
339 defaultConfig = new PoolConfiguration( useBoundary_DEFAULT, boundarySize_DEFAULT, maximumPoolSize_DEFAULT,
340 minimumPoolSize_DEFAULT, keepAliveTime_DEFAULT,
341 whenBlockedPolicy_DEFAULT, startUpSize_DEFAULT );
342
343 defaultConfig = loadConfig( DEFAULT_PROP_NAME_ROOT );
344 }
345
346
347
348
349
350
351
352 protected PoolConfiguration loadConfig( String root )
353 {
354 PoolConfiguration config = (PoolConfiguration) defaultConfig.clone();
355
356 if ( props.containsKey( root + ".useBoundary" ) )
357 {
358 try
359 {
360 config.setUseBoundary( Boolean.valueOf( (String) props.get( root + ".useBoundary" ) ).booleanValue() );
361 }
362 catch ( NumberFormatException nfe )
363 {
364 log.error( "useBoundary not a boolean.", nfe );
365 }
366 }
367
368
369 if ( props.containsKey( root + ".boundarySize" ) )
370 {
371 try
372 {
373 config.setBoundarySize( Integer.parseInt( (String) props.get( root + ".boundarySize" ) ) );
374 }
375 catch ( NumberFormatException nfe )
376 {
377 log.error( "boundarySize not a number.", nfe );
378 }
379 }
380
381
382 if ( props.containsKey( root + ".maximumPoolSize" ) )
383 {
384 try
385 {
386 config.setMaximumPoolSize( Integer.parseInt( (String) props.get( root + ".maximumPoolSize" ) ) );
387 }
388 catch ( NumberFormatException nfe )
389 {
390 log.error( "maximumPoolSize not a number.", nfe );
391 }
392 }
393
394
395 if ( props.containsKey( root + ".minimumPoolSize" ) )
396 {
397 try
398 {
399 config.setMinimumPoolSize( Integer.parseInt( (String) props.get( root + ".minimumPoolSize" ) ) );
400 }
401 catch ( NumberFormatException nfe )
402 {
403 log.error( "minimumPoolSize not a number.", nfe );
404 }
405 }
406
407
408 if ( props.containsKey( root + ".keepAliveTime" ) )
409 {
410 try
411 {
412 config.setKeepAliveTime( Integer.parseInt( (String) props.get( root + ".keepAliveTime" ) ) );
413 }
414 catch ( NumberFormatException nfe )
415 {
416 log.error( "keepAliveTime not a number.", nfe );
417 }
418 }
419
420
421 if ( props.containsKey( root + ".whenBlockedPolicy" ) )
422 {
423 config.setWhenBlockedPolicy( (String) props.get( root + ".whenBlockedPolicy" ) );
424 }
425
426
427 if ( props.containsKey( root + ".startUpSize" ) )
428 {
429 try
430 {
431 config.setStartUpSize( Integer.parseInt( (String) props.get( root + ".startUpSize" ) ) );
432 }
433 catch ( NumberFormatException nfe )
434 {
435 log.error( "startUpSize not a number.", nfe );
436 }
437 }
438
439 if ( log.isInfoEnabled() )
440 {
441 log.info( root + " PoolConfiguration = " + config );
442 }
443
444 return config;
445 }
446
447
448
449
450
451
452 protected static class MyThreadFactory
453 implements ThreadFactory
454 {
455
456
457
458
459
460
461 public Thread newThread( Runnable runner )
462 {
463 Thread t = new Thread( runner );
464 String oldName = t.getName();
465 t.setName( "JCS-ThreadPoolManager-" + oldName );
466 t.setDaemon( true );
467 return t;
468 }
469 }
470 }