View Javadoc
1   package org.apache.commons.jcs3.engine;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.util.concurrent.atomic.AtomicBoolean;
24  
25  import org.apache.commons.jcs3.engine.behavior.ICacheElement;
26  import org.apache.commons.jcs3.engine.behavior.ICacheEventQueue;
27  import org.apache.commons.jcs3.engine.behavior.ICacheListener;
28  import org.apache.commons.jcs3.log.Log;
29  import org.apache.commons.jcs3.log.LogManager;
30  
31  /**
32   * An abstract base class to the different implementations
33   */
34  public abstract class AbstractCacheEventQueue<K, V>
35      implements ICacheEventQueue<K, V>
36  {
37      /** The logger. */
38      private static final Log log = LogManager.getLog( AbstractCacheEventQueue.class );
39  
40      /** default */
41      protected static final int DEFAULT_WAIT_TO_DIE_MILLIS = 10000;
42  
43      /**
44       * time to wait for an event before snuffing the background thread if the queue is empty. make
45       * configurable later
46       */
47      private int waitToDieMillis = DEFAULT_WAIT_TO_DIE_MILLIS;
48  
49      /**
50       * When the events are pulled off the queue, then tell the listener to handle the specific event
51       * type. The work is done by the listener.
52       */
53      private ICacheListener<K, V> listener;
54  
55      /** Id of the listener registered with this queue */
56      private long listenerId;
57  
58      /** The cache region name, if applicable. */
59      private String cacheName;
60  
61      /** Maximum number of failures before we buy the farm. */
62      private int maxFailure;
63  
64      /** in milliseconds */
65      private int waitBeforeRetry;
66  
67      /**
68       * This means that the queue is functional. If we reached the max number of failures, the queue
69       * is marked as non functional and will never work again.
70       */
71      private final AtomicBoolean working = new AtomicBoolean(true);
72  
73      /**
74       * Returns the time to wait for events before killing the background thread.
75       * <p>
76       * @return int
77       */
78      public int getWaitToDieMillis()
79      {
80          return waitToDieMillis;
81      }
82  
83      /**
84       * Sets the time to wait for events before killing the background thread.
85       * <p>
86       * @param wtdm the ms for the q to sit idle.
87       */
88      public void setWaitToDieMillis( final int wtdm )
89      {
90          waitToDieMillis = wtdm;
91      }
92  
93      /**
94       * Creates a brief string identifying the listener and the region.
95       * <p>
96       * @return String debugging info.
97       */
98      @Override
99      public String toString()
100     {
101         return "CacheEventQueue [listenerId=" + listenerId + ", cacheName=" + cacheName + "]";
102     }
103 
104     /**
105      * @return The listenerId value
106      */
107     @Override
108     public long getListenerId()
109     {
110         return listenerId;
111     }
112 
113     /**
114      * @return the cacheName
115      */
116     protected String getCacheName()
117     {
118         return cacheName;
119     }
120 
121     /**
122      * Initializes the queue.
123      * <p>
124      * @param listener
125      * @param listenerId
126      * @param cacheName
127      * @param maxFailure
128      * @param waitBeforeRetry
129      */
130     protected void initialize( final ICacheListener<K, V> listener, final long listenerId, final String cacheName, final int maxFailure,
131                             final int waitBeforeRetry)
132     {
133         if ( listener == null )
134         {
135             throw new IllegalArgumentException( "listener must not be null" );
136         }
137 
138         this.listener = listener;
139         this.listenerId = listenerId;
140         this.cacheName = cacheName;
141         this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
142         this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
143 
144         log.debug( "Constructed: {0}", this );
145     }
146 
147     /**
148      * This adds a put event to the queue. When it is processed, the element will be put to the
149      * listener.
150      * <p>
151      * @param ce The feature to be added to the PutEvent attribute
152      * @throws IOException
153      */
154     @Override
155     public void addPutEvent( final ICacheElement<K, V> ce )
156     {
157         put( new PutEvent( ce ) );
158     }
159 
160     /**
161      * This adds a remove event to the queue. When processed the listener's remove method will be
162      * called for the key.
163      * <p>
164      * @param key The feature to be added to the RemoveEvent attribute
165      * @throws IOException
166      */
167     @Override
168     public void addRemoveEvent( final K key )
169     {
170         put( new RemoveEvent( key ) );
171     }
172 
173     /**
174      * This adds a remove all event to the queue. When it is processed, all elements will be removed
175      * from the cache.
176      */
177     @Override
178     public void addRemoveAllEvent()
179     {
180         put( new RemoveAllEvent() );
181     }
182 
183     /**
184      * This adds a dispose event to the queue. When it is processed, the cache is shut down
185      */
186     @Override
187     public void addDisposeEvent()
188     {
189         put( new DisposeEvent() );
190     }
191 
192     /**
193      * Adds an event to the queue.
194      * <p>
195      * @param event
196      */
197     protected abstract void put( AbstractCacheEvent event );
198 
199 
200     // /////////////////////////// Inner classes /////////////////////////////
201     /**
202      * Retries before declaring failure.
203      */
204     protected abstract class AbstractCacheEvent implements Runnable
205     {
206         /**
207          * Main processing method for the AbstractCacheEvent object
208          */
209         @Override
210         public void run()
211         {
212             for (int failures = 0; failures < maxFailure; failures++)
213             {
214                 try
215                 {
216                     doRun();
217                     return;
218                 }
219                 catch (final IOException e)
220                 {
221                     log.warn("Error while running event from Queue: {0}. "
222                             + "Retrying...", this, e);
223                 }
224 
225                 try
226                 {
227                     Thread.sleep( waitBeforeRetry );
228                 }
229                 catch ( final InterruptedException ie )
230                 {
231                     log.warn("Interrupted while sleeping for retry on event "
232                             + "{0}.", this, ie);
233                     break;
234                 }
235             }
236 
237             log.warn( "Dropping Event and marking Event Queue {0} as "
238                     + "non-functional.", this );
239             destroy();
240         }
241 
242         /**
243          * @throws IOException
244          */
245         protected abstract void doRun()
246             throws IOException;
247     }
248 
249     /**
250      * An element should be put in the cache.
251      */
252     protected class PutEvent
253         extends AbstractCacheEvent
254     {
255         /** The element to put to the listener */
256         private final ICacheElement<K, V> ice;
257 
258         /**
259          * Constructor for the PutEvent object.
260          * <p>
261          * @param ice
262          */
263         PutEvent( final ICacheElement<K, V> ice )
264         {
265             this.ice = ice;
266         }
267 
268         /**
269          * Call put on the listener.
270          * <p>
271          * @throws IOException
272          */
273         @Override
274         protected void doRun()
275             throws IOException
276         {
277             listener.handlePut( ice );
278         }
279 
280         /**
281          * For debugging.
282          * <p>
283          * @return Info on the key and value.
284          */
285         @Override
286         public String toString()
287         {
288             return new StringBuilder( "PutEvent for key: " )
289                     .append( ice.getKey() )
290                     .append( " value: " )
291                     .append( ice.getVal() )
292                     .toString();
293         }
294 
295     }
296 
297     /**
298      * An element should be removed from the cache.
299      */
300     protected class RemoveEvent
301         extends AbstractCacheEvent
302     {
303         /** The key to remove from the listener */
304         private final K key;
305 
306         /**
307          * Constructor for the RemoveEvent object
308          * <p>
309          * @param key
310          */
311         RemoveEvent( final K key )
312         {
313             this.key = key;
314         }
315 
316         /**
317          * Call remove on the listener.
318          * <p>
319          * @throws IOException
320          */
321         @Override
322         protected void doRun()
323             throws IOException
324         {
325             listener.handleRemove( cacheName, key );
326         }
327 
328         /**
329          * For debugging.
330          * <p>
331          * @return Info on the key to remove.
332          */
333         @Override
334         public String toString()
335         {
336             return new StringBuilder( "RemoveEvent for " )
337                     .append( key )
338                     .toString();
339         }
340 
341     }
342 
343     /**
344      * All elements should be removed from the cache when this event is processed.
345      */
346     protected class RemoveAllEvent
347         extends AbstractCacheEvent
348     {
349         /**
350          * Call removeAll on the listener.
351          * <p>
352          * @throws IOException
353          */
354         @Override
355         protected void doRun()
356             throws IOException
357         {
358             listener.handleRemoveAll( cacheName );
359         }
360 
361         /**
362          * For debugging.
363          * <p>
364          * @return The name of the event.
365          */
366         @Override
367         public String toString()
368         {
369             return "RemoveAllEvent";
370         }
371     }
372 
373     /**
374      * The cache should be disposed when this event is processed.
375      */
376     protected class DisposeEvent
377         extends AbstractCacheEvent
378     {
379         /**
380          * Called when gets to the end of the queue
381          *
382          * @throws IOException
383          */
384         @Override
385         protected void doRun()
386             throws IOException
387         {
388             listener.handleDispose( cacheName );
389         }
390 
391         /**
392          * For debugging.
393          *
394          * @return The name of the event.
395          */
396         @Override
397         public String toString()
398         {
399             return "DisposeEvent";
400         }
401     }
402 
403     /**
404      * @return whether the queue is functional.
405      */
406     @Override
407     public boolean isWorking()
408     {
409         return working.get();
410     }
411 
412     /**
413      * This means that the queue is functional. If we reached the max number of failures, the queue
414      * is marked as non functional and will never work again.
415      * <p>
416      * @param b
417      */
418     public void setWorking( final boolean b )
419     {
420         working.set(b);
421     }
422 }