001 package org.apache.jcs.engine;
002
003 /*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements. See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership. The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License. You may obtain a copy of the License at
011 *
012 * http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied. See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022 import java.io.IOException;
023 import java.io.Serializable;
024
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027 import org.apache.jcs.engine.behavior.ICacheElement;
028 import org.apache.jcs.engine.behavior.ICacheEventQueue;
029 import org.apache.jcs.engine.behavior.ICacheListener;
030
031 /**
032 * An abstract base class to the different implementations
033 */
034 public abstract class AbstractCacheEventQueue<K extends Serializable, V extends Serializable>
035 implements ICacheEventQueue<K, V>
036 {
037 /** The logger. */
038 protected static final Log log = LogFactory.getLog( AbstractCacheEventQueue.class );
039
040 /** default */
041 protected static final int DEFAULT_WAIT_TO_DIE_MILLIS = 10000;
042
043 /**
044 * time to wait for an event before snuffing the background thread if the queue is empty. make
045 * configurable later
046 */
047 protected int waitToDieMillis = DEFAULT_WAIT_TO_DIE_MILLIS;
048
049 /**
050 * When the events are pulled off the queue, the tell the listener to handle the specific event
051 * type. The work is done by the listener.
052 */
053 protected ICacheListener<K, V> listener;
054
055 /** Id of the listener registered with this queue */
056 protected long listenerId;
057
058 /** The cache region name, if applicable. */
059 protected String cacheName;
060
061 /** Maximum number of failures before we buy the farm. */
062 protected int maxFailure;
063
064 /** in milliseconds */
065 protected int waitBeforeRetry;
066
067 /** this is true if there is no worker thread. */
068 protected boolean destroyed = true;
069
070 /**
071 * This means that the queue is functional. If we reached the max number of failures, the queue
072 * is marked as non functional and will never work again.
073 */
074 protected boolean working = true;
075
076 /**
077 * Returns the time to wait for events before killing the background thread.
078 * <p>
079 * @return int
080 */
081 public int getWaitToDieMillis()
082 {
083 return waitToDieMillis;
084 }
085
086 /**
087 * Sets the time to wait for events before killing the background thread.
088 * <p>
089 * @param wtdm the ms for the q to sit idle.
090 */
091 public void setWaitToDieMillis( int wtdm )
092 {
093 waitToDieMillis = wtdm;
094 }
095
096 /**
097 * Creates a brief string identifying the listener and the region.
098 * <p>
099 * @return String debugging info.
100 */
101 @Override
102 public String toString()
103 {
104 return "CacheEventQueue [listenerId=" + listenerId + ", cacheName=" + cacheName + "]";
105 }
106
107 /**
108 * If they queue has an active thread it is considered alive.
109 * <p>
110 * @return The alive value
111 */
112 public synchronized boolean isAlive()
113 {
114 return ( !destroyed );
115 }
116
117 /**
118 * Sets whether the queue is actively processing -- if there are working threads.
119 * <p>
120 * @param aState
121 */
122 public synchronized void setAlive( boolean aState )
123 {
124 destroyed = !aState;
125 }
126
127 /**
128 * @return The listenerId value
129 */
130 public long getListenerId()
131 {
132 return listenerId;
133 }
134
135 /**
136 * This adds a put event to the queue. When it is processed, the element will be put to the
137 * listener.
138 * <p>
139 * @param ce The feature to be added to the PutEvent attribute
140 * @exception IOException
141 */
142 public synchronized void addPutEvent( ICacheElement<K, V> ce )
143 throws IOException
144 {
145 if ( isWorking() )
146 {
147 put( new PutEvent( ce ) );
148 }
149 else
150 {
151 if ( log.isWarnEnabled() )
152 {
153 log.warn( "Not enqueuing Put Event for [" + this + "] because it's non-functional." );
154 }
155 }
156 }
157
158 /**
159 * This adds a remove event to the queue. When processed the listener's remove method will be
160 * called for the key.
161 * <p>
162 * @param key The feature to be added to the RemoveEvent attribute
163 * @exception IOException
164 */
165 public synchronized void addRemoveEvent( K key )
166 throws IOException
167 {
168 if ( isWorking() )
169 {
170 put( new RemoveEvent( key ) );
171 }
172 else
173 {
174 if ( log.isWarnEnabled() )
175 {
176 log.warn( "Not enqueuing Remove Event for [" + this + "] because it's non-functional." );
177 }
178 }
179 }
180
181 /**
182 * This adds a remove all event to the queue. When it is processed, all elements will be removed
183 * from the cache.
184 * <p>
185 * @exception IOException
186 */
187 public synchronized void addRemoveAllEvent()
188 throws IOException
189 {
190 if ( isWorking() )
191 {
192 put( new RemoveAllEvent() );
193 }
194 else
195 {
196 if ( log.isWarnEnabled() )
197 {
198 log.warn( "Not enqueuing RemoveAll Event for [" + this + "] because it's non-functional." );
199 }
200 }
201 }
202
203 /**
204 * @exception IOException
205 */
206 public synchronized void addDisposeEvent()
207 throws IOException
208 {
209 if ( isWorking() )
210 {
211 put( new DisposeEvent() );
212 }
213 else
214 {
215 if ( log.isWarnEnabled() )
216 {
217 log.warn( "Not enqueuing Dispose Event for [" + this + "] because it's non-functional." );
218 }
219 }
220 }
221
222 /**
223 * Adds an event to the queue.
224 * <p>
225 * @param event
226 */
227 protected abstract void put( AbstractCacheEvent event );
228
229
230 // /////////////////////////// Inner classes /////////////////////////////
231
232 /** The queue is composed of nodes. */
233 protected static class Node
234 {
235 /** Next node in the singly linked list. */
236 Node next = null;
237
238 /** The payload. */
239 AbstractCacheEventQueue<?, ?>.AbstractCacheEvent event = null;
240 }
241
242 /**
243 * Retries before declaring failure.
244 * <p>
245 * @author asmuts
246 * @created January 15, 2002
247 */
248 protected abstract class AbstractCacheEvent
249 implements Runnable
250 {
251 /** Number of failures encountered processing this event. */
252 int failures = 0;
253
254 /**
255 * Main processing method for the AbstractCacheEvent object
256 */
257 public void run()
258 {
259 try
260 {
261 doRun();
262 }
263 catch ( IOException e )
264 {
265 if ( log.isWarnEnabled() )
266 {
267 log.warn( e );
268 }
269 if ( ++failures >= maxFailure )
270 {
271 if ( log.isWarnEnabled() )
272 {
273 log.warn( "Error while running event from Queue: " + this
274 + ". Dropping Event and marking Event Queue as non-functional." );
275 }
276 setWorking( false );
277 setAlive( false );
278 return;
279 }
280 if ( log.isInfoEnabled() )
281 {
282 log.info( "Error while running event from Queue: " + this + ". Retrying..." );
283 }
284 try
285 {
286 Thread.sleep( waitBeforeRetry );
287 run();
288 }
289 catch ( InterruptedException ie )
290 {
291 if ( log.isErrorEnabled() )
292 {
293 log.warn( "Interrupted while sleeping for retry on event " + this + "." );
294 }
295 // TODO consider if this is best. maybe we should just
296 // destroy
297 setWorking( false );
298 setAlive( false );
299 }
300 }
301 }
302
303 /**
304 * @exception IOException
305 */
306 protected abstract void doRun()
307 throws IOException;
308 }
309
310 /**
311 * An element should be put in the cache.
312 * <p>
313 * @author asmuts
314 * @created January 15, 2002
315 */
316 protected class PutEvent
317 extends AbstractCacheEvent
318 {
319 /** The element to put to the listener */
320 private final ICacheElement<K, V> ice;
321
322 /**
323 * Constructor for the PutEvent object.
324 * <p>
325 * @param ice
326 * @exception IOException
327 */
328 PutEvent( ICacheElement<K, V> ice )
329 throws IOException
330 {
331 this.ice = ice;
332 }
333
334 /**
335 * Call put on the listener.
336 * <p>
337 * @exception IOException
338 */
339 @Override
340 protected void doRun()
341 throws IOException
342 {
343 listener.handlePut( ice );
344 }
345
346 /**
347 * For debugging.
348 * <p>
349 * @return Info on the key and value.
350 */
351 @Override
352 public String toString()
353 {
354 return new StringBuffer( "PutEvent for key: " ).append( ice.getKey() ).append( " value: " )
355 .append( ice.getVal() ).toString();
356 }
357
358 }
359
360 /**
361 * An element should be removed from the cache.
362 * <p>
363 * @author asmuts
364 * @created January 15, 2002
365 */
366 protected class RemoveEvent
367 extends AbstractCacheEvent
368 {
369 /** The key to remove from the listener */
370 private final K key;
371
372 /**
373 * Constructor for the RemoveEvent object
374 * <p>
375 * @param key
376 * @exception IOException
377 */
378 RemoveEvent( K key )
379 throws IOException
380 {
381 this.key = key;
382 }
383
384 /**
385 * Call remove on the listener.
386 * <p>
387 * @exception IOException
388 */
389 @Override
390 protected void doRun()
391 throws IOException
392 {
393 listener.handleRemove( cacheName, key );
394 }
395
396 /**
397 * For debugging.
398 * <p>
399 * @return Info on the key to remove.
400 */
401 @Override
402 public String toString()
403 {
404 return new StringBuffer( "RemoveEvent for " ).append( key ).toString();
405 }
406
407 }
408
409 /**
410 * All elements should be removed from the cache when this event is processed.
411 * <p>
412 * @author asmuts
413 * @created January 15, 2002
414 */
415 protected class RemoveAllEvent
416 extends AbstractCacheEvent
417 {
418 /**
419 * Call removeAll on the listener.
420 * <p>
421 * @exception IOException
422 */
423 @Override
424 protected void doRun()
425 throws IOException
426 {
427 listener.handleRemoveAll( cacheName );
428 }
429
430 /**
431 * For debugging.
432 * <p>
433 * @return The name of the event.
434 */
435 @Override
436 public String toString()
437 {
438 return "RemoveAllEvent";
439 }
440
441 }
442
443 /**
444 * The cache should be disposed when this event is processed.
445 * <p>
446 * @author asmuts
447 * @created January 15, 2002
448 */
449 protected class DisposeEvent
450 extends AbstractCacheEvent
451 {
452 /**
453 * Called when gets to the end of the queue
454 * <p>
455 * @exception IOException
456 */
457 @Override
458 protected void doRun()
459 throws IOException
460 {
461 listener.handleDispose( cacheName );
462 }
463
464 /**
465 * For debugging.
466 * <p>
467 * @return The name of the event.
468 */
469 @Override
470 public String toString()
471 {
472 return "DisposeEvent";
473 }
474 }
475
476 /**
477 * @return whether the queue is functional.
478 */
479 public boolean isWorking()
480 {
481 return working;
482 }
483
484 /**
485 * This means that the queue is functional. If we reached the max number of failures, the queue
486 * is marked as non functional and will never work again.
487 * <p>
488 * @param b
489 */
490 public void setWorking( boolean b )
491 {
492 working = b;
493 }
494 }