1 package org.apache.commons.jcs.engine;
2
3 import java.io.IOException;
4 import java.util.concurrent.atomic.AtomicBoolean;
5
6 /*
7 * Licensed to the Apache Software Foundation (ASF) under one
8 * or more contributor license agreements. See the NOTICE file
9 * distributed with this work for additional information
10 * regarding copyright ownership. The ASF licenses this file
11 * to you under the Apache License, Version 2.0 (the
12 * "License"); you may not use this file except in compliance
13 * with the License. You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing,
18 * software distributed under the License is distributed on an
19 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
20 * KIND, either express or implied. See the License for the
21 * specific language governing permissions and limitations
22 * under the License.
23 */
24
25 import org.apache.commons.jcs.engine.behavior.ICacheElement;
26 import org.apache.commons.jcs.engine.behavior.ICacheEventQueue;
27 import org.apache.commons.jcs.engine.behavior.ICacheListener;
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30
31 /**
32 * An abstract base class to the different implementations
33 */
34 public abstract class AbstractCacheEventQueue<K, V>
35 implements ICacheEventQueue<K, V>
36 {
37 /** The logger. */
38 private static final Log log = LogFactory.getLog( AbstractCacheEventQueue.class );
39
40 /** default */
41 protected static final int DEFAULT_WAIT_TO_DIE_MILLIS = 10000;
42
43 /**
44 * time to wait for an event before snuffing the background thread if the queue is empty. make
45 * configurable later
46 */
47 private int waitToDieMillis = DEFAULT_WAIT_TO_DIE_MILLIS;
48
49 /**
50 * When the events are pulled off the queue, then tell the listener to handle the specific event
51 * type. The work is done by the listener.
52 */
53 private ICacheListener<K, V> listener;
54
55 /** Id of the listener registered with this queue */
56 private long listenerId;
57
58 /** The cache region name, if applicable. */
59 private String cacheName;
60
61 /** Maximum number of failures before we buy the farm. */
62 private int maxFailure;
63
64 /** in milliseconds */
65 private int waitBeforeRetry;
66
67 /** this is true if there is any worker thread. */
68 private final AtomicBoolean alive = new AtomicBoolean(false);
69
70 /**
71 * This means that the queue is functional. If we reached the max number of failures, the queue
72 * is marked as non functional and will never work again.
73 */
74 private final AtomicBoolean working = new AtomicBoolean(true);
75
76 /**
77 * Returns the time to wait for events before killing the background thread.
78 * <p>
79 * @return int
80 */
81 public int getWaitToDieMillis()
82 {
83 return waitToDieMillis;
84 }
85
86 /**
87 * Sets the time to wait for events before killing the background thread.
88 * <p>
89 * @param wtdm the ms for the q to sit idle.
90 */
91 public void setWaitToDieMillis( int wtdm )
92 {
93 waitToDieMillis = wtdm;
94 }
95
96 /**
97 * Creates a brief string identifying the listener and the region.
98 * <p>
99 * @return String debugging info.
100 */
101 @Override
102 public String toString()
103 {
104 return "CacheEventQueue [listenerId=" + listenerId + ", cacheName=" + cacheName + "]";
105 }
106
107 /**
108 * If they queue has an active thread it is considered alive.
109 * <p>
110 * @return The alive value
111 */
112 @Override
113 public boolean isAlive()
114 {
115 return alive.get();
116 }
117
118 /**
119 * Sets whether the queue is actively processing -- if there are working threads.
120 * <p>
121 * @param aState
122 */
123 public void setAlive( boolean aState )
124 {
125 alive.set(aState);
126 }
127
128 /**
129 * @return The listenerId value
130 */
131 @Override
132 public long getListenerId()
133 {
134 return listenerId;
135 }
136
137 /**
138 * @return the cacheName
139 */
140 protected String getCacheName()
141 {
142 return cacheName;
143 }
144
145 /**
146 * Initializes the queue.
147 * <p>
148 * @param listener
149 * @param listenerId
150 * @param cacheName
151 * @param maxFailure
152 * @param waitBeforeRetry
153 */
154 protected void initialize( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
155 int waitBeforeRetry)
156 {
157 if ( listener == null )
158 {
159 throw new IllegalArgumentException( "listener must not be null" );
160 }
161
162 this.listener = listener;
163 this.listenerId = listenerId;
164 this.cacheName = cacheName;
165 this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
166 this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
167
168 if ( log.isDebugEnabled() )
169 {
170 log.debug( "Constructed: " + this );
171 }
172 }
173
174 /**
175 * This adds a put event to the queue. When it is processed, the element will be put to the
176 * listener.
177 * <p>
178 * @param ce The feature to be added to the PutEvent attribute
179 * @throws IOException
180 */
181 @Override
182 public synchronized void addPutEvent( ICacheElement<K, V> ce )
183 throws IOException
184 {
185 if ( isWorking() )
186 {
187 put( new PutEvent( ce ) );
188 }
189 else if ( log.isWarnEnabled() )
190 {
191 log.warn( "Not enqueuing Put Event for [" + this + "] because it's non-functional." );
192 }
193 }
194
195 /**
196 * This adds a remove event to the queue. When processed the listener's remove method will be
197 * called for the key.
198 * <p>
199 * @param key The feature to be added to the RemoveEvent attribute
200 * @throws IOException
201 */
202 @Override
203 public synchronized void addRemoveEvent( K key )
204 throws IOException
205 {
206 if ( isWorking() )
207 {
208 put( new RemoveEvent( key ) );
209 }
210 else if ( log.isWarnEnabled() )
211 {
212 log.warn( "Not enqueuing Remove Event for [" + this + "] because it's non-functional." );
213 }
214 }
215
216 /**
217 * This adds a remove all event to the queue. When it is processed, all elements will be removed
218 * from the cache.
219 * <p>
220 * @throws IOException
221 */
222 @Override
223 public synchronized void addRemoveAllEvent()
224 throws IOException
225 {
226 if ( isWorking() )
227 {
228 put( new RemoveAllEvent() );
229 }
230 else if ( log.isWarnEnabled() )
231 {
232 log.warn( "Not enqueuing RemoveAll Event for [" + this + "] because it's non-functional." );
233 }
234 }
235
236 /**
237 * @throws IOException
238 */
239 @Override
240 public synchronized void addDisposeEvent()
241 throws IOException
242 {
243 if ( isWorking() )
244 {
245 put( new DisposeEvent() );
246 }
247 else if ( log.isWarnEnabled() )
248 {
249 log.warn( "Not enqueuing Dispose Event for [" + this + "] because it's non-functional." );
250 }
251 }
252
253 /**
254 * Adds an event to the queue.
255 * <p>
256 * @param event
257 */
258 protected abstract void put( AbstractCacheEvent event );
259
260
261 // /////////////////////////// Inner classes /////////////////////////////
262 /**
263 * Retries before declaring failure.
264 * <p>
265 * @author asmuts
266 */
267 protected abstract class AbstractCacheEvent implements Runnable
268 {
269 /** Number of failures encountered processing this event. */
270 int failures = 0;
271
272 /**
273 * Main processing method for the AbstractCacheEvent object
274 */
275 @Override
276 @SuppressWarnings("synthetic-access")
277 public void run()
278 {
279 try
280 {
281 doRun();
282 }
283 catch ( IOException e )
284 {
285 if ( log.isWarnEnabled() )
286 {
287 log.warn( e );
288 }
289 if ( ++failures >= maxFailure )
290 {
291 if ( log.isWarnEnabled() )
292 {
293 log.warn( "Error while running event from Queue: " + this
294 + ". Dropping Event and marking Event Queue as non-functional." );
295 }
296 setWorking( false );
297 setAlive( false );
298 return;
299 }
300 if ( log.isInfoEnabled() )
301 {
302 log.info( "Error while running event from Queue: " + this + ". Retrying..." );
303 }
304 try
305 {
306 Thread.sleep( waitBeforeRetry );
307 run();
308 }
309 catch ( InterruptedException ie )
310 {
311 if ( log.isErrorEnabled() )
312 {
313 log.warn( "Interrupted while sleeping for retry on event " + this + "." );
314 }
315 // TODO consider if this is best. maybe we should just
316 // destroy
317 setWorking( false );
318 setAlive( false );
319 }
320 }
321 }
322
323 /**
324 * @throws IOException
325 */
326 protected abstract void doRun()
327 throws IOException;
328 }
329
330 /**
331 * An element should be put in the cache.
332 * <p>
333 * @author asmuts
334 */
335 protected class PutEvent
336 extends AbstractCacheEvent
337 {
338 /** The element to put to the listener */
339 private final ICacheElement<K, V> ice;
340
341 /**
342 * Constructor for the PutEvent object.
343 * <p>
344 * @param ice
345 * @throws IOException
346 */
347 PutEvent( ICacheElement<K, V> ice )
348 throws IOException
349 {
350 this.ice = ice;
351 }
352
353 /**
354 * Call put on the listener.
355 * <p>
356 * @throws IOException
357 */
358 @Override
359 protected void doRun()
360 throws IOException
361 {
362 listener.handlePut( ice );
363 }
364
365 /**
366 * For debugging.
367 * <p>
368 * @return Info on the key and value.
369 */
370 @Override
371 public String toString()
372 {
373 return new StringBuilder( "PutEvent for key: " ).append( ice.getKey() ).append( " value: " )
374 .append( ice.getVal() ).toString();
375 }
376
377 }
378
379 /**
380 * An element should be removed from the cache.
381 * <p>
382 * @author asmuts
383 */
384 protected class RemoveEvent
385 extends AbstractCacheEvent
386 {
387 /** The key to remove from the listener */
388 private final K key;
389
390 /**
391 * Constructor for the RemoveEvent object
392 * <p>
393 * @param key
394 * @throws IOException
395 */
396 RemoveEvent( K key )
397 throws IOException
398 {
399 this.key = key;
400 }
401
402 /**
403 * Call remove on the listener.
404 * <p>
405 * @throws IOException
406 */
407 @Override
408 protected void doRun()
409 throws IOException
410 {
411 listener.handleRemove( cacheName, key );
412 }
413
414 /**
415 * For debugging.
416 * <p>
417 * @return Info on the key to remove.
418 */
419 @Override
420 public String toString()
421 {
422 return new StringBuilder( "RemoveEvent for " ).append( key ).toString();
423 }
424
425 }
426
427 /**
428 * All elements should be removed from the cache when this event is processed.
429 * <p>
430 * @author asmuts
431 */
432 protected class RemoveAllEvent
433 extends AbstractCacheEvent
434 {
435 /**
436 * Call removeAll on the listener.
437 * <p>
438 * @throws IOException
439 */
440 @Override
441 protected void doRun()
442 throws IOException
443 {
444 listener.handleRemoveAll( cacheName );
445 }
446
447 /**
448 * For debugging.
449 * <p>
450 * @return The name of the event.
451 */
452 @Override
453 public String toString()
454 {
455 return "RemoveAllEvent";
456 }
457 }
458
459 /**
460 * The cache should be disposed when this event is processed.
461 * <p>
462 * @author asmuts
463 */
464 protected class DisposeEvent
465 extends AbstractCacheEvent
466 {
467 /**
468 * Called when gets to the end of the queue
469 * <p>
470 * @throws IOException
471 */
472 @Override
473 protected void doRun()
474 throws IOException
475 {
476 listener.handleDispose( cacheName );
477 }
478
479 /**
480 * For debugging.
481 * <p>
482 * @return The name of the event.
483 */
484 @Override
485 public String toString()
486 {
487 return "DisposeEvent";
488 }
489 }
490
491 /**
492 * @return whether the queue is functional.
493 */
494 @Override
495 public boolean isWorking()
496 {
497 return working.get();
498 }
499
500 /**
501 * This means that the queue is functional. If we reached the max number of failures, the queue
502 * is marked as non functional and will never work again.
503 * <p>
504 * @param b
505 */
506 public void setWorking( boolean b )
507 {
508 working.set(b);
509 }
510 }