1 package org.apache.jcs.engine.control.event;
2
3 /*
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
19 * under the License.
20 */
21
22 import java.io.IOException;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.jcs.engine.control.event.behavior.IElementEvent;
27 import org.apache.jcs.engine.control.event.behavior.IElementEventHandler;
28 import org.apache.jcs.engine.control.event.behavior.IElementEventQueue;
29
30 /**
31 * An event queue is used to propagate ordered cache events to one and only one target listener.
32 */
33 public class ElementEventQueue
34 implements IElementEventQueue
35 {
36 /** The logger */
37 protected final static Log log = LogFactory.getLog( ElementEventQueue.class );
38
39 /** The cache (region) name. */
40 protected final String cacheName;
41
42 /** default */
43 private static final int DEFAULT_WAIT_TO_DIE_MILLIS = 10000;
44
45 /**
46 * time to wait for an event before snuffing the background thread if the queue is empty. make
47 * configurable later
48 */
49 private int waitToDieMillis = DEFAULT_WAIT_TO_DIE_MILLIS;
50
51 /** shutdown or not */
52 private boolean destroyed = false;
53
54 /** The worker thread. */
55 private Thread processorThread;
56
57 /** Internal queue implementation */
58 protected final Object queueLock = new Object();
59
60 /** Dummy node */
61 private Node head = new Node();
62
63 /** tail of the doubly linked list */
64 private Node tail = head;
65
66 /** Number of items in the queue */
67 private int size = 0;
68
69 /**
70 * Constructor for the ElementEventQueue object
71 * <p>
72 * @param cacheName
73 */
74 public ElementEventQueue( String cacheName )
75 {
76 this.cacheName = cacheName;
77
78 processorThread = new QProcessor( this );
79 processorThread.start();
80
81 if ( log.isDebugEnabled() )
82 {
83 log.debug( "Constructed: " + this );
84 }
85 }
86
87 /**
88 * Event Q is empty.
89 */
90 public void destroy()
91 {
92 synchronized ( queueLock )
93 {
94 if ( !destroyed )
95 {
96 destroyed = true;
97
98 // synchronize on queue so the thread will not wait forever,
99 // and then interrupt the QueueProcessor
100 processorThread.interrupt();
101 processorThread = null;
102
103 if ( log.isInfoEnabled() )
104 {
105 log.info( "Element event queue destroyed: " + this );
106 }
107 }
108 }
109 }
110
111 /**
112 * Kill the processor thread and indicate that the queue is destroyed and no longer alive, but it
113 * can still be working.
114 */
115 public void stopProcessing()
116 {
117 synchronized ( queueLock )
118 {
119 destroyed = true;
120 processorThread = null;
121 }
122 }
123
124 /**
125 * Returns the time to wait for events before killing the background thread.
126 * <p>
127 * @return int
128 */
129 public int getWaitToDieMillis()
130 {
131 return waitToDieMillis;
132 }
133
134 /**
135 * Sets the time to wait for events before killing the background thread.
136 * <p>
137 * @param wtdm the ms for the q to sit idle.
138 */
139 public void setWaitToDieMillis( int wtdm )
140 {
141 waitToDieMillis = wtdm;
142 }
143
144 /**
145 * @return the region name for the event queue
146 */
147 @Override
148 public String toString()
149 {
150 return "cacheName=" + cacheName;
151 }
152
153 /**
154 * Returns the number of elements in the queue.
155 * <p>
156 * @return number of items in the queue.
157 */
158 public int size()
159 {
160 return size;
161 }
162
163 /**
164 * @return The destroyed value
165 */
166 public boolean isAlive()
167 {
168 return ( !destroyed );
169 }
170
171 /**
172 * Adds an ElementEvent to be handled
173 * @param hand The IElementEventHandler
174 * @param event The IElementEventHandler IElementEvent event
175 * @throws IOException
176 */
177 public void addElementEvent( IElementEventHandler hand, IElementEvent event )
178 throws IOException
179 {
180
181 if ( log.isDebugEnabled() )
182 {
183 log.debug( "Adding Event Handler to QUEUE, !destroyed = " + !destroyed );
184 }
185
186 if ( !destroyed )
187 {
188 ElementEventRunner runner = new ElementEventRunner( hand, event );
189
190 if ( log.isDebugEnabled() )
191 {
192 log.debug( "runner = " + runner );
193 }
194
195 put( runner );
196 }
197 }
198
199 /**
200 * Adds an event to the queue.
201 * @param event
202 */
203 private void put( AbstractElementEventRunner event )
204 {
205 Node newNode = new Node();
206
207 newNode.event = event;
208
209 synchronized ( queueLock )
210 {
211 size++;
212 tail.next = newNode;
213 tail = newNode;
214 if ( !isAlive() )
215 {
216 destroyed = false;
217 processorThread = new QProcessor( this );
218 processorThread.start();
219 }
220 else
221 {
222 queueLock.notify();
223 }
224 queueLock.notify();
225 }
226 }
227
228 /**
229 * Returns the next item on the queue, or waits if empty.
230 * <p>
231 * @return AbstractElementEventRunner
232 */
233 protected AbstractElementEventRunner take()
234 {
235 synchronized ( queueLock )
236 {
237 // wait until there is something to read
238 if ( head == tail )
239 {
240 return null;
241 }
242
243 Node node = head.next;
244
245 AbstractElementEventRunner value = node.event;
246
247 if ( log.isDebugEnabled() )
248 {
249 log.debug( "head.event = " + head.event );
250 log.debug( "node.event = " + node.event );
251 }
252
253 // Node becomes the new head (head is always empty)
254
255 node.event = null;
256 head = node;
257
258 size--;
259 return value;
260 }
261 }
262
263 // /////////////////////////// Inner classes /////////////////////////////
264
265 /** A node in the queue. These are chained forming a singly linked list */
266 protected static class Node
267 {
268 /** The next node. */
269 Node next = null;
270
271 /** The event to run */
272 ElementEventQueue.AbstractElementEventRunner event = null;
273 }
274
275 /**
276 */
277 private class QProcessor
278 extends Thread
279 {
280 /** The event queue */
281 ElementEventQueue queue;
282
283 /**
284 * Constructor for the QProcessor object
285 * <p>
286 * @param aQueue
287 */
288 QProcessor( ElementEventQueue aQueue )
289 {
290 super( "ElementEventQueue.QProcessor-" + aQueue.cacheName );
291
292 setDaemon( true );
293 queue = aQueue;
294 }
295
296 /**
297 * Main processing method for the QProcessor object.
298 * <p>
299 * Waits for a specified time (waitToDieMillis) for something to come in and if no new
300 * events come in during that period the run method can exit and the thread is dereferenced.
301 */
302 @Override
303 public void run()
304 {
305 AbstractElementEventRunner event = null;
306
307 while ( queue.isAlive() )
308 {
309 event = queue.take();
310
311 if ( log.isDebugEnabled() )
312 {
313 log.debug( "Event from queue = " + event );
314 }
315
316 if ( event == null )
317 {
318 synchronized ( queueLock )
319 {
320 try
321 {
322 queueLock.wait( queue.getWaitToDieMillis() );
323 }
324 catch ( InterruptedException e )
325 {
326 log.warn( "Interrupted while waiting for another event to come in before we die." );
327 return;
328 }
329 event = queue.take();
330 if ( log.isDebugEnabled() )
331 {
332 log.debug( "Event from queue after sleep = " + event );
333 }
334 }
335 if ( event == null )
336 {
337 queue.stopProcessing();
338 }
339 }
340
341 if ( queue.isAlive() && event != null )
342 {
343 event.run();
344 }
345 }
346 if ( log.isDebugEnabled() )
347 {
348 log.debug( "QProcessor exiting for " + queue );
349 }
350 }
351 }
352
353 /**
354 * Retries before declaring failure.
355 */
356 protected abstract class AbstractElementEventRunner
357 implements Runnable
358 {
359 /**
360 * Main processing method for the AbstractElementEvent object
361 */
362 public void run()
363 {
364 try
365 {
366 doRun();
367 // happy and done.
368 }
369 catch ( IOException e )
370 {
371 // Too bad. The handler has problems.
372 log.warn( "Giving up element event handling " + ElementEventQueue.this, e );
373 }
374 }
375
376 /**
377 * This will do the work or trigger the work to be done.
378 * <p>
379 * @exception IOException
380 */
381 protected abstract void doRun()
382 throws IOException;
383 }
384
385 /**
386 * ElementEventRunner.
387 */
388 private class ElementEventRunner
389 extends AbstractElementEventRunner
390 {
391 /** the handler */
392 private final IElementEventHandler hand;
393
394 /** event */
395 private final IElementEvent event;
396
397 /**
398 * Constructor for the PutEvent object.
399 * <p>
400 * @param hand
401 * @param event
402 * @exception IOException
403 */
404 ElementEventRunner( IElementEventHandler hand, IElementEvent event )
405 throws IOException
406 {
407 if ( log.isDebugEnabled() )
408 {
409 log.debug( "Constructing " + this );
410 }
411 this.hand = hand;
412 this.event = event;
413 }
414
415 /**
416 * Tells the handler to handle the event.
417 * <p>
418 * @exception IOException
419 */
420 @Override
421 protected void doRun()
422 throws IOException
423 {
424 hand.handleElementEvent( event );
425 }
426 }
427 }