View Javadoc
1   package org.apache.commons.jcs.engine;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.concurrent.LinkedBlockingQueue;
24  import java.util.concurrent.TimeUnit;
25  
26  import org.apache.commons.jcs.engine.behavior.ICacheListener;
27  import org.apache.commons.jcs.engine.stats.StatElement;
28  import org.apache.commons.jcs.engine.stats.Stats;
29  import org.apache.commons.jcs.engine.stats.behavior.IStatElement;
30  import org.apache.commons.jcs.engine.stats.behavior.IStats;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  
34  /**
35   * An event queue is used to propagate ordered cache events to one and only one target listener.
36   * <p>
37   * This is a modified version of the experimental version. It should lazy initialize the processor
38   * thread, and kill the thread if the queue goes empty for a specified period, now set to 1 minute.
39   * If something comes in after that a new processor thread should be created.
40   */
41  public class CacheEventQueue<K, V>
42      extends AbstractCacheEventQueue<K, V>
43  {
44      /** The logger. */
45      private static final Log log = LogFactory.getLog( CacheEventQueue.class );
46  
47      /** The type of queue -- there are pooled and single */
48      private static final QueueType queueType = QueueType.SINGLE;
49  
50      /** the thread that works the queue. */
51      private Thread processorThread;
52  
53      /** Queue implementation */
54      private LinkedBlockingQueue<AbstractCacheEvent> queue = new LinkedBlockingQueue<AbstractCacheEvent>();
55  
56      /**
57       * Constructs with the specified listener and the cache name.
58       * <p>
59       * @param listener
60       * @param listenerId
61       * @param cacheName
62       */
63      public CacheEventQueue( ICacheListener<K, V> listener, long listenerId, String cacheName )
64      {
65          this( listener, listenerId, cacheName, 10, 500 );
66      }
67  
68      /**
69       * Constructor for the CacheEventQueue object
70       * <p>
71       * @param listener
72       * @param listenerId
73       * @param cacheName
74       * @param maxFailure
75       * @param waitBeforeRetry
76       */
77      public CacheEventQueue( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
78                              int waitBeforeRetry )
79      {
80          initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry );
81      }
82  
83      /**
84       * What type of queue is this.
85       * <p>
86       * @return queueType
87       */
88      @Override
89      public QueueType getQueueType()
90      {
91          return queueType;
92      }
93  
94      /**
95       * Kill the processor thread and indicate that the queue is destroyed and no longer alive, but it
96       * can still be working.
97       */
98      protected void stopProcessing()
99      {
100         setAlive(false);
101         processorThread = null;
102     }
103 
104     /**
105      * Event Q is empty.
106      * <p>
107      * Calling destroy interrupts the processor thread.
108      */
109     @Override
110     public void destroy()
111     {
112         if ( isAlive() )
113         {
114             setAlive(false);
115 
116             if ( log.isInfoEnabled() )
117             {
118                 log.info( "Destroying queue, stats =  " + getStatistics() );
119             }
120 
121             if ( processorThread != null )
122             {
123                 processorThread.interrupt();
124                 processorThread = null;
125             }
126 
127             if ( log.isInfoEnabled() )
128             {
129                 log.info( "Cache event queue destroyed: " + this );
130             }
131         }
132         else
133         {
134             if ( log.isInfoEnabled() )
135             {
136                 log.info( "Destroy was called after queue was destroyed. Doing nothing. Stats =  " + getStatistics() );
137             }
138         }
139     }
140 
141     /**
142      * Adds an event to the queue.
143      * <p>
144      * @param event
145      */
146     @Override
147     protected void put( AbstractCacheEvent event )
148     {
149         if ( log.isDebugEnabled() )
150         {
151             log.debug( "Event entering Queue for " + getCacheName() + ": " + event );
152         }
153 
154         queue.offer(event);
155 
156         if ( isWorking() )
157         {
158             if ( !isAlive() )
159             {
160                 setAlive(true);
161                 processorThread = new QProcessor();
162                 processorThread.start();
163                 if ( log.isInfoEnabled() )
164                 {
165                     log.info( "Cache event queue created: " + this );
166                 }
167             }
168         }
169     }
170 
171     // /////////////////////////// Inner classes /////////////////////////////
172 
173     /**
174      * This is the thread that works the queue.
175      * <p>
176      * @author asmuts
177      */
178     protected class QProcessor
179         extends Thread
180     {
181         /**
182          * Constructor for the QProcessor object
183          * <p>
184          * @param aQueue the event queue to take items from.
185          */
186         QProcessor()
187         {
188             super( "CacheEventQueue.QProcessor-" + getCacheName() );
189             setDaemon( true );
190         }
191 
192         /**
193          * Main processing method for the QProcessor object.
194          * <p>
195          * Waits for a specified time (waitToDieMillis) for something to come in and if no new
196          * events come in during that period the run method can exit and the thread is dereferenced.
197          */
198         @Override
199         public void run()
200         {
201 
202             while ( CacheEventQueue.this.isAlive() )
203             {
204                 AbstractCacheEvent event = null;
205 
206                 try
207                 {
208                     event = queue.poll(getWaitToDieMillis(), TimeUnit.MILLISECONDS);
209                 }
210                 catch (InterruptedException e)
211                 {
212                     // is ok
213                 }
214 
215                 if ( log.isDebugEnabled() )
216                 {
217                     log.debug( "Event from queue = " + event );
218                 }
219 
220                 if ( event == null )
221                 {
222                     stopProcessing();
223                 }
224 
225                 if ( event != null && isWorking() && CacheEventQueue.this.isAlive() )
226                 {
227                     event.run();
228                 }
229             }
230             if ( log.isDebugEnabled() )
231             {
232                 log.debug( "QProcessor exiting for " + getCacheName() );
233             }
234         }
235     }
236 
237     /**
238      * This method returns semi-structured data on this queue.
239      * <p>
240      * @see org.apache.commons.jcs.engine.behavior.ICacheEventQueue#getStatistics()
241      * @return information on the status and history of the queue
242      */
243     @Override
244     public IStats getStatistics()
245     {
246         IStats stats = new Stats();
247         stats.setTypeName( "Cache Event Queue" );
248 
249         ArrayList<IStatElement<?>> elems = new ArrayList<IStatElement<?>>();
250 
251         elems.add(new StatElement<Boolean>( "Working", Boolean.valueOf(this.isWorking()) ) );
252         elems.add(new StatElement<Boolean>( "Alive", Boolean.valueOf(this.isAlive()) ) );
253         elems.add(new StatElement<Boolean>( "Empty", Boolean.valueOf(this.isEmpty()) ) );
254         elems.add(new StatElement<Integer>( "Size", Integer.valueOf(this.size()) ) );
255 
256         stats.setStatElements( elems );
257 
258         return stats;
259     }
260 
261     /**
262      * @return whether there are any items in the queue.
263      */
264     @Override
265     public boolean isEmpty()
266     {
267         return queue.isEmpty();
268     }
269 
270     /**
271      * Returns the number of elements in the queue.
272      * <p>
273      * @return number of items in the queue.
274      */
275     @Override
276     public int size()
277     {
278         return queue.size();
279     }
280 }