1 package org.apache.commons.jcs.engine;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28
29 import org.apache.commons.jcs.engine.behavior.ICacheElement;
30 import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal;
31 import org.apache.commons.jcs.utils.timing.ElapsedTimer;
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34
35
36
37
38
39
40
41
42
43
44 public class ZombieCacheServiceNonLocal<K, V>
45 extends ZombieCacheService<K, V>
46 implements ICacheServiceNonLocal<K, V>
47 {
48
49 private static final Log log = LogFactory.getLog( ZombieCacheServiceNonLocal.class );
50
51
52 private int maxQueueSize = 0;
53
54
55 private final ConcurrentLinkedQueue<ZombieEvent> queue;
56
57
58
59
60 public ZombieCacheServiceNonLocal()
61 {
62 queue = new ConcurrentLinkedQueue<ZombieEvent>();
63 }
64
65
66
67
68
69
70 public ZombieCacheServiceNonLocal( int maxQueueSize )
71 {
72 this.maxQueueSize = maxQueueSize;
73 queue = new ConcurrentLinkedQueue<ZombieEvent>();
74 }
75
76
77
78
79
80
81 public int getQueueSize()
82 {
83 return queue.size();
84 }
85
86 private void addQueue(ZombieEvent event)
87 {
88 queue.add(event);
89 if (queue.size() > maxQueueSize)
90 {
91 queue.poll();
92 }
93 }
94
95
96
97
98
99
100
101 @Override
102 public void update( ICacheElement<K, V> item, long listenerId )
103 {
104 if ( maxQueueSize > 0 )
105 {
106 PutEvent<K, V> event = new PutEvent<K, V>( item, listenerId );
107 addQueue( event );
108 }
109
110 }
111
112
113
114
115
116
117
118
119 @Override
120 public void remove( String cacheName, K key, long listenerId )
121 {
122 if ( maxQueueSize > 0 )
123 {
124 RemoveEvent<K> event = new RemoveEvent<K>( cacheName, key, listenerId );
125 addQueue( event );
126 }
127
128 }
129
130
131
132
133
134
135
136 @Override
137 public void removeAll( String cacheName, long listenerId )
138 {
139 if ( maxQueueSize > 0 )
140 {
141 RemoveAllEvent event = new RemoveAllEvent( cacheName, listenerId );
142 addQueue( event );
143 }
144
145 }
146
147
148
149
150
151
152
153
154
155
156 @Override
157 public ICacheElement<K, V> get( String cacheName, K key, long requesterId )
158 throws IOException
159 {
160
161 return null;
162 }
163
164
165
166
167
168
169
170
171
172
173 @Override
174 public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern, long requesterId )
175 throws IOException
176 {
177 return Collections.emptyMap();
178 }
179
180
181
182
183
184
185
186 @Override
187 public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys, long requesterId )
188 {
189 return new HashMap<K, ICacheElement<K, V>>();
190 }
191
192
193
194
195
196
197
198 @Override
199 public Set<K> getKeySet( String cacheName )
200 {
201 return Collections.emptySet();
202 }
203
204
205
206
207
208
209
210 public synchronized void propagateEvents( ICacheServiceNonLocal<K, V> service )
211 throws Exception
212 {
213 int cnt = 0;
214 if ( log.isInfoEnabled() )
215 {
216 log.info( "Propagating events to the new ICacheServiceNonLocal." );
217 }
218 ElapsedTimer timer = new ElapsedTimer();
219 while ( !queue.isEmpty() )
220 {
221 cnt++;
222
223
224 ZombieEvent event = queue.poll();
225
226 if ( event instanceof PutEvent )
227 {
228 @SuppressWarnings("unchecked")
229 PutEvent<K, V> putEvent = (PutEvent<K, V>) event;
230 service.update( putEvent.element, event.requesterId );
231 }
232 else if ( event instanceof RemoveEvent )
233 {
234 @SuppressWarnings("unchecked")
235 RemoveEvent<K> removeEvent = (RemoveEvent<K>) event;
236 service.remove( event.cacheName, removeEvent.key, event.requesterId );
237 }
238 else if ( event instanceof RemoveAllEvent )
239 {
240 service.removeAll( event.cacheName, event.requesterId );
241 }
242 }
243 if ( log.isInfoEnabled() )
244 {
245 log.info( "Propagated " + cnt + " events to the new ICacheServiceNonLocal in "
246 + timer.getElapsedTimeString() );
247 }
248 }
249
250
251
252
253 protected static abstract class ZombieEvent
254 {
255
256 String cacheName;
257
258
259 long requesterId;
260 }
261
262
263
264
265 private static class PutEvent<K, V>
266 extends ZombieEvent
267 {
268
269 ICacheElement<K, V> element;
270
271
272
273
274
275
276 public PutEvent( ICacheElement<K, V> element, long requesterId )
277 {
278 this.requesterId = requesterId;
279 this.element = element;
280 }
281 }
282
283
284
285
286 private static class RemoveEvent<K>
287 extends ZombieEvent
288 {
289
290 K key;
291
292
293
294
295
296
297
298 public RemoveEvent( String cacheName, K key, long requesterId )
299 {
300 this.cacheName = cacheName;
301 this.requesterId = requesterId;
302 this.key = key;
303 }
304 }
305
306
307
308
309 private static class RemoveAllEvent
310 extends ZombieEvent
311 {
312
313
314
315
316 public RemoveAllEvent( String cacheName, long requesterId )
317 {
318 this.cacheName = cacheName;
319 this.requesterId = requesterId;
320 }
321 }
322 }