1 package org.apache.commons.jcs.engine;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.TimeUnit;
25
26 import org.apache.commons.jcs.engine.behavior.ICacheListener;
27 import org.apache.commons.jcs.engine.stats.StatElement;
28 import org.apache.commons.jcs.engine.stats.Stats;
29 import org.apache.commons.jcs.engine.stats.behavior.IStatElement;
30 import org.apache.commons.jcs.engine.stats.behavior.IStats;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33
34
35
36
37
38
39
40
41 public class CacheEventQueue<K, V>
42 extends AbstractCacheEventQueue<K, V>
43 {
44
45 private static final Log log = LogFactory.getLog( CacheEventQueue.class );
46
47
48 private static final QueueType queueType = QueueType.SINGLE;
49
50
51 private Thread processorThread;
52
53
54 private LinkedBlockingQueue<AbstractCacheEvent> queue = new LinkedBlockingQueue<AbstractCacheEvent>();
55
56
57
58
59
60
61
62
63 public CacheEventQueue( ICacheListener<K, V> listener, long listenerId, String cacheName )
64 {
65 this( listener, listenerId, cacheName, 10, 500 );
66 }
67
68
69
70
71
72
73
74
75
76
77 public CacheEventQueue( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
78 int waitBeforeRetry )
79 {
80 initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry );
81 }
82
83
84
85
86
87
88 @Override
89 public QueueType getQueueType()
90 {
91 return queueType;
92 }
93
94
95
96
97
98 protected void stopProcessing()
99 {
100 setAlive(false);
101 processorThread = null;
102 }
103
104
105
106
107
108
109 @Override
110 public void destroy()
111 {
112 if ( isAlive() )
113 {
114 setAlive(false);
115
116 if ( log.isInfoEnabled() )
117 {
118 log.info( "Destroying queue, stats = " + getStatistics() );
119 }
120
121 if ( processorThread != null )
122 {
123 processorThread.interrupt();
124 processorThread = null;
125 }
126
127 if ( log.isInfoEnabled() )
128 {
129 log.info( "Cache event queue destroyed: " + this );
130 }
131 }
132 else
133 {
134 if ( log.isInfoEnabled() )
135 {
136 log.info( "Destroy was called after queue was destroyed. Doing nothing. Stats = " + getStatistics() );
137 }
138 }
139 }
140
141
142
143
144
145
146 @Override
147 protected void put( AbstractCacheEvent event )
148 {
149 if ( log.isDebugEnabled() )
150 {
151 log.debug( "Event entering Queue for " + getCacheName() + ": " + event );
152 }
153
154 queue.offer(event);
155
156 if ( isWorking() )
157 {
158 if ( !isAlive() )
159 {
160 setAlive(true);
161 processorThread = new QProcessor();
162 processorThread.start();
163 if ( log.isInfoEnabled() )
164 {
165 log.info( "Cache event queue created: " + this );
166 }
167 }
168 }
169 }
170
171
172
173
174
175
176
177
178 protected class QProcessor
179 extends Thread
180 {
181
182
183
184
185
186 QProcessor()
187 {
188 super( "CacheEventQueue.QProcessor-" + getCacheName() );
189 setDaemon( true );
190 }
191
192
193
194
195
196
197
198 @Override
199 public void run()
200 {
201
202 while ( CacheEventQueue.this.isAlive() )
203 {
204 AbstractCacheEvent event = null;
205
206 try
207 {
208 event = queue.poll(getWaitToDieMillis(), TimeUnit.MILLISECONDS);
209 }
210 catch (InterruptedException e)
211 {
212
213 }
214
215 if ( log.isDebugEnabled() )
216 {
217 log.debug( "Event from queue = " + event );
218 }
219
220 if ( event == null )
221 {
222 stopProcessing();
223 }
224
225 if ( event != null && isWorking() && CacheEventQueue.this.isAlive() )
226 {
227 event.run();
228 }
229 }
230 if ( log.isDebugEnabled() )
231 {
232 log.debug( "QProcessor exiting for " + getCacheName() );
233 }
234 }
235 }
236
237
238
239
240
241
242
243 @Override
244 public IStats getStatistics()
245 {
246 IStats stats = new Stats();
247 stats.setTypeName( "Cache Event Queue" );
248
249 ArrayList<IStatElement<?>> elems = new ArrayList<IStatElement<?>>();
250
251 elems.add(new StatElement<Boolean>( "Working", Boolean.valueOf(this.isWorking()) ) );
252 elems.add(new StatElement<Boolean>( "Alive", Boolean.valueOf(this.isAlive()) ) );
253 elems.add(new StatElement<Boolean>( "Empty", Boolean.valueOf(this.isEmpty()) ) );
254 elems.add(new StatElement<Integer>( "Size", Integer.valueOf(this.size()) ) );
255
256 stats.setStatElements( elems );
257
258 return stats;
259 }
260
261
262
263
264 @Override
265 public boolean isEmpty()
266 {
267 return queue.isEmpty();
268 }
269
270
271
272
273
274
275 @Override
276 public int size()
277 {
278 return queue.size();
279 }
280 }