1 package org.apache.commons.jcs3.engine;
2
3 /*
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
19 * under the License.
20 */
21
22 import java.io.IOException;
23 import java.util.concurrent.atomic.AtomicBoolean;
24
25 import org.apache.commons.jcs3.engine.behavior.ICacheElement;
26 import org.apache.commons.jcs3.engine.behavior.ICacheEventQueue;
27 import org.apache.commons.jcs3.engine.behavior.ICacheListener;
28 import org.apache.commons.jcs3.log.Log;
29 import org.apache.commons.jcs3.log.LogManager;
30
31 /**
32 * An abstract base class to the different implementations
33 */
34 public abstract class AbstractCacheEventQueue<K, V>
35 implements ICacheEventQueue<K, V>
36 {
37 /** The logger. */
38 private static final Log log = LogManager.getLog( AbstractCacheEventQueue.class );
39
40 /** default */
41 protected static final int DEFAULT_WAIT_TO_DIE_MILLIS = 10000;
42
43 /**
44 * time to wait for an event before snuffing the background thread if the queue is empty. make
45 * configurable later
46 */
47 private int waitToDieMillis = DEFAULT_WAIT_TO_DIE_MILLIS;
48
49 /**
50 * When the events are pulled off the queue, then tell the listener to handle the specific event
51 * type. The work is done by the listener.
52 */
53 private ICacheListener<K, V> listener;
54
55 /** Id of the listener registered with this queue */
56 private long listenerId;
57
58 /** The cache region name, if applicable. */
59 private String cacheName;
60
61 /** Maximum number of failures before we buy the farm. */
62 private int maxFailure;
63
64 /** in milliseconds */
65 private int waitBeforeRetry;
66
67 /**
68 * This means that the queue is functional. If we reached the max number of failures, the queue
69 * is marked as non functional and will never work again.
70 */
71 private final AtomicBoolean working = new AtomicBoolean(true);
72
73 /**
74 * Returns the time to wait for events before killing the background thread.
75 * <p>
76 * @return int
77 */
78 public int getWaitToDieMillis()
79 {
80 return waitToDieMillis;
81 }
82
83 /**
84 * Sets the time to wait for events before killing the background thread.
85 * <p>
86 * @param wtdm the ms for the q to sit idle.
87 */
88 public void setWaitToDieMillis( final int wtdm )
89 {
90 waitToDieMillis = wtdm;
91 }
92
93 /**
94 * Creates a brief string identifying the listener and the region.
95 * <p>
96 * @return String debugging info.
97 */
98 @Override
99 public String toString()
100 {
101 return "CacheEventQueue [listenerId=" + listenerId + ", cacheName=" + cacheName + "]";
102 }
103
104 /**
105 * @return The listenerId value
106 */
107 @Override
108 public long getListenerId()
109 {
110 return listenerId;
111 }
112
113 /**
114 * @return the cacheName
115 */
116 protected String getCacheName()
117 {
118 return cacheName;
119 }
120
121 /**
122 * Initializes the queue.
123 * <p>
124 * @param listener
125 * @param listenerId
126 * @param cacheName
127 * @param maxFailure
128 * @param waitBeforeRetry
129 */
130 protected void initialize( final ICacheListener<K, V> listener, final long listenerId, final String cacheName, final int maxFailure,
131 final int waitBeforeRetry)
132 {
133 if ( listener == null )
134 {
135 throw new IllegalArgumentException( "listener must not be null" );
136 }
137
138 this.listener = listener;
139 this.listenerId = listenerId;
140 this.cacheName = cacheName;
141 this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
142 this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
143
144 log.debug( "Constructed: {0}", this );
145 }
146
147 /**
148 * This adds a put event to the queue. When it is processed, the element will be put to the
149 * listener.
150 * <p>
151 * @param ce The feature to be added to the PutEvent attribute
152 * @throws IOException
153 */
154 @Override
155 public void addPutEvent( final ICacheElement<K, V> ce )
156 {
157 put( new PutEvent( ce ) );
158 }
159
160 /**
161 * This adds a remove event to the queue. When processed the listener's remove method will be
162 * called for the key.
163 * <p>
164 * @param key The feature to be added to the RemoveEvent attribute
165 * @throws IOException
166 */
167 @Override
168 public void addRemoveEvent( final K key )
169 {
170 put( new RemoveEvent( key ) );
171 }
172
173 /**
174 * This adds a remove all event to the queue. When it is processed, all elements will be removed
175 * from the cache.
176 */
177 @Override
178 public void addRemoveAllEvent()
179 {
180 put( new RemoveAllEvent() );
181 }
182
183 /**
184 * This adds a dispose event to the queue. When it is processed, the cache is shut down
185 */
186 @Override
187 public void addDisposeEvent()
188 {
189 put( new DisposeEvent() );
190 }
191
192 /**
193 * Adds an event to the queue.
194 * <p>
195 * @param event
196 */
197 protected abstract void put( AbstractCacheEvent event );
198
199
200 // /////////////////////////// Inner classes /////////////////////////////
201 /**
202 * Retries before declaring failure.
203 */
204 protected abstract class AbstractCacheEvent implements Runnable
205 {
206 /**
207 * Main processing method for the AbstractCacheEvent object
208 */
209 @Override
210 public void run()
211 {
212 for (int failures = 0; failures < maxFailure; failures++)
213 {
214 try
215 {
216 doRun();
217 return;
218 }
219 catch (final IOException e)
220 {
221 log.warn("Error while running event from Queue: {0}. "
222 + "Retrying...", this, e);
223 }
224
225 try
226 {
227 Thread.sleep( waitBeforeRetry );
228 }
229 catch ( final InterruptedException ie )
230 {
231 log.warn("Interrupted while sleeping for retry on event "
232 + "{0}.", this, ie);
233 break;
234 }
235 }
236
237 log.warn( "Dropping Event and marking Event Queue {0} as "
238 + "non-functional.", this );
239 destroy();
240 }
241
242 /**
243 * @throws IOException
244 */
245 protected abstract void doRun()
246 throws IOException;
247 }
248
249 /**
250 * An element should be put in the cache.
251 */
252 protected class PutEvent
253 extends AbstractCacheEvent
254 {
255 /** The element to put to the listener */
256 private final ICacheElement<K, V> ice;
257
258 /**
259 * Constructor for the PutEvent object.
260 * <p>
261 * @param ice
262 */
263 PutEvent( final ICacheElement<K, V> ice )
264 {
265 this.ice = ice;
266 }
267
268 /**
269 * Call put on the listener.
270 * <p>
271 * @throws IOException
272 */
273 @Override
274 protected void doRun()
275 throws IOException
276 {
277 listener.handlePut( ice );
278 }
279
280 /**
281 * For debugging.
282 * <p>
283 * @return Info on the key and value.
284 */
285 @Override
286 public String toString()
287 {
288 return new StringBuilder( "PutEvent for key: " )
289 .append( ice.getKey() )
290 .append( " value: " )
291 .append( ice.getVal() )
292 .toString();
293 }
294
295 }
296
297 /**
298 * An element should be removed from the cache.
299 */
300 protected class RemoveEvent
301 extends AbstractCacheEvent
302 {
303 /** The key to remove from the listener */
304 private final K key;
305
306 /**
307 * Constructor for the RemoveEvent object
308 * <p>
309 * @param key
310 */
311 RemoveEvent( final K key )
312 {
313 this.key = key;
314 }
315
316 /**
317 * Call remove on the listener.
318 * <p>
319 * @throws IOException
320 */
321 @Override
322 protected void doRun()
323 throws IOException
324 {
325 listener.handleRemove( cacheName, key );
326 }
327
328 /**
329 * For debugging.
330 * <p>
331 * @return Info on the key to remove.
332 */
333 @Override
334 public String toString()
335 {
336 return new StringBuilder( "RemoveEvent for " )
337 .append( key )
338 .toString();
339 }
340
341 }
342
343 /**
344 * All elements should be removed from the cache when this event is processed.
345 */
346 protected class RemoveAllEvent
347 extends AbstractCacheEvent
348 {
349 /**
350 * Call removeAll on the listener.
351 * <p>
352 * @throws IOException
353 */
354 @Override
355 protected void doRun()
356 throws IOException
357 {
358 listener.handleRemoveAll( cacheName );
359 }
360
361 /**
362 * For debugging.
363 * <p>
364 * @return The name of the event.
365 */
366 @Override
367 public String toString()
368 {
369 return "RemoveAllEvent";
370 }
371 }
372
373 /**
374 * The cache should be disposed when this event is processed.
375 */
376 protected class DisposeEvent
377 extends AbstractCacheEvent
378 {
379 /**
380 * Called when gets to the end of the queue
381 *
382 * @throws IOException
383 */
384 @Override
385 protected void doRun()
386 throws IOException
387 {
388 listener.handleDispose( cacheName );
389 }
390
391 /**
392 * For debugging.
393 *
394 * @return The name of the event.
395 */
396 @Override
397 public String toString()
398 {
399 return "DisposeEvent";
400 }
401 }
402
403 /**
404 * @return whether the queue is functional.
405 */
406 @Override
407 public boolean isWorking()
408 {
409 return working.get();
410 }
411
412 /**
413 * This means that the queue is functional. If we reached the max number of failures, the queue
414 * is marked as non functional and will never work again.
415 * <p>
416 * @param b
417 */
418 public void setWorking( final boolean b )
419 {
420 working.set(b);
421 }
422 }