View Javadoc
1   package org.apache.commons.jcs.engine;
2   
3   import java.io.IOException;
4   import java.util.concurrent.atomic.AtomicBoolean;
5   
6   /*
7    * Licensed to the Apache Software Foundation (ASF) under one
8    * or more contributor license agreements.  See the NOTICE file
9    * distributed with this work for additional information
10   * regarding copyright ownership.  The ASF licenses this file
11   * to you under the Apache License, Version 2.0 (the
12   * "License"); you may not use this file except in compliance
13   * with the License.  You may obtain a copy of the License at
14   *
15   *   http://www.apache.org/licenses/LICENSE-2.0
16   *
17   * Unless required by applicable law or agreed to in writing,
18   * software distributed under the License is distributed on an
19   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
20   * KIND, either express or implied.  See the License for the
21   * specific language governing permissions and limitations
22   * under the License.
23   */
24  
25  import org.apache.commons.jcs.engine.behavior.ICacheElement;
26  import org.apache.commons.jcs.engine.behavior.ICacheEventQueue;
27  import org.apache.commons.jcs.engine.behavior.ICacheListener;
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  
31  /**
32   * An abstract base class to the different implementations
33   */
34  public abstract class AbstractCacheEventQueue<K, V>
35      implements ICacheEventQueue<K, V>
36  {
37      /** The logger. */
38      private static final Log log = LogFactory.getLog( AbstractCacheEventQueue.class );
39  
40      /** default */
41      protected static final int DEFAULT_WAIT_TO_DIE_MILLIS = 10000;
42  
43      /**
44       * time to wait for an event before snuffing the background thread if the queue is empty. make
45       * configurable later
46       */
47      private int waitToDieMillis = DEFAULT_WAIT_TO_DIE_MILLIS;
48  
49      /**
50       * When the events are pulled off the queue, then tell the listener to handle the specific event
51       * type. The work is done by the listener.
52       */
53      private ICacheListener<K, V> listener;
54  
55      /** Id of the listener registered with this queue */
56      private long listenerId;
57  
58      /** The cache region name, if applicable. */
59      private String cacheName;
60  
61      /** Maximum number of failures before we buy the farm. */
62      private int maxFailure;
63  
64      /** in milliseconds */
65      private int waitBeforeRetry;
66  
67      /** this is true if there is any worker thread. */
68      private final AtomicBoolean alive = new AtomicBoolean(false);
69  
70      /**
71       * This means that the queue is functional. If we reached the max number of failures, the queue
72       * is marked as non functional and will never work again.
73       */
74      private final AtomicBoolean working = new AtomicBoolean(true);
75  
76      /**
77       * Returns the time to wait for events before killing the background thread.
78       * <p>
79       * @return int
80       */
81      public int getWaitToDieMillis()
82      {
83          return waitToDieMillis;
84      }
85  
86      /**
87       * Sets the time to wait for events before killing the background thread.
88       * <p>
89       * @param wtdm the ms for the q to sit idle.
90       */
91      public void setWaitToDieMillis( int wtdm )
92      {
93          waitToDieMillis = wtdm;
94      }
95  
96      /**
97       * Creates a brief string identifying the listener and the region.
98       * <p>
99       * @return String debugging info.
100      */
101     @Override
102     public String toString()
103     {
104         return "CacheEventQueue [listenerId=" + listenerId + ", cacheName=" + cacheName + "]";
105     }
106 
107     /**
108      * If they queue has an active thread it is considered alive.
109      * <p>
110      * @return The alive value
111      */
112     @Override
113     public boolean isAlive()
114     {
115         return alive.get();
116     }
117 
118     /**
119      * Sets whether the queue is actively processing -- if there are working threads.
120      * <p>
121      * @param aState
122      */
123     public void setAlive( boolean aState )
124     {
125         alive.set(aState);
126     }
127 
128     /**
129      * @return The listenerId value
130      */
131     @Override
132     public long getListenerId()
133     {
134         return listenerId;
135     }
136 
137     /**
138      * @return the cacheName
139      */
140     protected String getCacheName()
141     {
142         return cacheName;
143     }
144 
145     /**
146      * Initializes the queue.
147      * <p>
148      * @param listener
149      * @param listenerId
150      * @param cacheName
151      * @param maxFailure
152      * @param waitBeforeRetry
153      */
154     protected void initialize( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
155                             int waitBeforeRetry)
156     {
157         if ( listener == null )
158         {
159             throw new IllegalArgumentException( "listener must not be null" );
160         }
161 
162         this.listener = listener;
163         this.listenerId = listenerId;
164         this.cacheName = cacheName;
165         this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
166         this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
167 
168         if ( log.isDebugEnabled() )
169         {
170             log.debug( "Constructed: " + this );
171         }
172     }
173 
174     /**
175      * This adds a put event to the queue. When it is processed, the element will be put to the
176      * listener.
177      * <p>
178      * @param ce The feature to be added to the PutEvent attribute
179      * @throws IOException
180      */
181     @Override
182     public synchronized void addPutEvent( ICacheElement<K, V> ce )
183         throws IOException
184     {
185         if ( isWorking() )
186         {
187             put( new PutEvent( ce ) );
188         }
189         else if ( log.isWarnEnabled() )
190         {
191             log.warn( "Not enqueuing Put Event for [" + this + "] because it's non-functional." );
192         }
193     }
194 
195     /**
196      * This adds a remove event to the queue. When processed the listener's remove method will be
197      * called for the key.
198      * <p>
199      * @param key The feature to be added to the RemoveEvent attribute
200      * @throws IOException
201      */
202     @Override
203     public synchronized void addRemoveEvent( K key )
204         throws IOException
205     {
206         if ( isWorking() )
207         {
208             put( new RemoveEvent( key ) );
209         }
210         else if ( log.isWarnEnabled() )
211         {
212             log.warn( "Not enqueuing Remove Event for [" + this + "] because it's non-functional." );
213         }
214     }
215 
216     /**
217      * This adds a remove all event to the queue. When it is processed, all elements will be removed
218      * from the cache.
219      * <p>
220      * @throws IOException
221      */
222     @Override
223     public synchronized void addRemoveAllEvent()
224         throws IOException
225     {
226         if ( isWorking() )
227         {
228             put( new RemoveAllEvent() );
229         }
230         else if ( log.isWarnEnabled() )
231         {
232             log.warn( "Not enqueuing RemoveAll Event for [" + this + "] because it's non-functional." );
233         }
234     }
235 
236     /**
237      * @throws IOException
238      */
239     @Override
240     public synchronized void addDisposeEvent()
241         throws IOException
242     {
243         if ( isWorking() )
244         {
245             put( new DisposeEvent() );
246         }
247         else if ( log.isWarnEnabled() )
248         {
249             log.warn( "Not enqueuing Dispose Event for [" + this + "] because it's non-functional." );
250         }
251     }
252 
253     /**
254      * Adds an event to the queue.
255      * <p>
256      * @param event
257      */
258     protected abstract void put( AbstractCacheEvent event );
259 
260 
261     // /////////////////////////// Inner classes /////////////////////////////
262     /**
263      * Retries before declaring failure.
264      * <p>
265      * @author asmuts
266      */
267     protected abstract class AbstractCacheEvent implements Runnable
268     {
269         /** Number of failures encountered processing this event. */
270         int failures = 0;
271 
272         /**
273          * Main processing method for the AbstractCacheEvent object
274          */
275         @Override
276         @SuppressWarnings("synthetic-access")
277         public void run()
278         {
279             try
280             {
281                 doRun();
282             }
283             catch ( IOException e )
284             {
285                 if ( log.isWarnEnabled() )
286                 {
287                     log.warn( e );
288                 }
289                 if ( ++failures >= maxFailure )
290                 {
291                     if ( log.isWarnEnabled() )
292                     {
293                         log.warn( "Error while running event from Queue: " + this
294                             + ". Dropping Event and marking Event Queue as non-functional." );
295                     }
296                     setWorking( false );
297                     setAlive( false );
298                     return;
299                 }
300                 if ( log.isInfoEnabled() )
301                 {
302                     log.info( "Error while running event from Queue: " + this + ". Retrying..." );
303                 }
304                 try
305                 {
306                     Thread.sleep( waitBeforeRetry );
307                     run();
308                 }
309                 catch ( InterruptedException ie )
310                 {
311                     if ( log.isErrorEnabled() )
312                     {
313                         log.warn( "Interrupted while sleeping for retry on event " + this + "." );
314                     }
315                     // TODO consider if this is best. maybe we should just
316                     // destroy
317                     setWorking( false );
318                     setAlive( false );
319                 }
320             }
321         }
322 
323         /**
324          * @throws IOException
325          */
326         protected abstract void doRun()
327             throws IOException;
328     }
329 
330     /**
331      * An element should be put in the cache.
332      * <p>
333      * @author asmuts
334      */
335     protected class PutEvent
336         extends AbstractCacheEvent
337     {
338         /** The element to put to the listener */
339         private final ICacheElement<K, V> ice;
340 
341         /**
342          * Constructor for the PutEvent object.
343          * <p>
344          * @param ice
345          * @throws IOException
346          */
347         PutEvent( ICacheElement<K, V> ice )
348             throws IOException
349         {
350             this.ice = ice;
351         }
352 
353         /**
354          * Call put on the listener.
355          * <p>
356          * @throws IOException
357          */
358         @Override
359         protected void doRun()
360             throws IOException
361         {
362             listener.handlePut( ice );
363         }
364 
365         /**
366          * For debugging.
367          * <p>
368          * @return Info on the key and value.
369          */
370         @Override
371         public String toString()
372         {
373             return new StringBuilder( "PutEvent for key: " ).append( ice.getKey() ).append( " value: " )
374                 .append( ice.getVal() ).toString();
375         }
376 
377     }
378 
379     /**
380      * An element should be removed from the cache.
381      * <p>
382      * @author asmuts
383      */
384     protected class RemoveEvent
385         extends AbstractCacheEvent
386     {
387         /** The key to remove from the listener */
388         private final K key;
389 
390         /**
391          * Constructor for the RemoveEvent object
392          * <p>
393          * @param key
394          * @throws IOException
395          */
396         RemoveEvent( K key )
397             throws IOException
398         {
399             this.key = key;
400         }
401 
402         /**
403          * Call remove on the listener.
404          * <p>
405          * @throws IOException
406          */
407         @Override
408         protected void doRun()
409             throws IOException
410         {
411             listener.handleRemove( cacheName, key );
412         }
413 
414         /**
415          * For debugging.
416          * <p>
417          * @return Info on the key to remove.
418          */
419         @Override
420         public String toString()
421         {
422             return new StringBuilder( "RemoveEvent for " ).append( key ).toString();
423         }
424 
425     }
426 
427     /**
428      * All elements should be removed from the cache when this event is processed.
429      * <p>
430      * @author asmuts
431      */
432     protected class RemoveAllEvent
433         extends AbstractCacheEvent
434     {
435         /**
436          * Call removeAll on the listener.
437          * <p>
438          * @throws IOException
439          */
440         @Override
441         protected void doRun()
442             throws IOException
443         {
444             listener.handleRemoveAll( cacheName );
445         }
446 
447         /**
448          * For debugging.
449          * <p>
450          * @return The name of the event.
451          */
452         @Override
453         public String toString()
454         {
455             return "RemoveAllEvent";
456         }
457     }
458 
459     /**
460      * The cache should be disposed when this event is processed.
461      * <p>
462      * @author asmuts
463      */
464     protected class DisposeEvent
465         extends AbstractCacheEvent
466     {
467         /**
468          * Called when gets to the end of the queue
469          * <p>
470          * @throws IOException
471          */
472         @Override
473         protected void doRun()
474             throws IOException
475         {
476             listener.handleDispose( cacheName );
477         }
478 
479         /**
480          * For debugging.
481          * <p>
482          * @return The name of the event.
483          */
484         @Override
485         public String toString()
486         {
487             return "DisposeEvent";
488         }
489     }
490 
491     /**
492      * @return whether the queue is functional.
493      */
494     @Override
495     public boolean isWorking()
496     {
497         return working.get();
498     }
499 
500     /**
501      * This means that the queue is functional. If we reached the max number of failures, the queue
502      * is marked as non functional and will never work again.
503      * <p>
504      * @param b
505      */
506     public void setWorking( boolean b )
507     {
508         working.set(b);
509     }
510 }