1 package org.apache.jcs.engine;
2
3 /*
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
19 * under the License.
20 */
21
22 import java.io.Serializable;
23 import java.util.ArrayList;
24
25 import org.apache.jcs.engine.behavior.ICacheListener;
26 import org.apache.jcs.engine.stats.StatElement;
27 import org.apache.jcs.engine.stats.Stats;
28 import org.apache.jcs.engine.stats.behavior.IStatElement;
29 import org.apache.jcs.engine.stats.behavior.IStats;
30
31 /**
32 * An event queue is used to propagate ordered cache events to one and only one target listener.
33 * <p>
34 * This is a modified version of the experimental version. It should lazy initialize the processor
35 * thread, and kill the thread if the queue goes empty for a specified period, now set to 1 minute.
36 * If something comes in after that a new processor thread should be created.
37 */
38 public class CacheEventQueue<K extends Serializable, V extends Serializable>
39 extends AbstractCacheEventQueue<K, V>
40 {
41 /** The type of queue -- there are pooled and single */
42 private static final String queueType = SINGLE_QUEUE_TYPE;
43
44 /** the thread that works the queue. */
45 private Thread processorThread;
46
47 /** sync */
48 protected final Object queueLock = new Object();
49
50 /** the head of the queue */
51 private Node head = new Node();
52
53 /** the end of the queue */
54 private Node tail = head;
55
56 /** Number of items in the queue */
57 private int size = 0;
58
59 /**
60 * Constructs with the specified listener and the cache name.
61 * <p>
62 * @param listener
63 * @param listenerId
64 * @param cacheName
65 */
66 public CacheEventQueue( ICacheListener<K, V> listener, long listenerId, String cacheName )
67 {
68 this( listener, listenerId, cacheName, 10, 500 );
69 }
70
71 /**
72 * Constructor for the CacheEventQueue object
73 * <p>
74 * @param listener
75 * @param listenerId
76 * @param cacheName
77 * @param maxFailure
78 * @param waitBeforeRetry
79 */
80 public CacheEventQueue( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
81 int waitBeforeRetry )
82 {
83 initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry, null );
84 }
85
86 /**
87 * Initializes the queue.
88 * <p>
89 * @param listener
90 * @param listenerId
91 * @param cacheName
92 * @param maxFailure
93 * @param waitBeforeRetry
94 * @param threadPoolName
95 */
96 public void initialize( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
97 int waitBeforeRetry, String threadPoolName )
98 {
99 if ( listener == null )
100 {
101 throw new IllegalArgumentException( "listener must not be null" );
102 }
103
104 this.listener = listener;
105 this.listenerId = listenerId;
106 this.cacheName = cacheName;
107 this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
108 this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
109
110 if ( log.isDebugEnabled() )
111 {
112 log.debug( "Constructed: " + this );
113 }
114 }
115
116 /**
117 * What type of queue is this.
118 * <p>
119 * @return queueType
120 */
121 public String getQueueType()
122 {
123 return queueType;
124 }
125
126 /**
127 * Kill the processor thread and indicate that the queue is destroyed and no longer alive, but it
128 * can still be working.
129 */
130 public void stopProcessing()
131 {
132 synchronized (queueLock)
133 {
134 destroyed = true;
135 processorThread = null;
136 }
137 }
138
139 /**
140 * Event Q is empty.
141 * <p>
142 * Calling destroy interrupts the processor thread.
143 */
144 public void destroy()
145 {
146 synchronized (queueLock)
147 {
148 if ( !destroyed )
149 {
150 destroyed = true;
151
152 if ( log.isInfoEnabled() )
153 {
154 log.info( "Destroying queue, stats = " + getStatistics() );
155 }
156
157 // Synchronize on queue so the thread will not wait forever,
158 // and then interrupt the QueueProcessor
159
160 if ( processorThread != null )
161 {
162 processorThread.interrupt();
163 processorThread = null;
164 }
165
166 if ( log.isInfoEnabled() )
167 {
168 log.info( "Cache event queue destroyed: " + this );
169 }
170 }
171 else
172 {
173 if ( log.isInfoEnabled() )
174 {
175 log.info( "Destroy was called after queue was destroyed. Doing nothing. Stats = " + getStatistics() );
176 }
177 }
178 }
179 }
180
181 /**
182 * Adds an event to the queue.
183 * <p>
184 * @param event
185 */
186 @Override
187 protected void put( AbstractCacheEvent event )
188 {
189 Node newNode = new Node();
190 if ( log.isDebugEnabled() )
191 {
192 log.debug( "Event entering Queue for " + cacheName + ": " + event );
193 }
194
195 newNode.event = event;
196
197 synchronized ( queueLock )
198 {
199 size++;
200 tail.next = newNode;
201 tail = newNode;
202 if ( isWorking() )
203 {
204 if ( !isAlive() )
205 {
206 destroyed = false;
207 processorThread = new QProcessor( this );
208 processorThread.start();
209 if ( log.isInfoEnabled() )
210 {
211 log.info( "Cache event queue created: " + this );
212 }
213 }
214 else
215 {
216 queueLock.notify();
217 }
218 }
219 }
220 }
221
222 // /////////////////////////// Inner classes /////////////////////////////
223
224 /**
225 * This is the thread that works the queue.
226 * <p>
227 * @author asmuts
228 * @created January 15, 2002
229 */
230 private class QProcessor
231 extends Thread
232 {
233 /** The queue to work */
234 CacheEventQueue<K, V> queue;
235
236 /**
237 * Constructor for the QProcessor object
238 * <p>
239 * @param aQueue the event queue to take items from.
240 */
241 QProcessor( CacheEventQueue<K, V> aQueue )
242 {
243 super( "CacheEventQueue.QProcessor-" + aQueue.cacheName );
244
245 setDaemon( true );
246 queue = aQueue;
247 }
248
249 /**
250 * Main processing method for the QProcessor object.
251 * <p>
252 * Waits for a specified time (waitToDieMillis) for something to come in and if no new
253 * events come in during that period the run method can exit and the thread is dereferenced.
254 */
255 @Override
256 public void run()
257 {
258 AbstractCacheEvent event = null;
259
260 while ( queue.isAlive() )
261 {
262 event = queue.take();
263
264 if ( log.isDebugEnabled() )
265 {
266 log.debug( "Event from queue = " + event );
267 }
268
269 if ( event == null )
270 {
271 synchronized ( queueLock )
272 {
273 try
274 {
275 queueLock.wait( queue.getWaitToDieMillis() );
276 }
277 catch ( InterruptedException e )
278 {
279 log.warn( "Interrupted while waiting for another event to come in before we die." );
280 return;
281 }
282 event = queue.take();
283 if ( log.isDebugEnabled() )
284 {
285 log.debug( "Event from queue after sleep = " + event );
286 }
287 }
288 if ( event == null )
289 {
290 queue.stopProcessing();
291 }
292 }
293
294 if ( queue.isWorking() && queue.isAlive() && event != null )
295 {
296 event.run();
297 }
298 }
299 if ( log.isDebugEnabled() )
300 {
301 log.debug( "QProcessor exiting for " + queue );
302 }
303 }
304 }
305
306 /**
307 * Returns the next cache event from the queue or null if there are no events in the queue.
308 * <p>
309 * We have an empty node at the head and the tail. When we take an item from the queue we move
310 * the next node to the head and then clear the value from that node. This value is returned.
311 * <p>
312 * When the queue is empty the head node is the same as the tail node.
313 * <p>
314 * @return An event to process.
315 */
316 protected AbstractCacheEvent take()
317 {
318 synchronized ( queueLock )
319 {
320 // wait until there is something to read
321 if ( head == tail )
322 {
323 return null;
324 }
325
326 Node node = head.next;
327
328 @SuppressWarnings("unchecked") // No generics for public fields
329 AbstractCacheEvent value = (AbstractCacheEvent) node.event;
330
331 if ( log.isDebugEnabled() )
332 {
333 log.debug( "head.event = " + head.event );
334 log.debug( "node.event = " + node.event );
335 }
336
337 // Node becomes the new head (head is always empty)
338
339 node.event = null;
340 head = node;
341
342 size--;
343 return value;
344 }
345 }
346
347 /**
348 * This method returns semi-structured data on this queue.
349 * <p>
350 * @see org.apache.jcs.engine.behavior.ICacheEventQueue#getStatistics()
351 * @return information on the status and history of the queue
352 */
353 public IStats getStatistics()
354 {
355 IStats stats = new Stats();
356 stats.setTypeName( "Cache Event Queue" );
357
358 ArrayList<IStatElement> elems = new ArrayList<IStatElement>();
359
360 IStatElement se = null;
361
362 se = new StatElement();
363 se.setName( "Working" );
364 se.setData( "" + this.working );
365 elems.add( se );
366
367 se = new StatElement();
368 se.setName( "Alive" );
369 se.setData( "" + this.isAlive() );
370 elems.add( se );
371
372 se = new StatElement();
373 se.setName( "Empty" );
374 se.setData( "" + this.isEmpty() );
375 elems.add( se );
376
377 int size = 0;
378 synchronized ( queueLock )
379 {
380 // wait until there is something to read
381 if ( head == tail )
382 {
383 size = 0;
384 }
385 else
386 {
387 Node n = head;
388 while ( n != null )
389 {
390 n = n.next;
391 size++;
392 }
393 }
394
395 se = new StatElement();
396 se.setName( "Size" );
397 se.setData( "" + size );
398 elems.add( se );
399 }
400
401 // get an array and put them in the Stats object
402 IStatElement[] ses = elems.toArray( new StatElement[0] );
403 stats.setStatElements( ses );
404
405 return stats;
406 }
407
408 /**
409 * @return whether there are any items in the queue.
410 */
411 public boolean isEmpty()
412 {
413 return tail == head;
414 }
415
416 /**
417 * Returns the number of elements in the queue.
418 * <p>
419 * @return number of items in the queue.
420 */
421 public int size()
422 {
423 return size;
424 }
425 }