package org.apache.commons.jcs.engine;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.jcs.engine.behavior.ICacheListener;
import org.apache.commons.jcs.engine.stats.StatElement;
import org.apache.commons.jcs.engine.stats.Stats;
import org.apache.commons.jcs.engine.stats.behavior.IStatElement;
import org.apache.commons.jcs.engine.stats.behavior.IStats;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * An event queue is used to propagate ordered cache events to one and only one target listener.
 * <p>
 * The processor thread is created lazily by the first {@link #put(AbstractCacheEvent)} and
 * terminates itself when no event arrives within the period returned by
 * {@code getWaitToDieMillis()}. If something comes in after that, a new processor thread is
 * created.
 * <p>
 * Transitions of the "alive" flag made by this class, and every access to the processor thread
 * reference, are serialized through a private lock so that an event offered while the processor
 * is timing out is never left behind and at most one processor is started per idle period.
 */
public class CacheEventQueue<K, V>
    extends AbstractCacheEventQueue<K, V>
{
    /** The logger. */
    private static final Log log = LogFactory.getLog( CacheEventQueue.class );

    /** The type of queue -- there are pooled and single */
    private static final QueueType queueType = QueueType.SINGLE;

    /** Guards {@link #processorThread} and the start/stop transitions of the alive flag. */
    private final Object lifecycleLock = new Object();

    /** The thread that works the queue; only read or written while holding {@link #lifecycleLock}. */
    private Thread processorThread;

    /** Queue implementation */
    private final LinkedBlockingQueue<AbstractCacheEvent> queue = new LinkedBlockingQueue<AbstractCacheEvent>();

    /**
     * Constructs with the specified listener and the cache name, using 10 as the maximum failure
     * count and 500 as the wait-before-retry value.
     * <p>
     * @param listener
     * @param listenerId
     * @param cacheName
     */
    public CacheEventQueue( ICacheListener<K, V> listener, long listenerId, String cacheName )
    {
        this( listener, listenerId, cacheName, 10, 500 );
    }

    /**
     * Constructor for the CacheEventQueue object
     * <p>
     * @param listener
     * @param listenerId
     * @param cacheName
     * @param maxFailure
     * @param waitBeforeRetry
     */
    public CacheEventQueue( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
                            int waitBeforeRetry )
    {
        initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry );
    }

    /**
     * What type of queue is this.
     * <p>
     * @return queueType
     */
    @Override
    public QueueType getQueueType()
    {
        return queueType;
    }

    /**
     * Marks the queue as no longer alive and drops the reference to the processor thread. The
     * queue can still be working; a later put will start a new processor.
     */
    protected void stopProcessing()
    {
        synchronized ( lifecycleLock )
        {
            setAlive( false );
            processorThread = null;
        }
    }

    /**
     * Destroys the queue: marks it as not alive and interrupts the processor thread, if any.
     * <p>
     * Calling destroy on a queue that is not alive only logs a message.
     */
    @Override
    public void destroy()
    {
        boolean wasAlive;
        Thread threadToInterrupt;

        synchronized ( lifecycleLock )
        {
            wasAlive = isAlive();
            // Capture the reference under the lock: the processor may null the field
            // concurrently through stopProcessing().
            threadToInterrupt = processorThread;
            if ( wasAlive )
            {
                setAlive( false );
                processorThread = null;
            }
        }

        if ( !wasAlive )
        {
            if ( log.isInfoEnabled() )
            {
                log.info( "Destroy was called after queue was destroyed. Doing nothing. Stats = " + getStatistics() );
            }
            return;
        }

        if ( log.isInfoEnabled() )
        {
            log.info( "Destroying queue, stats = " + getStatistics() );
        }

        if ( threadToInterrupt != null )
        {
            threadToInterrupt.interrupt();
        }

        if ( log.isInfoEnabled() )
        {
            log.info( "Cache event queue destroyed: " + this );
        }
    }

    /**
     * Adds an event to the queue and, if the queue is working but not alive, starts a new
     * processor thread.
     * <p>
     * @param event
     */
    @Override
    protected void put( AbstractCacheEvent event )
    {
        if ( log.isDebugEnabled() )
        {
            log.debug( "Event entering Queue for " + getCacheName() + ": " + event );
        }

        // The event is offered BEFORE the alive check so that a processor deciding to retire
        // under the lock either sees this event in the queue or has already cleared the alive
        // flag by the time we look at it.
        queue.offer( event );

        if ( !isWorking() )
        {
            return;
        }

        boolean started = false;
        synchronized ( lifecycleLock )
        {
            if ( !isAlive() )
            {
                setAlive( true );
                processorThread = new QProcessor();
                processorThread.start();
                started = true;
            }
        }

        if ( started && log.isInfoEnabled() )
        {
            log.info( "Cache event queue created: " + this );
        }
    }

    /**
     * Decides, atomically with respect to {@link #put(AbstractCacheEvent)} and
     * {@link #destroy()}, whether a processor that received no event should terminate.
     * <p>
     * @param processor the processor thread asking to retire
     * @param interrupted whether the processor's wait was interrupted
     * @return true if the processor must exit its loop, false if it should poll again
     */
    private boolean retireProcessor( Thread processor, boolean interrupted )
    {
        synchronized ( lifecycleLock )
        {
            if ( processorThread != null && processorThread != processor )
            {
                // A newer processor has been registered; leave its state untouched.
                return true;
            }

            if ( interrupted || !isAlive() || queue.isEmpty() )
            {
                stopProcessing();
                return true;
            }

            // An event arrived just as the poll timed out; keep this processor running.
            return false;
        }
    }

    // /////////////////////////// Inner classes /////////////////////////////

    /**
     * This is the thread that works the queue.
     * <p>
     * @author asmuts
     */
    protected class QProcessor
        extends Thread
    {
        /**
         * Constructor for the QProcessor object. The thread is a daemon named after the cache.
         */
        QProcessor()
        {
            super( "CacheEventQueue.QProcessor-" + getCacheName() );
            setDaemon( true );
        }

        /**
         * Main processing method for the QProcessor object.
         * <p>
         * Waits for a specified time (waitToDieMillis) for something to come in and if no new
         * events come in during that period the run method can exit and the thread is
         * dereferenced. An interrupt is treated as a request to stop.
         */
        @Override
        public void run()
        {
            while ( CacheEventQueue.this.isAlive() )
            {
                AbstractCacheEvent event = null;
                boolean wasInterrupted = false;

                try
                {
                    event = queue.poll( getWaitToDieMillis(), TimeUnit.MILLISECONDS );
                }
                catch ( InterruptedException e )
                {
                    // The interrupt is the stop signal for this thread, which terminates
                    // right below, so the interrupt status is not restored.
                    wasInterrupted = true;
                }

                if ( log.isDebugEnabled() )
                {
                    log.debug( "Event from queue = " + event );
                }

                if ( event == null )
                {
                    if ( retireProcessor( this, wasInterrupted ) )
                    {
                        break;
                    }
                    continue;
                }

                if ( isWorking() && CacheEventQueue.this.isAlive() )
                {
                    event.run();
                }
            }

            if ( log.isDebugEnabled() )
            {
                log.debug( "QProcessor exiting for " + getCacheName() );
            }
        }
    }

    /**
     * This method returns semi-structured data on this queue: whether it is working, alive and
     * empty, and its current size.
     * <p>
     * @see org.apache.commons.jcs.engine.behavior.ICacheEventQueue#getStatistics()
     * @return information on the status and history of the queue
     */
    @Override
    public IStats getStatistics()
    {
        IStats stats = new Stats();
        stats.setTypeName( "Cache Event Queue" );

        ArrayList<IStatElement<?>> elems = new ArrayList<IStatElement<?>>();

        elems.add( new StatElement<Boolean>( "Working", Boolean.valueOf( this.isWorking() ) ) );
        elems.add( new StatElement<Boolean>( "Alive", Boolean.valueOf( this.isAlive() ) ) );
        elems.add( new StatElement<Boolean>( "Empty", Boolean.valueOf( this.isEmpty() ) ) );
        elems.add( new StatElement<Integer>( "Size", Integer.valueOf( this.size() ) ) );

        stats.setStatElements( elems );

        return stats;
    }

    /**
     * @return true if there are no items in the queue.
     */
    @Override
    public boolean isEmpty()
    {
        return queue.isEmpty();
    }

    /**
     * Returns the number of elements in the queue.
     * <p>
     * @return number of items in the queue.
     */
    @Override
    public int size()
    {
        return queue.size();
    }
}