1 package org.apache.jcs.auxiliary.lateral.socket.tcp;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.io.Serializable;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.Map;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.jcs.auxiliary.lateral.LateralCache;
31 import org.apache.jcs.auxiliary.lateral.LateralCacheAbstractManager;
32 import org.apache.jcs.auxiliary.lateral.LateralCacheAttributes;
33 import org.apache.jcs.auxiliary.lateral.LateralCacheMonitor;
34 import org.apache.jcs.auxiliary.lateral.LateralCacheNoWait;
35 import org.apache.jcs.auxiliary.lateral.LateralCacheWatchRepairable;
36 import org.apache.jcs.auxiliary.lateral.ZombieLateralCacheWatch;
37 import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheListener;
38 import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheManager;
39 import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
40 import org.apache.jcs.engine.ZombieCacheServiceNonLocal;
41 import org.apache.jcs.engine.behavior.ICacheServiceNonLocal;
42 import org.apache.jcs.engine.behavior.ICompositeCacheManager;
43 import org.apache.jcs.engine.behavior.IElementSerializer;
44 import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
45
46
47
48
49
50
51
52
53
54
55
56
57 public class LateralTCPCacheManager
58 extends LateralCacheAbstractManager
59 {
60
61 private static final long serialVersionUID = -9213011856644392480L;
62
63
64 private final static Log log = LogFactory.getLog( LateralTCPCacheManager.class );
65
66
67 private static LateralCacheMonitor monitor;
68
69
70 protected static Map<String, LateralTCPCacheManager> instances =
71 Collections.synchronizedMap( new HashMap<String, LateralTCPCacheManager>() );
72
73
74 protected ITCPLateralCacheAttributes lateralCacheAttributes;
75
76
77 private int clients;
78
79
80
81
82 private ICacheServiceNonLocal<? extends Serializable, ? extends Serializable> lateralService;
83
84
85
86
87
88 private LateralCacheWatchRepairable lateralWatch;
89
90
91 private final ICompositeCacheManager cacheMgr;
92
93
94
95
96
97
98
99
100
101
102 public static LateralTCPCacheManager getInstance( ITCPLateralCacheAttributes lca, ICompositeCacheManager cacheMgr,
103 ICacheEventLogger cacheEventLogger,
104 IElementSerializer elementSerializer )
105 {
106 synchronized ( instances )
107 {
108 String key = lca.getTcpServer();
109 LateralTCPCacheManager ins = instances.get( key );
110 if ( ins == null )
111 {
112 log.info( "Instance for [" + key + "] is null, creating" );
113
114 ins = instances.get( lca.getTcpServer() );
115 if ( ins == null )
116 {
117 ins = new LateralTCPCacheManager( lca, cacheMgr, cacheEventLogger, elementSerializer );
118 instances.put( key, ins );
119 }
120
121 createMonitor( ins );
122 }
123 ins.clients++;
124
125 return ins;
126 }
127 }
128
129
130
131
132
133
134
135
136 private static synchronized void createMonitor( ILateralCacheManager instance )
137 {
138
139
140 if ( monitor == null )
141 {
142 monitor = new LateralCacheMonitor( instance );
143 Thread t = new Thread( monitor );
144 t.setDaemon( true );
145 t.start();
146 }
147 }
148
149
150
151
152
153
154
155
156
157 private LateralTCPCacheManager( ITCPLateralCacheAttributes lcaA, ICompositeCacheManager cacheMgr,
158 ICacheEventLogger cacheEventLogger, IElementSerializer elementSerializer )
159 {
160 this.lateralCacheAttributes = lcaA;
161 this.cacheMgr = cacheMgr;
162 this.cacheEventLogger = cacheEventLogger;
163 this.elementSerializer = elementSerializer;
164
165 this.lateralWatch = new LateralCacheWatchRepairable();
166 this.lateralWatch.setCacheWatch( new ZombieLateralCacheWatch() );
167
168 if ( log.isDebugEnabled() )
169 {
170 log.debug( "Creating lateral cache service, lca = " + this.lateralCacheAttributes );
171 }
172
173
174 try
175 {
176 if ( log.isInfoEnabled() )
177 {
178 log.info( "Creating TCP service, lca = " + this.lateralCacheAttributes );
179 }
180
181 this.lateralService = new LateralTCPService<Serializable, Serializable>( this.lateralCacheAttributes );
182 }
183 catch ( Exception ex )
184 {
185
186
187
188 log.error( "Failure, lateral instance will use zombie service", ex );
189
190 this.lateralService = new ZombieCacheServiceNonLocal<Serializable, Serializable>( lateralCacheAttributes.getZombieQueueMaxSize() );
191
192
193
194 createMonitor( this );
195 monitor.notifyError();
196 }
197 }
198
199
200
201
202
203
204
205
206 @Override
207 public <K extends Serializable, V extends Serializable> void addLateralCacheListener( String cacheName, ILateralCacheListener<K, V> listener )
208 throws IOException
209 {
210 synchronized ( this.caches )
211 {
212 lateralWatch.addCacheListener( cacheName, listener );
213 }
214 }
215
216
217
218
219
220
221
222
223
224
225
226
227
228 @SuppressWarnings("unchecked")
229 @Override
230 public <K extends Serializable, V extends Serializable> LateralCacheNoWait<K, V> getCache( String cacheName )
231 {
232 LateralCacheNoWait<K, V> lateralNoWait = null;
233 synchronized ( caches )
234 {
235
236 lateralNoWait = (LateralCacheNoWait<K, V>) caches.get( cacheName );
237 if ( lateralNoWait == null )
238 {
239 LateralCacheAttributes attr = (LateralCacheAttributes) lateralCacheAttributes.copy();
240 attr.setCacheName( cacheName );
241
242
243 LateralCache<K, V> cache = new LateralCache<K, V>( attr,
244 (ICacheServiceNonLocal<K, V>)this.lateralService, monitor );
245 cache.setCacheEventLogger( cacheEventLogger );
246 cache.setElementSerializer( elementSerializer );
247
248 if ( log.isDebugEnabled() )
249 {
250 log.debug( "Created cache for noWait, cache [" + cache + "]" );
251 }
252
253 lateralNoWait = new LateralCacheNoWait<K, V>( cache );
254 lateralNoWait.setCacheEventLogger( cacheEventLogger );
255 lateralNoWait.setElementSerializer( elementSerializer );
256
257 caches.put( cacheName, lateralNoWait );
258
259 if ( log.isInfoEnabled() )
260 {
261 log.info( "Created LateralCacheNoWait for [" + lateralCacheAttributes + "] LateralCacheNoWait = [" + lateralNoWait
262 + "]" );
263 }
264
265
266 addListenerIfNeeded( cacheName );
267 }
268 }
269
270 return lateralNoWait;
271 }
272
273
274
275
276
277
278 private void addListenerIfNeeded( String cacheName )
279 {
280
281 if ( lateralCacheAttributes.isReceive() )
282 {
283 try
284 {
285 addLateralCacheListener( cacheName, LateralTCPListener.getInstance( lateralCacheAttributes, cacheMgr ) );
286 }
287 catch ( IOException ioe )
288 {
289 log.error( "Problem creating lateral listener", ioe );
290 }
291 catch ( Exception e )
292 {
293 log.error( "Problem creating lateral listener", e );
294 }
295 }
296 else
297 {
298 if ( log.isDebugEnabled() )
299 {
300 log.debug( "Not creating a listener since we are not receiving." );
301 }
302 }
303 }
304
305
306
307
308 public Map<String, ? extends ILateralCacheManager> getInstances()
309 {
310 return instances;
311 }
312
313
314
315
316
317 public Object fixService()
318 throws IOException
319 {
320 Object service = null;
321 try
322 {
323 service = new LateralTCPService<Serializable, Serializable>( lateralCacheAttributes );
324 }
325 catch ( Exception ex )
326 {
327 log.error( "Can't fix " + ex.getMessage() );
328 throw new IOException( "Can't fix " + ex.getMessage() );
329 }
330 return service;
331 }
332
333
334
335
336
337 public void shutdown()
338 {
339
340
341 try
342 {
343 lateralService.dispose( "ALL" );
344 }
345 catch ( IOException e )
346 {
347 log.error( "Problem disposing of service", e );
348 }
349
350
351 if (monitor != null)
352 {
353 monitor.notifyShutdown();
354 }
355 }
356 }