View Javadoc

1   package org.apache.jcs.engine.control.event;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.jcs.engine.control.event.behavior.IElementEvent;
27  import org.apache.jcs.engine.control.event.behavior.IElementEventHandler;
28  import org.apache.jcs.engine.control.event.behavior.IElementEventQueue;
29  
30  /**
31   * An event queue is used to propagate ordered cache events to one and only one target listener.
32   */
33  public class ElementEventQueue
34      implements IElementEventQueue
35  {
36      /** The logger */
37      protected final static Log log = LogFactory.getLog( ElementEventQueue.class );
38  
39      /** The cache (region) name. */
40      protected final String cacheName;
41  
42      /** default */
43      private static final int DEFAULT_WAIT_TO_DIE_MILLIS = 10000;
44  
45      /**
46       * time to wait for an event before snuffing the background thread if the queue is empty. make
47       * configurable later
48       */
49      private int waitToDieMillis = DEFAULT_WAIT_TO_DIE_MILLIS;
50  
51      /** shutdown or not */
52      private boolean destroyed = false;
53  
54      /** The worker thread. */
55      private Thread processorThread;
56  
57      /** Internal queue implementation */
58      protected final Object queueLock = new Object();
59  
60      /** Dummy node */
61      private Node head = new Node();
62  
63      /** tail of the doubly linked list */
64      private Node tail = head;
65  
66      /** Number of items in the queue */
67      private int size = 0;
68  
69      /**
70       * Constructor for the ElementEventQueue object
71       * <p>
72       * @param cacheName
73       */
74      public ElementEventQueue( String cacheName )
75      {
76          this.cacheName = cacheName;
77  
78          processorThread = new QProcessor( this );
79          processorThread.start();
80  
81          if ( log.isDebugEnabled() )
82          {
83              log.debug( "Constructed: " + this );
84          }
85      }
86  
87      /**
88       * Event Q is empty.
89       */
90      public void destroy()
91      {
92          synchronized ( queueLock )
93          {
94              if ( !destroyed )
95              {
96                  destroyed = true;
97  
98                  // synchronize on queue so the thread will not wait forever,
99                  // and then interrupt the QueueProcessor
100                 processorThread.interrupt();
101                 processorThread = null;
102 
103                 if ( log.isInfoEnabled() )
104                 {
105                     log.info( "Element event queue destroyed: " + this );
106                 }
107             }
108         }
109     }
110 
111     /**
112      * Kill the processor thread and indicate that the queue is destroyed and no longer alive, but it
113      * can still be working.
114      */
115     public void stopProcessing()
116     {
117         synchronized ( queueLock )
118         {
119             destroyed = true;
120             processorThread = null;
121         }
122     }
123 
124     /**
125      * Returns the time to wait for events before killing the background thread.
126      * <p>
127      * @return int
128      */
129     public int getWaitToDieMillis()
130     {
131         return waitToDieMillis;
132     }
133 
134     /**
135      * Sets the time to wait for events before killing the background thread.
136      * <p>
137      * @param wtdm the ms for the q to sit idle.
138      */
139     public void setWaitToDieMillis( int wtdm )
140     {
141         waitToDieMillis = wtdm;
142     }
143 
144     /**
145      * @return the region name for the event queue
146      */
147     @Override
148     public String toString()
149     {
150         return "cacheName=" + cacheName;
151     }
152 
153     /**
154      * Returns the number of elements in the queue.
155      * <p>
156      * @return number of items in the queue.
157      */
158     public int size()
159     {
160         return size;
161     }
162 
163     /**
164      * @return The destroyed value
165      */
166     public boolean isAlive()
167     {
168         return ( !destroyed );
169     }
170 
171     /**
172      * Adds an ElementEvent to be handled
173      * @param hand The IElementEventHandler
174      * @param event The IElementEventHandler IElementEvent event
175      * @throws IOException
176      */
177     public void addElementEvent( IElementEventHandler hand, IElementEvent event )
178         throws IOException
179     {
180 
181         if ( log.isDebugEnabled() )
182         {
183             log.debug( "Adding Event Handler to QUEUE, !destroyed = " + !destroyed );
184         }
185 
186         if ( !destroyed )
187         {
188             ElementEventRunner runner = new ElementEventRunner( hand, event );
189 
190             if ( log.isDebugEnabled() )
191             {
192                 log.debug( "runner = " + runner );
193             }
194 
195             put( runner );
196         }
197     }
198 
199     /**
200      * Adds an event to the queue.
201      * @param event
202      */
203     private void put( AbstractElementEventRunner event )
204     {
205         Node newNode = new Node();
206 
207         newNode.event = event;
208 
209         synchronized ( queueLock )
210         {
211             size++;
212             tail.next = newNode;
213             tail = newNode;
214             if ( !isAlive() )
215             {
216                 destroyed = false;
217                 processorThread = new QProcessor( this );
218                 processorThread.start();
219             }
220             else
221             {
222                 queueLock.notify();
223             }
224             queueLock.notify();
225         }
226     }
227 
228     /**
229      * Returns the next item on the queue, or waits if empty.
230      * <p>
231      * @return AbstractElementEventRunner
232      */
233     protected AbstractElementEventRunner take()
234     {
235         synchronized ( queueLock )
236         {
237             // wait until there is something to read
238             if ( head == tail )
239             {
240                 return null;
241             }
242 
243             Node node = head.next;
244 
245             AbstractElementEventRunner value = node.event;
246 
247             if ( log.isDebugEnabled() )
248             {
249                 log.debug( "head.event = " + head.event );
250                 log.debug( "node.event = " + node.event );
251             }
252 
253             // Node becomes the new head (head is always empty)
254 
255             node.event = null;
256             head = node;
257 
258             size--;
259             return value;
260         }
261     }
262 
263     // /////////////////////////// Inner classes /////////////////////////////
264 
265     /** A node in the queue. These are chained forming a singly linked list */
266     protected static class Node
267     {
268         /** The next node. */
269         Node next = null;
270 
271         /** The event to run */
272         ElementEventQueue.AbstractElementEventRunner event = null;
273     }
274 
275     /**
276      */
277     private class QProcessor
278         extends Thread
279     {
280         /** The event queue */
281         ElementEventQueue queue;
282 
283         /**
284          * Constructor for the QProcessor object
285          * <p>
286          * @param aQueue
287          */
288         QProcessor( ElementEventQueue aQueue )
289         {
290             super( "ElementEventQueue.QProcessor-" + aQueue.cacheName );
291 
292             setDaemon( true );
293             queue = aQueue;
294         }
295 
296         /**
297          * Main processing method for the QProcessor object.
298          * <p>
299          * Waits for a specified time (waitToDieMillis) for something to come in and if no new
300          * events come in during that period the run method can exit and the thread is dereferenced.
301          */
302         @Override
303         public void run()
304         {
305             AbstractElementEventRunner event = null;
306 
307             while ( queue.isAlive() )
308             {
309                 event = queue.take();
310 
311                 if ( log.isDebugEnabled() )
312                 {
313                     log.debug( "Event from queue = " + event );
314                 }
315 
316                 if ( event == null )
317                 {
318                     synchronized ( queueLock )
319                     {
320                         try
321                         {
322                             queueLock.wait( queue.getWaitToDieMillis() );
323                         }
324                         catch ( InterruptedException e )
325                         {
326                             log.warn( "Interrupted while waiting for another event to come in before we die." );
327                             return;
328                         }
329                         event = queue.take();
330                         if ( log.isDebugEnabled() )
331                         {
332                             log.debug( "Event from queue after sleep = " + event );
333                         }
334                     }
335                     if ( event == null )
336                     {
337                         queue.stopProcessing();
338                     }
339                 }
340 
341                 if ( queue.isAlive() && event != null )
342                 {
343                     event.run();
344                 }
345             }
346             if ( log.isDebugEnabled() )
347             {
348                 log.debug( "QProcessor exiting for " + queue );
349             }
350         }
351     }
352 
353     /**
354      * Retries before declaring failure.
355      */
356     protected abstract class AbstractElementEventRunner
357         implements Runnable
358     {
359         /**
360          * Main processing method for the AbstractElementEvent object
361          */
362         public void run()
363         {
364             try
365             {
366                 doRun();
367                 // happy and done.
368             }
369             catch ( IOException e )
370             {
371                 // Too bad. The handler has problems.
372                 log.warn( "Giving up element event handling " + ElementEventQueue.this, e );
373             }
374         }
375 
376         /**
377          * This will do the work or trigger the work to be done.
378          * <p>
379          * @exception IOException
380          */
381         protected abstract void doRun()
382             throws IOException;
383     }
384 
385     /**
386      * ElementEventRunner.
387      */
388     private class ElementEventRunner
389         extends AbstractElementEventRunner
390     {
391         /** the handler */
392         private final IElementEventHandler hand;
393 
394         /** event */
395         private final IElementEvent event;
396 
397         /**
398          * Constructor for the PutEvent object.
399          * <p>
400          * @param hand
401          * @param event
402          * @exception IOException
403          */
404         ElementEventRunner( IElementEventHandler hand, IElementEvent event )
405             throws IOException
406         {
407             if ( log.isDebugEnabled() )
408             {
409                 log.debug( "Constructing " + this );
410             }
411             this.hand = hand;
412             this.event = event;
413         }
414 
415         /**
416          * Tells the handler to handle the event.
417          * <p>
418          * @exception IOException
419          */
420         @Override
421         protected void doRun()
422             throws IOException
423         {
424             hand.handleElementEvent( event );
425         }
426     }
427 }