1 package org.apache.commons.jcs.engine;
2
3 /*
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
19 * under the License.
20 */
21
22 import java.util.ArrayList;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.TimeUnit;
25
26 import org.apache.commons.jcs.engine.behavior.ICacheListener;
27 import org.apache.commons.jcs.engine.stats.StatElement;
28 import org.apache.commons.jcs.engine.stats.Stats;
29 import org.apache.commons.jcs.engine.stats.behavior.IStatElement;
30 import org.apache.commons.jcs.engine.stats.behavior.IStats;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33
34 /**
35 * An event queue is used to propagate ordered cache events to one and only one target listener.
36 * <p>
37 * This is a modified version of the experimental version. It should lazy initialize the processor
38 * thread, and kill the thread if the queue goes empty for a specified period, now set to 1 minute.
39 * If something comes in after that a new processor thread should be created.
40 */
41 public class CacheEventQueue<K, V>
42 extends AbstractCacheEventQueue<K, V>
43 {
44 /** The logger. */
45 private static final Log log = LogFactory.getLog( CacheEventQueue.class );
46
47 /** The type of queue -- there are pooled and single */
48 private static final QueueType queueType = QueueType.SINGLE;
49
50 /** the thread that works the queue. */
51 private Thread processorThread;
52
53 /** Queue implementation */
54 private LinkedBlockingQueue<AbstractCacheEvent> queue = new LinkedBlockingQueue<AbstractCacheEvent>();
55
56 /**
57 * Constructs with the specified listener and the cache name.
58 * <p>
59 * @param listener
60 * @param listenerId
61 * @param cacheName
62 */
63 public CacheEventQueue( ICacheListener<K, V> listener, long listenerId, String cacheName )
64 {
65 this( listener, listenerId, cacheName, 10, 500 );
66 }
67
68 /**
69 * Constructor for the CacheEventQueue object
70 * <p>
71 * @param listener
72 * @param listenerId
73 * @param cacheName
74 * @param maxFailure
75 * @param waitBeforeRetry
76 */
77 public CacheEventQueue( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
78 int waitBeforeRetry )
79 {
80 initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry );
81 }
82
83 /**
84 * What type of queue is this.
85 * <p>
86 * @return queueType
87 */
88 @Override
89 public QueueType getQueueType()
90 {
91 return queueType;
92 }
93
94 /**
95 * Kill the processor thread and indicate that the queue is destroyed and no longer alive, but it
96 * can still be working.
97 */
98 protected void stopProcessing()
99 {
100 setAlive(false);
101 processorThread = null;
102 }
103
104 /**
105 * Event Q is empty.
106 * <p>
107 * Calling destroy interrupts the processor thread.
108 */
109 @Override
110 public void destroy()
111 {
112 if ( isAlive() )
113 {
114 setAlive(false);
115
116 if ( log.isInfoEnabled() )
117 {
118 log.info( "Destroying queue, stats = " + getStatistics() );
119 }
120
121 if ( processorThread != null )
122 {
123 processorThread.interrupt();
124 processorThread = null;
125 }
126
127 if ( log.isInfoEnabled() )
128 {
129 log.info( "Cache event queue destroyed: " + this );
130 }
131 }
132 else
133 {
134 if ( log.isInfoEnabled() )
135 {
136 log.info( "Destroy was called after queue was destroyed. Doing nothing. Stats = " + getStatistics() );
137 }
138 }
139 }
140
141 /**
142 * Adds an event to the queue.
143 * <p>
144 * @param event
145 */
146 @Override
147 protected void put( AbstractCacheEvent event )
148 {
149 if ( log.isDebugEnabled() )
150 {
151 log.debug( "Event entering Queue for " + getCacheName() + ": " + event );
152 }
153
154 queue.offer(event);
155
156 if ( isWorking() )
157 {
158 if ( !isAlive() )
159 {
160 setAlive(true);
161 processorThread = new QProcessor();
162 processorThread.start();
163 if ( log.isInfoEnabled() )
164 {
165 log.info( "Cache event queue created: " + this );
166 }
167 }
168 }
169 }
170
171 // /////////////////////////// Inner classes /////////////////////////////
172
173 /**
174 * This is the thread that works the queue.
175 * <p>
176 * @author asmuts
177 */
178 protected class QProcessor
179 extends Thread
180 {
181 /**
182 * Constructor for the QProcessor object
183 * <p>
184 * @param aQueue the event queue to take items from.
185 */
186 QProcessor()
187 {
188 super( "CacheEventQueue.QProcessor-" + getCacheName() );
189 setDaemon( true );
190 }
191
192 /**
193 * Main processing method for the QProcessor object.
194 * <p>
195 * Waits for a specified time (waitToDieMillis) for something to come in and if no new
196 * events come in during that period the run method can exit and the thread is dereferenced.
197 */
198 @Override
199 public void run()
200 {
201
202 while ( CacheEventQueue.this.isAlive() )
203 {
204 AbstractCacheEvent event = null;
205
206 try
207 {
208 event = queue.poll(getWaitToDieMillis(), TimeUnit.MILLISECONDS);
209 }
210 catch (InterruptedException e)
211 {
212 // is ok
213 }
214
215 if ( log.isDebugEnabled() )
216 {
217 log.debug( "Event from queue = " + event );
218 }
219
220 if ( event == null )
221 {
222 stopProcessing();
223 }
224
225 if ( event != null && isWorking() && CacheEventQueue.this.isAlive() )
226 {
227 event.run();
228 }
229 }
230 if ( log.isDebugEnabled() )
231 {
232 log.debug( "QProcessor exiting for " + getCacheName() );
233 }
234 }
235 }
236
237 /**
238 * This method returns semi-structured data on this queue.
239 * <p>
240 * @see org.apache.commons.jcs.engine.behavior.ICacheEventQueue#getStatistics()
241 * @return information on the status and history of the queue
242 */
243 @Override
244 public IStats getStatistics()
245 {
246 IStats stats = new Stats();
247 stats.setTypeName( "Cache Event Queue" );
248
249 ArrayList<IStatElement<?>> elems = new ArrayList<IStatElement<?>>();
250
251 elems.add(new StatElement<Boolean>( "Working", Boolean.valueOf(this.isWorking()) ) );
252 elems.add(new StatElement<Boolean>( "Alive", Boolean.valueOf(this.isAlive()) ) );
253 elems.add(new StatElement<Boolean>( "Empty", Boolean.valueOf(this.isEmpty()) ) );
254 elems.add(new StatElement<Integer>( "Size", Integer.valueOf(this.size()) ) );
255
256 stats.setStatElements( elems );
257
258 return stats;
259 }
260
261 /**
262 * @return whether there are any items in the queue.
263 */
264 @Override
265 public boolean isEmpty()
266 {
267 return queue.isEmpty();
268 }
269
270 /**
271 * Returns the number of elements in the queue.
272 * <p>
273 * @return number of items in the queue.
274 */
275 @Override
276 public int size()
277 {
278 return queue.size();
279 }
280 }