1 package org.apache.commons.jcs.auxiliary.lateral.socket.tcp;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.StringTokenizer;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.locks.ReentrantLock;
27
28 import org.apache.commons.jcs.auxiliary.AbstractAuxiliaryCacheFactory;
29 import org.apache.commons.jcs.auxiliary.AuxiliaryCacheAttributes;
30 import org.apache.commons.jcs.auxiliary.lateral.LateralCache;
31 import org.apache.commons.jcs.auxiliary.lateral.LateralCacheMonitor;
32 import org.apache.commons.jcs.auxiliary.lateral.LateralCacheNoWait;
33 import org.apache.commons.jcs.auxiliary.lateral.LateralCacheNoWaitFacade;
34 import org.apache.commons.jcs.auxiliary.lateral.behavior.ILateralCacheListener;
35 import org.apache.commons.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
36 import org.apache.commons.jcs.engine.CacheWatchRepairable;
37 import org.apache.commons.jcs.engine.ZombieCacheServiceNonLocal;
38 import org.apache.commons.jcs.engine.ZombieCacheWatch;
39 import org.apache.commons.jcs.engine.behavior.ICache;
40 import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal;
41 import org.apache.commons.jcs.engine.behavior.ICompositeCacheManager;
42 import org.apache.commons.jcs.engine.behavior.IElementSerializer;
43 import org.apache.commons.jcs.engine.behavior.IShutdownObserver;
44 import org.apache.commons.jcs.engine.logging.behavior.ICacheEventLogger;
45 import org.apache.commons.jcs.utils.discovery.UDPDiscoveryManager;
46 import org.apache.commons.jcs.utils.discovery.UDPDiscoveryService;
47 import org.apache.commons.logging.Log;
48 import org.apache.commons.logging.LogFactory;
49
50
51
52
53
54
55
56
57 public class LateralTCPCacheFactory
58 extends AbstractAuxiliaryCacheFactory
59 {
60
61 private static final Log log = LogFactory.getLog( LateralTCPCacheFactory.class );
62
63
64 private ConcurrentHashMap<String, ICacheServiceNonLocal<?, ?>> csnlInstances;
65
66
67 private ReentrantLock csnlLock;
68
69
70 private ConcurrentHashMap<String, LateralTCPDiscoveryListener> lTCPDLInstances;
71
72
73 private LateralCacheMonitor monitor;
74
75
76
77
78
79 private CacheWatchRepairable lateralWatch;
80
81
82
83
84
85
86
87
88
89
90 @Override
91 public <K, V> LateralCacheNoWaitFacade<K, V> createCache(
92 AuxiliaryCacheAttributes iaca, ICompositeCacheManager cacheMgr,
93 ICacheEventLogger cacheEventLogger, IElementSerializer elementSerializer )
94 {
95 ITCPLateralCacheAttributes lac = (ITCPLateralCacheAttributes) iaca;
96 ArrayList<ICache<K, V>> noWaits = new ArrayList<ICache<K, V>>();
97
98
99
100
101 if ( lac.getTcpServers() != null )
102 {
103 StringTokenizer it = new StringTokenizer( lac.getTcpServers(), "," );
104 if ( log.isDebugEnabled() )
105 {
106 log.debug( "Configured for [" + it.countTokens() + "] servers." );
107 }
108 while ( it.hasMoreElements() )
109 {
110 String server = (String) it.nextElement();
111 if ( log.isDebugEnabled() )
112 {
113 log.debug( "tcp server = " + server );
114 }
115 ITCPLateralCacheAttributes lacC = (ITCPLateralCacheAttributes) lac.clone();
116 lacC.setTcpServer( server );
117
118 LateralCacheNoWait<K, V> lateralNoWait = createCacheNoWait(lacC, cacheEventLogger, elementSerializer);
119
120 addListenerIfNeeded( lacC, cacheMgr );
121 monitor.addCache(lateralNoWait);
122 noWaits.add( lateralNoWait );
123 }
124 }
125
126 ILateralCacheListener<K, V> listener = createListener( lac, cacheMgr );
127
128
129 @SuppressWarnings("unchecked")
130 LateralCacheNoWait<K, V>[] lcnwArray = noWaits.toArray( new LateralCacheNoWait[0] );
131 LateralCacheNoWaitFacade<K, V> lcnwf =
132 new LateralCacheNoWaitFacade<K, V>(listener, lcnwArray, lac );
133
134
135 createDiscoveryService( lac, lcnwf, cacheMgr, cacheEventLogger, elementSerializer );
136
137 return lcnwf;
138 }
139
140 protected <K, V> LateralCacheNoWait<K, V> createCacheNoWait( ITCPLateralCacheAttributes lca,
141 ICacheEventLogger cacheEventLogger, IElementSerializer elementSerializer )
142 {
143 ICacheServiceNonLocal<K, V> lateralService = getCSNLInstance(lca);
144
145 LateralCache<K, V> cache = new LateralCache<K, V>( lca, lateralService, this.monitor );
146 cache.setCacheEventLogger( cacheEventLogger );
147 cache.setElementSerializer( elementSerializer );
148
149 if ( log.isDebugEnabled() )
150 {
151 log.debug( "Created cache for noWait, cache [" + cache + "]" );
152 }
153
154 LateralCacheNoWait<K, V> lateralNoWait = new LateralCacheNoWait<K, V>( cache );
155 lateralNoWait.setCacheEventLogger( cacheEventLogger );
156 lateralNoWait.setElementSerializer( elementSerializer );
157
158 if ( log.isInfoEnabled() )
159 {
160 log.info( "Created LateralCacheNoWait for [" + lca + "] LateralCacheNoWait = [" + lateralNoWait
161 + "]" );
162 }
163
164 return lateralNoWait;
165 }
166
167
168
169
170 @Override
171 public void initialize()
172 {
173 this.csnlInstances = new ConcurrentHashMap<String, ICacheServiceNonLocal<?, ?>>();
174 this.csnlLock = new ReentrantLock();
175 this.lTCPDLInstances = new ConcurrentHashMap<String, LateralTCPDiscoveryListener>();
176
177
178 this.monitor = new LateralCacheMonitor(this);
179 this.monitor.setDaemon( true );
180 this.monitor.start();
181
182 this.lateralWatch = new CacheWatchRepairable();
183 this.lateralWatch.setCacheWatch( new ZombieCacheWatch() );
184 }
185
186
187
188
189 @Override
190 public void dispose()
191 {
192 for (ICacheServiceNonLocal<?, ?> service : this.csnlInstances.values())
193 {
194 try
195 {
196 service.dispose("");
197 }
198 catch (IOException e)
199 {
200 log.error("Could not dispose service " + service, e);
201 }
202 }
203
204 this.csnlInstances.clear();
205
206
207 this.lTCPDLInstances.clear();
208
209 if (this.monitor != null)
210 {
211 this.monitor.notifyShutdown();
212 try
213 {
214 this.monitor.join(5000);
215 }
216 catch (InterruptedException e)
217 {
218
219 }
220 this.monitor = null;
221 }
222 }
223
224
225
226
227
228
229
230
231
232 @SuppressWarnings("unchecked")
233 public <K, V> ICacheServiceNonLocal<K, V> getCSNLInstance( ITCPLateralCacheAttributes lca )
234 {
235 String key = lca.getTcpServer();
236
237 ICacheServiceNonLocal<K, V> service = (ICacheServiceNonLocal<K, V>)csnlInstances.get( key );
238
239 if ( service == null || service instanceof ZombieCacheServiceNonLocal )
240 {
241 csnlLock.lock();
242
243 try
244 {
245
246 service = (ICacheServiceNonLocal<K, V>)csnlInstances.get( key );
247
248
249 if ( service instanceof ZombieCacheServiceNonLocal)
250 {
251 service = null;
252 log.info("Disposing of zombie service instance for [" + key + "]");
253 }
254
255 if ( service == null )
256 {
257 log.info( "Instance for [" + key + "] is null, creating" );
258
259
260 try
261 {
262 if ( log.isInfoEnabled() )
263 {
264 log.info( "Creating TCP service, lca = " + lca );
265 }
266
267 service = new LateralTCPService<K, V>( lca );
268 }
269 catch ( IOException ex )
270 {
271
272
273
274 log.error( "Failure, lateral instance will use zombie service", ex );
275
276 service = new ZombieCacheServiceNonLocal<K, V>( lca.getZombieQueueMaxSize() );
277
278
279
280 monitor.notifyError();
281 }
282
283 csnlInstances.put( key, service );
284 }
285 }
286 finally
287 {
288 csnlLock.unlock();
289 }
290 }
291
292 return service;
293 }
294
295
296
297
298
299
300
301
302
303 private LateralTCPDiscoveryListener getDiscoveryListener( ITCPLateralCacheAttributes ilca, ICompositeCacheManager cacheManager )
304 {
305 String key = ilca.getUdpDiscoveryAddr() + ":" + ilca.getUdpDiscoveryPort();
306 LateralTCPDiscoveryListener ins = null;
307
308 LateralTCPDiscoveryListener newListener = new LateralTCPDiscoveryListener( this.getName(), cacheManager);
309 ins = lTCPDLInstances.putIfAbsent(key, newListener );
310
311 if ( ins == null )
312 {
313 ins = newListener;
314
315 if ( log.isInfoEnabled() )
316 {
317 log.info( "Created new discovery listener for " + key + " cacheName for request " + ilca.getCacheName() );
318 }
319 }
320
321 return ins;
322 }
323
324
325
326
327
328
329
330 private void addListenerIfNeeded( ITCPLateralCacheAttributes iaca, ICompositeCacheManager cacheMgr )
331 {
332
333 if ( iaca.isReceive() )
334 {
335 try
336 {
337 addLateralCacheListener( iaca.getCacheName(), LateralTCPListener.getInstance( iaca, cacheMgr ) );
338 }
339 catch ( IOException ioe )
340 {
341 log.error( "Problem creating lateral listener", ioe );
342 }
343 }
344 else
345 {
346 if ( log.isDebugEnabled() )
347 {
348 log.debug( "Not creating a listener since we are not receiving." );
349 }
350 }
351 }
352
353
354
355
356
357
358
359
360 private <K, V> void addLateralCacheListener( String cacheName, ILateralCacheListener<K, V> listener )
361 throws IOException
362 {
363 synchronized ( this.lateralWatch )
364 {
365 lateralWatch.addCacheListener( cacheName, listener );
366 }
367 }
368
369
370
371
372
373
374
375
376
377
378
379
380 private <K, V> ILateralCacheListener<K, V> createListener( ITCPLateralCacheAttributes attr,
381 ICompositeCacheManager cacheMgr )
382 {
383 ILateralCacheListener<K, V> listener = null;
384
385
386 if ( attr.isReceive() )
387 {
388 if ( log.isInfoEnabled() )
389 {
390 log.info( "Getting listener for " + attr );
391 }
392
393
394 listener = LateralTCPListener.getInstance( attr, cacheMgr );
395
396
397 cacheMgr.registerShutdownObserver( (IShutdownObserver) listener );
398 }
399 else
400 {
401 if ( log.isDebugEnabled() )
402 {
403 log.debug( "Not creating a listener since we are not receiving." );
404 }
405 }
406
407 return listener;
408 }
409
410
411
412
413
414
415
416
417
418
419
420 private synchronized <K, V> UDPDiscoveryService createDiscoveryService(
421 ITCPLateralCacheAttributes lac,
422 LateralCacheNoWaitFacade<K, V> lcnwf,
423 ICompositeCacheManager cacheMgr,
424 ICacheEventLogger cacheEventLogger,
425 IElementSerializer elementSerializer )
426 {
427 UDPDiscoveryService discovery = null;
428
429
430 if ( lac.isUdpDiscoveryEnabled() )
431 {
432
433 LateralTCPDiscoveryListener discoveryListener = getDiscoveryListener( lac, cacheMgr );
434 discoveryListener.addNoWaitFacade( lac.getCacheName(), lcnwf );
435
436
437
438 discovery = UDPDiscoveryManager.getInstance().getService( lac.getUdpDiscoveryAddr(),
439 lac.getUdpDiscoveryPort(),
440 lac.getTcpListenerPort(), cacheMgr);
441
442 discovery.addParticipatingCacheName( lac.getCacheName() );
443 discovery.addDiscoveryListener( discoveryListener );
444
445 if ( log.isInfoEnabled() )
446 {
447 log.info( "Registered TCP lateral cache [" + lac.getCacheName() + "] with UDPDiscoveryService." );
448 }
449 }
450 return discovery;
451 }
452 }