View Javadoc

1   package org.apache.jcs.engine;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.Serializable;
23  import java.util.ArrayList;
24  
25  import org.apache.jcs.engine.behavior.ICacheListener;
26  import org.apache.jcs.engine.stats.StatElement;
27  import org.apache.jcs.engine.stats.Stats;
28  import org.apache.jcs.engine.stats.behavior.IStatElement;
29  import org.apache.jcs.engine.stats.behavior.IStats;
30  
31  /**
32   * An event queue is used to propagate ordered cache events to one and only one target listener.
33   * <p>
34   * This is a modified version of the experimental version. It should lazy initialize the processor
35   * thread, and kill the thread if the queue goes empty for a specified period, now set to 1 minute.
36   * If something comes in after that a new processor thread should be created.
37   */
38  public class CacheEventQueue<K extends Serializable, V extends Serializable>
39      extends AbstractCacheEventQueue<K, V>
40  {
41      /** The type of queue -- there are pooled and single */
42      private static final String queueType = SINGLE_QUEUE_TYPE;
43  
44      /** the thread that works the queue. */
45      private Thread processorThread;
46  
47      /** sync */
48      protected final Object queueLock = new Object();
49  
50      /** the head of the queue */
51      private Node head = new Node();
52  
53      /** the end of the queue */
54      private Node tail = head;
55  
56      /** Number of items in the queue */
57      private int size = 0;
58  
59      /**
60       * Constructs with the specified listener and the cache name.
61       * <p>
62       * @param listener
63       * @param listenerId
64       * @param cacheName
65       */
66      public CacheEventQueue( ICacheListener<K, V> listener, long listenerId, String cacheName )
67      {
68          this( listener, listenerId, cacheName, 10, 500 );
69      }
70  
71      /**
72       * Constructor for the CacheEventQueue object
73       * <p>
74       * @param listener
75       * @param listenerId
76       * @param cacheName
77       * @param maxFailure
78       * @param waitBeforeRetry
79       */
80      public CacheEventQueue( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
81                              int waitBeforeRetry )
82      {
83          initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry, null );
84      }
85  
86      /**
87       * Initializes the queue.
88       * <p>
89       * @param listener
90       * @param listenerId
91       * @param cacheName
92       * @param maxFailure
93       * @param waitBeforeRetry
94       * @param threadPoolName
95       */
96      public void initialize( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
97                              int waitBeforeRetry, String threadPoolName )
98      {
99          if ( listener == null )
100         {
101             throw new IllegalArgumentException( "listener must not be null" );
102         }
103 
104         this.listener = listener;
105         this.listenerId = listenerId;
106         this.cacheName = cacheName;
107         this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
108         this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
109 
110         if ( log.isDebugEnabled() )
111         {
112             log.debug( "Constructed: " + this );
113         }
114     }
115 
116     /**
117      * What type of queue is this.
118      * <p>
119      * @return queueType
120      */
121     public String getQueueType()
122     {
123         return queueType;
124     }
125 
126     /**
127      * Kill the processor thread and indicate that the queue is destroyed and no longer alive, but it
128      * can still be working.
129      */
130     public void stopProcessing()
131     {
132         synchronized (queueLock)
133         {
134             destroyed = true;
135             processorThread = null;
136         }
137     }
138 
139     /**
140      * Event Q is empty.
141      * <p>
142      * Calling destroy interrupts the processor thread.
143      */
144     public void destroy()
145     {
146         synchronized (queueLock)
147         {
148             if ( !destroyed )
149             {
150                 destroyed = true;
151 
152                 if ( log.isInfoEnabled() )
153                 {
154                     log.info( "Destroying queue, stats =  " + getStatistics() );
155                 }
156 
157                 // Synchronize on queue so the thread will not wait forever,
158                 // and then interrupt the QueueProcessor
159 
160                 if ( processorThread != null )
161                 {
162                     processorThread.interrupt();
163                     processorThread = null;
164                 }
165 
166                 if ( log.isInfoEnabled() )
167                 {
168                     log.info( "Cache event queue destroyed: " + this );
169                 }
170             }
171             else
172             {
173                 if ( log.isInfoEnabled() )
174                 {
175                     log.info( "Destroy was called after queue was destroyed.  Doing nothing.  Stats =  " + getStatistics() );
176                 }
177             }
178         }
179     }
180 
181     /**
182      * Adds an event to the queue.
183      * <p>
184      * @param event
185      */
186     @Override
187     protected void put( AbstractCacheEvent event )
188     {
189         Node newNode = new Node();
190         if ( log.isDebugEnabled() )
191         {
192             log.debug( "Event entering Queue for " + cacheName + ": " + event );
193         }
194 
195         newNode.event = event;
196 
197         synchronized ( queueLock )
198         {
199             size++;
200             tail.next = newNode;
201             tail = newNode;
202             if ( isWorking() )
203             {
204                 if ( !isAlive() )
205                 {
206                     destroyed = false;
207                     processorThread = new QProcessor( this );
208                     processorThread.start();
209                     if ( log.isInfoEnabled() )
210                     {
211                         log.info( "Cache event queue created: " + this );
212                     }
213                 }
214                 else
215                 {
216                     queueLock.notify();
217                 }
218             }
219         }
220     }
221 
222     // /////////////////////////// Inner classes /////////////////////////////
223 
224     /**
225      * This is the thread that works the queue.
226      * <p>
227      * @author asmuts
228      * @created January 15, 2002
229      */
230     private class QProcessor
231         extends Thread
232     {
233         /** The queue to work */
234         CacheEventQueue<K, V> queue;
235 
236         /**
237          * Constructor for the QProcessor object
238          * <p>
239          * @param aQueue the event queue to take items from.
240          */
241         QProcessor( CacheEventQueue<K, V> aQueue )
242         {
243             super( "CacheEventQueue.QProcessor-" + aQueue.cacheName );
244 
245             setDaemon( true );
246             queue = aQueue;
247         }
248 
249         /**
250          * Main processing method for the QProcessor object.
251          * <p>
252          * Waits for a specified time (waitToDieMillis) for something to come in and if no new
253          * events come in during that period the run method can exit and the thread is dereferenced.
254          */
255         @Override
256         public void run()
257         {
258             AbstractCacheEvent event = null;
259 
260             while ( queue.isAlive() )
261             {
262                 event = queue.take();
263 
264                 if ( log.isDebugEnabled() )
265                 {
266                     log.debug( "Event from queue = " + event );
267                 }
268 
269                 if ( event == null )
270                 {
271                     synchronized ( queueLock )
272                     {
273                         try
274                         {
275                             queueLock.wait( queue.getWaitToDieMillis() );
276                         }
277                         catch ( InterruptedException e )
278                         {
279                             log.warn( "Interrupted while waiting for another event to come in before we die." );
280                             return;
281                         }
282                         event = queue.take();
283                         if ( log.isDebugEnabled() )
284                         {
285                             log.debug( "Event from queue after sleep = " + event );
286                         }
287                     }
288                     if ( event == null )
289                     {
290                         queue.stopProcessing();
291                     }
292                 }
293 
294                 if ( queue.isWorking() && queue.isAlive() && event != null )
295                 {
296                     event.run();
297                 }
298             }
299             if ( log.isDebugEnabled() )
300             {
301                 log.debug( "QProcessor exiting for " + queue );
302             }
303         }
304     }
305 
306     /**
307      * Returns the next cache event from the queue or null if there are no events in the queue.
308      * <p>
309      * We have an empty node at the head and the tail. When we take an item from the queue we move
310      * the next node to the head and then clear the value from that node. This value is returned.
311      * <p>
312      * When the queue is empty the head node is the same as the tail node.
313      * <p>
314      * @return An event to process.
315      */
316     protected AbstractCacheEvent take()
317     {
318         synchronized ( queueLock )
319         {
320             // wait until there is something to read
321             if ( head == tail )
322             {
323                 return null;
324             }
325 
326             Node node = head.next;
327 
328             @SuppressWarnings("unchecked") // No generics for public fields
329             AbstractCacheEvent value = (AbstractCacheEvent) node.event;
330 
331             if ( log.isDebugEnabled() )
332             {
333                 log.debug( "head.event = " + head.event );
334                 log.debug( "node.event = " + node.event );
335             }
336 
337             // Node becomes the new head (head is always empty)
338 
339             node.event = null;
340             head = node;
341 
342             size--;
343             return value;
344         }
345     }
346 
347     /**
348      * This method returns semi-structured data on this queue.
349      * <p>
350      * @see org.apache.jcs.engine.behavior.ICacheEventQueue#getStatistics()
351      * @return information on the status and history of the queue
352      */
353     public IStats getStatistics()
354     {
355         IStats stats = new Stats();
356         stats.setTypeName( "Cache Event Queue" );
357 
358         ArrayList<IStatElement> elems = new ArrayList<IStatElement>();
359 
360         IStatElement se = null;
361 
362         se = new StatElement();
363         se.setName( "Working" );
364         se.setData( "" + this.working );
365         elems.add( se );
366 
367         se = new StatElement();
368         se.setName( "Alive" );
369         se.setData( "" + this.isAlive() );
370         elems.add( se );
371 
372         se = new StatElement();
373         se.setName( "Empty" );
374         se.setData( "" + this.isEmpty() );
375         elems.add( se );
376 
377         int size = 0;
378         synchronized ( queueLock )
379         {
380             // wait until there is something to read
381             if ( head == tail )
382             {
383                 size = 0;
384             }
385             else
386             {
387                 Node n = head;
388                 while ( n != null )
389                 {
390                     n = n.next;
391                     size++;
392                 }
393             }
394 
395             se = new StatElement();
396             se.setName( "Size" );
397             se.setData( "" + size );
398             elems.add( se );
399         }
400 
401         // get an array and put them in the Stats object
402         IStatElement[] ses = elems.toArray( new StatElement[0] );
403         stats.setStatElements( ses );
404 
405         return stats;
406     }
407 
408     /**
409      * @return whether there are any items in the queue.
410      */
411     public boolean isEmpty()
412     {
413         return tail == head;
414     }
415 
416     /**
417      * Returns the number of elements in the queue.
418      * <p>
419      * @return number of items in the queue.
420      */
421     public int size()
422     {
423         return size;
424     }
425 }