package org.apache.commons.jcs3.engine;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.jcs3.engine.behavior.ICacheElement;
import org.apache.commons.jcs3.engine.behavior.ICacheEventQueue;
import org.apache.commons.jcs3.engine.behavior.ICacheListener;
import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;

/**
 * Abstract base class for the different cache event queue implementations.
 * <p>
 * It holds the listener and retry configuration, turns the {@code add*Event} calls into event
 * objects and hands them to {@link #put(AbstractCacheEvent)}, which subclasses implement. Each
 * event is a {@link Runnable} that forwards to the listener and retries on {@link IOException}.
 *
 * @param <K> the cache key type
 * @param <V> the cache value type
 */
public abstract class AbstractCacheEventQueue<K, V>
    implements ICacheEventQueue<K, V>
{
    /** The logger. */
    private static final Log log = LogManager.getLog( AbstractCacheEventQueue.class );

    /** Default value, in milliseconds, for {@link #getWaitToDieMillis()}. */
    protected static final int DEFAULT_WAIT_TO_DIE_MILLIS = 10000;

    /**
     * Time to wait for an event before snuffing the background thread if the queue is empty.
     * Make configurable later.
     */
    private int waitToDieMillis = DEFAULT_WAIT_TO_DIE_MILLIS;

    /**
     * When the events are pulled off the queue, the listener is told to handle the specific
     * event type. The work is done by the listener.
     */
    private ICacheListener<K, V> listener;

    /** Id of the listener registered with this queue. */
    private long listenerId;

    /** The cache region name, if applicable. */
    private String cacheName;

    /** Maximum number of attempts for an event before it is dropped and the queue is destroyed. */
    private int maxFailure;

    /** Pause between two attempts, in milliseconds. */
    private int waitBeforeRetry;

    /**
     * This means that the queue is functional. If we reached the max number of failures, the
     * queue is marked as non functional and will never work again.
     */
    private final AtomicBoolean working = new AtomicBoolean(true);

    /**
     * Returns the time to wait for events before killing the background thread.
     *
     * @return the idle time in milliseconds
     */
    public int getWaitToDieMillis()
    {
        return waitToDieMillis;
    }

    /**
     * Sets the time to wait for events before killing the background thread.
     *
     * @param wtdm the time in milliseconds for the queue to sit idle
     */
    public void setWaitToDieMillis( final int wtdm )
    {
        waitToDieMillis = wtdm;
    }

    /**
     * Creates a brief string identifying the listener and the region.
     *
     * @return debugging info
     */
    @Override
    public String toString()
    {
        return "CacheEventQueue [listenerId=" + listenerId + ", cacheName=" + cacheName + "]";
    }

    /**
     * @return the id of the listener registered with this queue
     */
    @Override
    public long getListenerId()
    {
        return listenerId;
    }

    /**
     * @return the cache region name
     */
    protected String getCacheName()
    {
        return cacheName;
    }

    /**
     * Initializes the queue.
     *
     * @param listener the listener that will handle the events; must not be {@code null}
     * @param listenerId the id of the listener
     * @param cacheName the cache region name
     * @param maxFailure maximum number of attempts per event; values &lt;= 0 are replaced by 3
     * @param waitBeforeRetry pause between attempts in milliseconds; values &lt;= 0 are replaced
     *        by 500
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    protected void initialize( final ICacheListener<K, V> listener, final long listenerId, final String cacheName, final int maxFailure,
                            final int waitBeforeRetry)
    {
        if ( listener == null )
        {
            throw new IllegalArgumentException( "listener must not be null" );
        }

        this.listener = listener;
        this.listenerId = listenerId;
        this.cacheName = cacheName;
        this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
        this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;

        log.debug( "Constructed: {0}", this );
    }

    /**
     * This adds a put event to the queue. When it is processed, the element will be put to the
     * listener.
     *
     * @param ce the element to hand to the listener
     */
    @Override
    public void addPutEvent( final ICacheElement<K, V> ce )
    {
        put( new PutEvent( ce ) );
    }

    /**
     * This adds a remove event to the queue. When processed the listener's remove method will be
     * called for the key.
     *
     * @param key the key to remove
     */
    @Override
    public void addRemoveEvent( final K key )
    {
        put( new RemoveEvent( key ) );
    }

    /**
     * This adds a remove all event to the queue. When it is processed, all elements will be
     * removed from the cache.
     */
    @Override
    public void addRemoveAllEvent()
    {
        put( new RemoveAllEvent() );
    }

    /**
     * This adds a dispose event to the queue. When it is processed, the cache is shut down.
     */
    @Override
    public void addDisposeEvent()
    {
        put( new DisposeEvent() );
    }

    /**
     * Adds an event to the queue.
     *
     * @param event the event to enqueue
     */
    protected abstract void put( AbstractCacheEvent event );


    // /////////////////////////// Inner classes /////////////////////////////
    /**
     * Base class of all queue events. Retries before declaring failure.
     */
    protected abstract class AbstractCacheEvent implements Runnable
    {
        /**
         * Runs {@link #doRun()} up to {@code maxFailure} times, sleeping {@code waitBeforeRetry}
         * milliseconds after every {@link IOException}. Returns as soon as one attempt succeeds.
         * If all attempts fail, or the sleep is interrupted, the event is dropped and the
         * enclosing queue is destroyed.
         */
        @Override
        public void run()
        {
            boolean interrupted = false;

            for (int failures = 0; failures < maxFailure; failures++)
            {
                try
                {
                    doRun();
                    return;
                }
                catch (final IOException e)
                {
                    log.warn("Error while running event from Queue: {0}. "
                            + "Retrying...", this, e);
                }

                try
                {
                    Thread.sleep( waitBeforeRetry );
                }
                catch ( final InterruptedException ie )
                {
                    log.warn("Interrupted while sleeping for retry on event "
                            + "{0}.", this, ie);
                    interrupted = true;
                    break;
                }
            }

            try
            {
                // Name the enclosing queue here: a bare "this" would be the event.
                log.warn( "Dropping Event and marking Event Queue {0} as "
                        + "non-functional.", AbstractCacheEventQueue.this );
                destroy();
            }
            finally
            {
                if ( interrupted )
                {
                    // Catching InterruptedException cleared the flag; re-assert it so the thread
                    // running this event can still observe the interruption. Done last so that
                    // logging and destroy() are not disturbed by the pending interrupt.
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Performs the actual work of the event.
         *
         * @throws IOException if the attempt failed; {@link #run()} then retries
         */
        protected abstract void doRun()
            throws IOException;
    }

    /**
     * An element should be put in the cache.
     */
    protected class PutEvent
        extends AbstractCacheEvent
    {
        /** The element to put to the listener. */
        private final ICacheElement<K, V> ice;

        /**
         * Constructor for the PutEvent object.
         *
         * @param ice the element to put to the listener
         */
        PutEvent( final ICacheElement<K, V> ice )
        {
            this.ice = ice;
        }

        /**
         * Call put on the listener.
         *
         * @throws IOException if the listener fails to handle the put
         */
        @Override
        protected void doRun()
            throws IOException
        {
            listener.handlePut( ice );
        }

        /**
         * For debugging.
         *
         * @return info on the key and value
         */
        @Override
        public String toString()
        {
            return "PutEvent for key: " + ice.getKey() + " value: " + ice.getVal();
        }

    }

    /**
     * An element should be removed from the cache.
     */
    protected class RemoveEvent
        extends AbstractCacheEvent
    {
        /** The key to remove from the listener. */
        private final K key;

        /**
         * Constructor for the RemoveEvent object.
         *
         * @param key the key to remove
         */
        RemoveEvent( final K key )
        {
            this.key = key;
        }

        /**
         * Call remove on the listener.
         *
         * @throws IOException if the listener fails to handle the remove
         */
        @Override
        protected void doRun()
            throws IOException
        {
            listener.handleRemove( cacheName, key );
        }

        /**
         * For debugging.
         *
         * @return info on the key to remove
         */
        @Override
        public String toString()
        {
            return "RemoveEvent for " + key;
        }

    }

    /**
     * All elements should be removed from the cache when this event is processed.
     */
    protected class RemoveAllEvent
        extends AbstractCacheEvent
    {
        /**
         * Call removeAll on the listener.
         *
         * @throws IOException if the listener fails to handle the removeAll
         */
        @Override
        protected void doRun()
            throws IOException
        {
            listener.handleRemoveAll( cacheName );
        }

        /**
         * For debugging.
         *
         * @return the name of the event
         */
        @Override
        public String toString()
        {
            return "RemoveAllEvent";
        }
    }

    /**
     * The cache should be disposed when this event is processed.
     */
    protected class DisposeEvent
        extends AbstractCacheEvent
    {
        /**
         * Call dispose on the listener. Called when the event gets to the end of the queue.
         *
         * @throws IOException if the listener fails to handle the dispose
         */
        @Override
        protected void doRun()
            throws IOException
        {
            listener.handleDispose( cacheName );
        }

        /**
         * For debugging.
         *
         * @return the name of the event
         */
        @Override
        public String toString()
        {
            return "DisposeEvent";
        }
    }

    /**
     * @return whether the queue is functional
     */
    @Override
    public boolean isWorking()
    {
        return working.get();
    }

    /**
     * This means that the queue is functional. If we reached the max number of failures, the
     * queue is marked as non functional and will never work again.
     *
     * @param b the new working state
     */
    public void setWorking( final boolean b )
    {
        working.set(b);
    }
}