1 package org.apache.commons.jcs3.engine;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.util.concurrent.atomic.AtomicBoolean;
24
25 import org.apache.commons.jcs3.engine.behavior.ICacheElement;
26 import org.apache.commons.jcs3.engine.behavior.ICacheEventQueue;
27 import org.apache.commons.jcs3.engine.behavior.ICacheListener;
28 import org.apache.commons.jcs3.log.Log;
29 import org.apache.commons.jcs3.log.LogManager;
30
31
32
33
34 public abstract class AbstractCacheEventQueue<K, V>
35 implements ICacheEventQueue<K, V>
36 {
37
38 private static final Log log = LogManager.getLog( AbstractCacheEventQueue.class );
39
40
41 protected static final int DEFAULT_WAIT_TO_DIE_MILLIS = 10000;
42
43
44
45
46
47 private int waitToDieMillis = DEFAULT_WAIT_TO_DIE_MILLIS;
48
49
50
51
52
53 private ICacheListener<K, V> listener;
54
55
56 private long listenerId;
57
58
59 private String cacheName;
60
61
62 private int maxFailure;
63
64
65 private int waitBeforeRetry;
66
67
68
69
70
71 private final AtomicBoolean working = new AtomicBoolean(true);
72
73
74
75
76
77
78 public int getWaitToDieMillis()
79 {
80 return waitToDieMillis;
81 }
82
83
84
85
86
87
88 public void setWaitToDieMillis( final int wtdm )
89 {
90 waitToDieMillis = wtdm;
91 }
92
93
94
95
96
97
98 @Override
99 public String toString()
100 {
101 return "CacheEventQueue [listenerId=" + listenerId + ", cacheName=" + cacheName + "]";
102 }
103
104
105
106
107 @Override
108 public long getListenerId()
109 {
110 return listenerId;
111 }
112
113
114
115
116 protected String getCacheName()
117 {
118 return cacheName;
119 }
120
121
122
123
124
125
126
127
128
129
130 protected void initialize( final ICacheListener<K, V> listener, final long listenerId, final String cacheName, final int maxFailure,
131 final int waitBeforeRetry)
132 {
133 if ( listener == null )
134 {
135 throw new IllegalArgumentException( "listener must not be null" );
136 }
137
138 this.listener = listener;
139 this.listenerId = listenerId;
140 this.cacheName = cacheName;
141 this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
142 this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
143
144 log.debug( "Constructed: {0}", this );
145 }
146
147
148
149
150
151
152
153
154 @Override
155 public void addPutEvent( final ICacheElement<K, V> ce )
156 {
157 put( new PutEvent( ce ) );
158 }
159
160
161
162
163
164
165
166
167 @Override
168 public void addRemoveEvent( final K key )
169 {
170 put( new RemoveEvent( key ) );
171 }
172
173
174
175
176
177 @Override
178 public void addRemoveAllEvent()
179 {
180 put( new RemoveAllEvent() );
181 }
182
183
184
185
186 @Override
187 public void addDisposeEvent()
188 {
189 put( new DisposeEvent() );
190 }
191
192
193
194
195
196
197 protected abstract void put( AbstractCacheEvent event );
198
199
200
201
202
203
204 protected abstract class AbstractCacheEvent implements Runnable
205 {
206
207
208
209 @Override
210 public void run()
211 {
212 for (int failures = 0; failures < maxFailure; failures++)
213 {
214 try
215 {
216 doRun();
217 return;
218 }
219 catch (final IOException e)
220 {
221 log.warn("Error while running event from Queue: {0}. "
222 + "Retrying...", this, e);
223 }
224
225 try
226 {
227 Thread.sleep( waitBeforeRetry );
228 }
229 catch ( final InterruptedException ie )
230 {
231 log.warn("Interrupted while sleeping for retry on event "
232 + "{0}.", this, ie);
233 break;
234 }
235 }
236
237 log.warn( "Dropping Event and marking Event Queue {0} as "
238 + "non-functional.", this );
239 destroy();
240 }
241
242
243
244
245 protected abstract void doRun()
246 throws IOException;
247 }
248
249
250
251
252 protected class PutEvent
253 extends AbstractCacheEvent
254 {
255
256 private final ICacheElement<K, V> ice;
257
258
259
260
261
262
263 PutEvent( final ICacheElement<K, V> ice )
264 {
265 this.ice = ice;
266 }
267
268
269
270
271
272
273 @Override
274 protected void doRun()
275 throws IOException
276 {
277 listener.handlePut( ice );
278 }
279
280
281
282
283
284
285 @Override
286 public String toString()
287 {
288 return new StringBuilder( "PutEvent for key: " )
289 .append( ice.getKey() )
290 .append( " value: " )
291 .append( ice.getVal() )
292 .toString();
293 }
294
295 }
296
297
298
299
300 protected class RemoveEvent
301 extends AbstractCacheEvent
302 {
303
304 private final K key;
305
306
307
308
309
310
311 RemoveEvent( final K key )
312 {
313 this.key = key;
314 }
315
316
317
318
319
320
321 @Override
322 protected void doRun()
323 throws IOException
324 {
325 listener.handleRemove( cacheName, key );
326 }
327
328
329
330
331
332
333 @Override
334 public String toString()
335 {
336 return new StringBuilder( "RemoveEvent for " )
337 .append( key )
338 .toString();
339 }
340
341 }
342
343
344
345
346 protected class RemoveAllEvent
347 extends AbstractCacheEvent
348 {
349
350
351
352
353
354 @Override
355 protected void doRun()
356 throws IOException
357 {
358 listener.handleRemoveAll( cacheName );
359 }
360
361
362
363
364
365
366 @Override
367 public String toString()
368 {
369 return "RemoveAllEvent";
370 }
371 }
372
373
374
375
376 protected class DisposeEvent
377 extends AbstractCacheEvent
378 {
379
380
381
382
383
384 @Override
385 protected void doRun()
386 throws IOException
387 {
388 listener.handleDispose( cacheName );
389 }
390
391
392
393
394
395
396 @Override
397 public String toString()
398 {
399 return "DisposeEvent";
400 }
401 }
402
403
404
405
406 @Override
407 public boolean isWorking()
408 {
409 return working.get();
410 }
411
412
413
414
415
416
417
418 public void setWorking( final boolean b )
419 {
420 working.set(b);
421 }
422 }