1 package org.apache.commons.jcs.engine;
2
3 import java.io.IOException;
4 import java.util.concurrent.atomic.AtomicBoolean;
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 import org.apache.commons.jcs.engine.behavior.ICacheElement;
26 import org.apache.commons.jcs.engine.behavior.ICacheEventQueue;
27 import org.apache.commons.jcs.engine.behavior.ICacheListener;
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30
31
32
33
34 public abstract class AbstractCacheEventQueue<K, V>
35 implements ICacheEventQueue<K, V>
36 {
37
38 private static final Log log = LogFactory.getLog( AbstractCacheEventQueue.class );
39
40
41 protected static final int DEFAULT_WAIT_TO_DIE_MILLIS = 10000;
42
43
44
45
46
47 private int waitToDieMillis = DEFAULT_WAIT_TO_DIE_MILLIS;
48
49
50
51
52
53 private ICacheListener<K, V> listener;
54
55
56 private long listenerId;
57
58
59 private String cacheName;
60
61
62 private int maxFailure;
63
64
65 private int waitBeforeRetry;
66
67
68 private final AtomicBoolean alive = new AtomicBoolean(false);
69
70
71
72
73
74 private final AtomicBoolean working = new AtomicBoolean(true);
75
76
77
78
79
80
81 public int getWaitToDieMillis()
82 {
83 return waitToDieMillis;
84 }
85
86
87
88
89
90
91 public void setWaitToDieMillis( int wtdm )
92 {
93 waitToDieMillis = wtdm;
94 }
95
96
97
98
99
100
101 @Override
102 public String toString()
103 {
104 return "CacheEventQueue [listenerId=" + listenerId + ", cacheName=" + cacheName + "]";
105 }
106
107
108
109
110
111
112 @Override
113 public boolean isAlive()
114 {
115 return alive.get();
116 }
117
118
119
120
121
122
123 public void setAlive( boolean aState )
124 {
125 alive.set(aState);
126 }
127
128
129
130
131 @Override
132 public long getListenerId()
133 {
134 return listenerId;
135 }
136
137
138
139
140 protected String getCacheName()
141 {
142 return cacheName;
143 }
144
145
146
147
148
149
150
151
152
153
154 protected void initialize( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
155 int waitBeforeRetry)
156 {
157 if ( listener == null )
158 {
159 throw new IllegalArgumentException( "listener must not be null" );
160 }
161
162 this.listener = listener;
163 this.listenerId = listenerId;
164 this.cacheName = cacheName;
165 this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
166 this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
167
168 if ( log.isDebugEnabled() )
169 {
170 log.debug( "Constructed: " + this );
171 }
172 }
173
174
175
176
177
178
179
180
181 @Override
182 public synchronized void addPutEvent( ICacheElement<K, V> ce )
183 throws IOException
184 {
185 if ( isWorking() )
186 {
187 put( new PutEvent( ce ) );
188 }
189 else if ( log.isWarnEnabled() )
190 {
191 log.warn( "Not enqueuing Put Event for [" + this + "] because it's non-functional." );
192 }
193 }
194
195
196
197
198
199
200
201
202 @Override
203 public synchronized void addRemoveEvent( K key )
204 throws IOException
205 {
206 if ( isWorking() )
207 {
208 put( new RemoveEvent( key ) );
209 }
210 else if ( log.isWarnEnabled() )
211 {
212 log.warn( "Not enqueuing Remove Event for [" + this + "] because it's non-functional." );
213 }
214 }
215
216
217
218
219
220
221
222 @Override
223 public synchronized void addRemoveAllEvent()
224 throws IOException
225 {
226 if ( isWorking() )
227 {
228 put( new RemoveAllEvent() );
229 }
230 else if ( log.isWarnEnabled() )
231 {
232 log.warn( "Not enqueuing RemoveAll Event for [" + this + "] because it's non-functional." );
233 }
234 }
235
236
237
238
239 @Override
240 public synchronized void addDisposeEvent()
241 throws IOException
242 {
243 if ( isWorking() )
244 {
245 put( new DisposeEvent() );
246 }
247 else if ( log.isWarnEnabled() )
248 {
249 log.warn( "Not enqueuing Dispose Event for [" + this + "] because it's non-functional." );
250 }
251 }
252
253
254
255
256
257
258 protected abstract void put( AbstractCacheEvent event );
259
260
261
262
263
264
265
266
267 protected abstract class AbstractCacheEvent implements Runnable
268 {
269
270 int failures = 0;
271
272
273
274
275 @Override
276 @SuppressWarnings("synthetic-access")
277 public void run()
278 {
279 try
280 {
281 doRun();
282 }
283 catch ( IOException e )
284 {
285 if ( log.isWarnEnabled() )
286 {
287 log.warn( e );
288 }
289 if ( ++failures >= maxFailure )
290 {
291 if ( log.isWarnEnabled() )
292 {
293 log.warn( "Error while running event from Queue: " + this
294 + ". Dropping Event and marking Event Queue as non-functional." );
295 }
296 setWorking( false );
297 setAlive( false );
298 return;
299 }
300 if ( log.isInfoEnabled() )
301 {
302 log.info( "Error while running event from Queue: " + this + ". Retrying..." );
303 }
304 try
305 {
306 Thread.sleep( waitBeforeRetry );
307 run();
308 }
309 catch ( InterruptedException ie )
310 {
311 if ( log.isErrorEnabled() )
312 {
313 log.warn( "Interrupted while sleeping for retry on event " + this + "." );
314 }
315
316
317 setWorking( false );
318 setAlive( false );
319 }
320 }
321 }
322
323
324
325
326 protected abstract void doRun()
327 throws IOException;
328 }
329
330
331
332
333
334
335 protected class PutEvent
336 extends AbstractCacheEvent
337 {
338
339 private final ICacheElement<K, V> ice;
340
341
342
343
344
345
346
347 PutEvent( ICacheElement<K, V> ice )
348 throws IOException
349 {
350 this.ice = ice;
351 }
352
353
354
355
356
357
358 @Override
359 protected void doRun()
360 throws IOException
361 {
362 listener.handlePut( ice );
363 }
364
365
366
367
368
369
370 @Override
371 public String toString()
372 {
373 return new StringBuilder( "PutEvent for key: " ).append( ice.getKey() ).append( " value: " )
374 .append( ice.getVal() ).toString();
375 }
376
377 }
378
379
380
381
382
383
384 protected class RemoveEvent
385 extends AbstractCacheEvent
386 {
387
388 private final K key;
389
390
391
392
393
394
395
396 RemoveEvent( K key )
397 throws IOException
398 {
399 this.key = key;
400 }
401
402
403
404
405
406
407 @Override
408 protected void doRun()
409 throws IOException
410 {
411 listener.handleRemove( cacheName, key );
412 }
413
414
415
416
417
418
419 @Override
420 public String toString()
421 {
422 return new StringBuilder( "RemoveEvent for " ).append( key ).toString();
423 }
424
425 }
426
427
428
429
430
431
432 protected class RemoveAllEvent
433 extends AbstractCacheEvent
434 {
435
436
437
438
439
440 @Override
441 protected void doRun()
442 throws IOException
443 {
444 listener.handleRemoveAll( cacheName );
445 }
446
447
448
449
450
451
452 @Override
453 public String toString()
454 {
455 return "RemoveAllEvent";
456 }
457 }
458
459
460
461
462
463
464 protected class DisposeEvent
465 extends AbstractCacheEvent
466 {
467
468
469
470
471
472 @Override
473 protected void doRun()
474 throws IOException
475 {
476 listener.handleDispose( cacheName );
477 }
478
479
480
481
482
483
484 @Override
485 public String toString()
486 {
487 return "DisposeEvent";
488 }
489 }
490
491
492
493
494 @Override
495 public boolean isWorking()
496 {
497 return working.get();
498 }
499
500
501
502
503
504
505
506 public void setWorking( boolean b )
507 {
508 working.set(b);
509 }
510 }