1 package org.apache.commons.jcs.auxiliary.remote;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.rmi.Naming;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.locks.ReentrantLock;
27
28 import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
29 import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheClient;
30 import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
31 import org.apache.commons.jcs.engine.CacheStatus;
32 import org.apache.commons.jcs.engine.CacheWatchRepairable;
33 import org.apache.commons.jcs.engine.ZombieCacheServiceNonLocal;
34 import org.apache.commons.jcs.engine.ZombieCacheWatch;
35 import org.apache.commons.jcs.engine.behavior.ICacheObserver;
36 import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal;
37 import org.apache.commons.jcs.engine.behavior.ICompositeCacheManager;
38 import org.apache.commons.jcs.engine.behavior.IElementSerializer;
39 import org.apache.commons.jcs.engine.logging.behavior.ICacheEventLogger;
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42
43
44
45
46
47
48
49
50
51 public class RemoteCacheManager
52 {
53
54 private static final Log log = LogFactory.getLog( RemoteCacheManager.class );
55
56
57 private final ConcurrentMap<String, RemoteCacheNoWait<?, ?>> caches =
58 new ConcurrentHashMap<String, RemoteCacheNoWait<?, ?>>();
59
60
61 private ReentrantLock cacheLock = new ReentrantLock();
62
63
64 private final ICacheEventLogger cacheEventLogger;
65
66
67 private final IElementSerializer elementSerializer;
68
69
70 private ICacheServiceNonLocal<?, ?> remoteService;
71
72
73
74
75
76 private CacheWatchRepairable remoteWatch;
77
78
79 private final ICompositeCacheManager cacheMgr;
80
81
82 private final RemoteCacheMonitor monitor;
83
84
85 private final String registry;
86
87
88 private boolean canFix = true;
89
90
91
92
93
94
95
96
97
98
99
100
101 protected RemoteCacheManager( IRemoteCacheAttributes cattr, ICompositeCacheManager cacheMgr,
102 RemoteCacheMonitor monitor,
103 ICacheEventLogger cacheEventLogger, IElementSerializer elementSerializer)
104 {
105 this.cacheMgr = cacheMgr;
106 this.monitor = monitor;
107 this.cacheEventLogger = cacheEventLogger;
108 this.elementSerializer = elementSerializer;
109 this.remoteWatch = new CacheWatchRepairable();
110
111 this.registry = RemoteUtils.getNamingURL(cattr.getRemoteLocation(), cattr.getRemoteServiceName());
112
113 try
114 {
115 lookupRemoteService();
116 }
117 catch (IOException e)
118 {
119 log.error("Could not find server", e);
120
121
122 monitor.notifyError();
123 }
124 }
125
126
127
128
129
130
131 protected void lookupRemoteService() throws IOException
132 {
133 if ( log.isInfoEnabled() )
134 {
135 log.info( "Looking up server [" + registry + "]" );
136 }
137 try
138 {
139 Object obj = Naming.lookup( registry );
140 if ( log.isInfoEnabled() )
141 {
142 log.info( "Server found: " + obj );
143 }
144
145
146 this.remoteService = (ICacheServiceNonLocal<?, ?>) obj;
147 if ( log.isDebugEnabled() )
148 {
149 log.debug( "Remote Service = " + remoteService );
150 }
151 remoteWatch.setCacheWatch( (ICacheObserver) remoteService );
152 }
153 catch ( Exception ex )
154 {
155
156
157
158 this.remoteService = new ZombieCacheServiceNonLocal<String, String>();
159 remoteWatch.setCacheWatch( new ZombieCacheWatch() );
160 throw new IOException( "Problem finding server at [" + registry + "]", ex );
161 }
162 }
163
164
165
166
167
168
169
170
171 public <K, V> void addRemoteCacheListener( IRemoteCacheAttributes cattr, IRemoteCacheListener<K, V> listener )
172 throws IOException
173 {
174 if ( cattr.isReceive() )
175 {
176 if ( log.isInfoEnabled() )
177 {
178 log.info( "The remote cache is configured to receive events from the remote server. "
179 + "We will register a listener. remoteWatch = " + remoteWatch + " | IRemoteCacheListener = "
180 + listener + " | cacheName " + cattr.getCacheName() );
181 }
182
183 remoteWatch.addCacheListener( cattr.getCacheName(), listener );
184 }
185 else
186 {
187 if ( log.isInfoEnabled() )
188 {
189 log.info( "The remote cache is configured to NOT receive events from the remote server. "
190 + "We will NOT register a listener." );
191 }
192 }
193 }
194
195
196
197
198
199
200
201
202
203
204 public void removeRemoteCacheListener( IRemoteCacheAttributes cattr )
205 throws IOException
206 {
207 RemoteCacheNoWait<?, ?> cache = caches.get( cattr.getCacheName() );
208 if ( cache != null )
209 {
210 removeListenerFromCache(cache);
211 }
212 else
213 {
214 if ( cattr.isReceive() )
215 {
216 log.warn( "Trying to deregister Cache Listener that was never registered." );
217 }
218 else if ( log.isDebugEnabled() )
219 {
220 log.debug( "Since the remote cache is configured to not receive, "
221 + "there is no listener to deregister." );
222 }
223 }
224 }
225
226
227 private void removeListenerFromCache(RemoteCacheNoWait<?, ?> cache) throws IOException
228 {
229 IRemoteCacheClient<?, ?> rc = cache.getRemoteCache();
230 if ( log.isDebugEnabled() )
231 {
232 log.debug( "Found cache for [" + cache.getCacheName() + "], deregistering listener." );
233 }
234
235 IRemoteCacheListener<?, ?> listener = rc.getListener();
236 remoteWatch.removeCacheListener( cache.getCacheName(), listener );
237 }
238
239
240
241
242
243
244
245
246
247
248
249 @SuppressWarnings("unchecked")
250 public <K, V> RemoteCacheNoWait<K, V> getCache( IRemoteCacheAttributes cattr )
251 {
252 RemoteCacheNoWait<K, V> remoteCacheNoWait = (RemoteCacheNoWait<K, V>) caches.get( cattr.getCacheName() );
253
254 if ( remoteCacheNoWait == null )
255 {
256 cacheLock.lock();
257
258 try
259 {
260 remoteCacheNoWait = (RemoteCacheNoWait<K, V>) caches.get( cattr.getCacheName() );
261
262 if (remoteCacheNoWait == null)
263 {
264 remoteCacheNoWait = newRemoteCacheNoWait(cattr);
265 caches.put( cattr.getCacheName(), remoteCacheNoWait );
266 }
267 }
268 finally
269 {
270 cacheLock.unlock();
271 }
272 }
273
274
275
276 return remoteCacheNoWait;
277 }
278
279
280
281
282
283
284
285 protected <K, V> RemoteCacheNoWait<K, V> newRemoteCacheNoWait(IRemoteCacheAttributes cattr)
286 {
287 RemoteCacheNoWait<K, V> remoteCacheNoWait;
288
289
290 RemoteCacheListener<K, V> listener = null;
291 try
292 {
293 listener = new RemoteCacheListener<K, V>( cattr, cacheMgr, elementSerializer );
294 addRemoteCacheListener( cattr, listener );
295 }
296 catch ( IOException ioe )
297 {
298 log.error( "Problem adding listener. Message: " + ioe.getMessage()
299 + " | RemoteCacheListener = " + listener, ioe );
300 }
301 catch ( Exception e )
302 {
303 log.error( "Problem adding listener. Message: " + e.getMessage() + " | RemoteCacheListener = "
304 + listener, e );
305 }
306
307 IRemoteCacheClient<K, V> remoteCacheClient =
308 new RemoteCache<K, V>( cattr, (ICacheServiceNonLocal<K, V>) remoteService, listener, monitor );
309 remoteCacheClient.setCacheEventLogger( cacheEventLogger );
310 remoteCacheClient.setElementSerializer( elementSerializer );
311
312 remoteCacheNoWait = new RemoteCacheNoWait<K, V>( remoteCacheClient );
313 remoteCacheNoWait.setCacheEventLogger( cacheEventLogger );
314 remoteCacheNoWait.setElementSerializer( elementSerializer );
315
316 return remoteCacheNoWait;
317 }
318
319
320 public void release()
321 {
322 cacheLock.lock();
323
324 try
325 {
326 for (RemoteCacheNoWait<?, ?> c : caches.values())
327 {
328 try
329 {
330 if ( log.isInfoEnabled() )
331 {
332 log.info( "freeCache [" + c.getCacheName() + "]" );
333 }
334
335 removeListenerFromCache(c);
336 c.dispose();
337 }
338 catch ( IOException ex )
339 {
340 log.error( "Problem releasing " + c.getCacheName(), ex );
341 }
342 }
343
344 caches.clear();
345 }
346 finally
347 {
348 cacheLock.unlock();
349 }
350 }
351
352
353
354
355 public void fixCaches()
356 {
357 if ( !canFix )
358 {
359 return;
360 }
361
362 if ( log.isInfoEnabled() )
363 {
364 log.info( "Fixing caches. ICacheServiceNonLocal " + remoteService + " | IRemoteCacheObserver " + remoteWatch );
365 }
366
367 for (RemoteCacheNoWait<?, ?> c : caches.values())
368 {
369 if (c.getStatus() == CacheStatus.ERROR)
370 {
371 c.fixCache( remoteService );
372 }
373 }
374
375 if ( log.isInfoEnabled() )
376 {
377 String msg = "Remote connection to " + registry + " resumed.";
378 if ( cacheEventLogger != null )
379 {
380 cacheEventLogger.logApplicationEvent( "RemoteCacheManager", "fix", msg );
381 }
382 log.info( msg );
383 }
384 }
385
386
387
388
389
390
391
392 public boolean canFixCaches()
393 {
394 try
395 {
396 lookupRemoteService();
397 }
398 catch (IOException e)
399 {
400 log.error("Could not find server", e);
401 canFix = false;
402 }
403
404 return canFix;
405 }
406 }