View Javadoc
1   package org.apache.commons.jcs.auxiliary.remote;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.rmi.Naming;
24  import java.util.concurrent.ConcurrentHashMap;
25  import java.util.concurrent.ConcurrentMap;
26  import java.util.concurrent.locks.ReentrantLock;
27  
28  import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
29  import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheClient;
30  import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
31  import org.apache.commons.jcs.engine.CacheStatus;
32  import org.apache.commons.jcs.engine.CacheWatchRepairable;
33  import org.apache.commons.jcs.engine.ZombieCacheServiceNonLocal;
34  import org.apache.commons.jcs.engine.ZombieCacheWatch;
35  import org.apache.commons.jcs.engine.behavior.ICacheObserver;
36  import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal;
37  import org.apache.commons.jcs.engine.behavior.ICompositeCacheManager;
38  import org.apache.commons.jcs.engine.behavior.IElementSerializer;
39  import org.apache.commons.jcs.engine.logging.behavior.ICacheEventLogger;
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  
43  /**
44   * An instance of RemoteCacheManager corresponds to one remote connection of a specific host and
45   * port. All RemoteCacheManager instances are monitored by the singleton RemoteCacheMonitor
46   * monitoring daemon for error detection and recovery.
47   * <p>
48   * Getting an instance of the remote cache has the effect of getting a handle on the remote server.
49   * Listeners are not registered with the server until a cache is requested from the manager.
50   */
51  public class RemoteCacheManager
52  {
53      /** The logger */
54      private static final Log log = LogFactory.getLog( RemoteCacheManager.class );
55  
56      /** Contains instances of RemoteCacheNoWait managed by a RemoteCacheManager instance. */
57      private final ConcurrentMap<String, RemoteCacheNoWait<?, ?>> caches =
58              new ConcurrentHashMap<String, RemoteCacheNoWait<?, ?>>();
59  
60      /** Lock for initialization of caches */
61      private ReentrantLock cacheLock = new ReentrantLock();
62  
63      /** The event logger. */
64      private final ICacheEventLogger cacheEventLogger;
65  
66      /** The serializer. */
67      private final IElementSerializer elementSerializer;
68  
69      /** Handle to the remote cache service; or a zombie handle if failed to connect. */
70      private ICacheServiceNonLocal<?, ?> remoteService;
71  
72      /**
73       * Wrapper of the remote cache watch service; or wrapper of a zombie service if failed to
74       * connect.
75       */
76      private CacheWatchRepairable remoteWatch;
77  
78      /** The cache manager listeners will need to use to get a cache. */
79      private final ICompositeCacheManager cacheMgr;
80  
81      /** For error notification */
82      private final RemoteCacheMonitor monitor;
83  
84      /** The service found through lookup */
85      private final String registry;
86  
87      /** can it be restored */
88      private boolean canFix = true;
89  
90      /**
91       * Constructs an instance to with the given remote connection parameters. If the connection
92       * cannot be made, "zombie" services will be temporarily used until a successful re-connection
93       * is made by the monitoring daemon.
94       * <p>
95       * @param cattr cache attributes
96       * @param cacheMgr the cache hub
97       * @param monitor the cache monitor thread for error notifications
98       * @param cacheEventLogger
99       * @param elementSerializer
100      */
101     protected RemoteCacheManager( IRemoteCacheAttributes cattr, ICompositeCacheManager cacheMgr,
102                                 RemoteCacheMonitor monitor,
103                                 ICacheEventLogger cacheEventLogger, IElementSerializer elementSerializer)
104     {
105         this.cacheMgr = cacheMgr;
106         this.monitor = monitor;
107         this.cacheEventLogger = cacheEventLogger;
108         this.elementSerializer = elementSerializer;
109         this.remoteWatch = new CacheWatchRepairable();
110 
111         this.registry = RemoteUtils.getNamingURL(cattr.getRemoteLocation(), cattr.getRemoteServiceName());
112 
113         try
114         {
115             lookupRemoteService();
116         }
117         catch (IOException e)
118         {
119             log.error("Could not find server", e);
120             // Notify the cache monitor about the error, and kick off the
121             // recovery process.
122             monitor.notifyError();
123         }
124     }
125 
126     /**
127      * Lookup remote service from registry
128      * @throws IOException if the remote service could not be found
129      *
130      */
131     protected void lookupRemoteService() throws IOException
132     {
133         if ( log.isInfoEnabled() )
134         {
135             log.info( "Looking up server [" + registry + "]" );
136         }
137         try
138         {
139             Object obj = Naming.lookup( registry );
140             if ( log.isInfoEnabled() )
141             {
142                 log.info( "Server found: " + obj );
143             }
144 
145             // Successful connection to the remote server.
146             this.remoteService = (ICacheServiceNonLocal<?, ?>) obj;
147             if ( log.isDebugEnabled() )
148             {
149                 log.debug( "Remote Service = " + remoteService );
150             }
151             remoteWatch.setCacheWatch( (ICacheObserver) remoteService );
152         }
153         catch ( Exception ex )
154         {
155             // Failed to connect to the remote server.
156             // Configure this RemoteCacheManager instance to use the "zombie"
157             // services.
158             this.remoteService = new ZombieCacheServiceNonLocal<String, String>();
159             remoteWatch.setCacheWatch( new ZombieCacheWatch() );
160             throw new IOException( "Problem finding server at [" + registry + "]", ex );
161         }
162     }
163 
164     /**
165      * Adds the remote cache listener to the underlying cache-watch service.
166      * <p>
167      * @param cattr The feature to be added to the RemoteCacheListener attribute
168      * @param listener The feature to be added to the RemoteCacheListener attribute
169      * @throws IOException
170      */
171     public <K, V> void addRemoteCacheListener( IRemoteCacheAttributes cattr, IRemoteCacheListener<K, V> listener )
172         throws IOException
173     {
174         if ( cattr.isReceive() )
175         {
176             if ( log.isInfoEnabled() )
177             {
178                 log.info( "The remote cache is configured to receive events from the remote server.  "
179                     + "We will register a listener. remoteWatch = " + remoteWatch + " | IRemoteCacheListener = "
180                     + listener + " | cacheName " + cattr.getCacheName() );
181             }
182 
183             remoteWatch.addCacheListener( cattr.getCacheName(), listener );
184         }
185         else
186         {
187             if ( log.isInfoEnabled() )
188             {
189                 log.info( "The remote cache is configured to NOT receive events from the remote server.  "
190                     + "We will NOT register a listener." );
191             }
192         }
193     }
194 
195     /**
196      * Removes a listener. When the primary recovers the failover must deregister itself for a
197      * region. The failover runner will call this method to de-register. We do not want to deregister
198      * all listeners to a remote server, in case a failover is a primary of another region. Having
199      * one regions failover act as another servers primary is not currently supported.
200      * <p>
201      * @param cattr
202      * @throws IOException
203      */
204     public void removeRemoteCacheListener( IRemoteCacheAttributes cattr )
205         throws IOException
206     {
207         RemoteCacheNoWait<?, ?> cache = caches.get( cattr.getCacheName() );
208         if ( cache != null )
209         {
210         	removeListenerFromCache(cache);
211         }
212         else
213         {
214             if ( cattr.isReceive() )
215             {
216                 log.warn( "Trying to deregister Cache Listener that was never registered." );
217             }
218             else if ( log.isDebugEnabled() )
219             {
220                 log.debug( "Since the remote cache is configured to not receive, "
221                     + "there is no listener to deregister." );
222             }
223         }
224     }
225 
226     // common helper method
227 	private void removeListenerFromCache(RemoteCacheNoWait<?, ?> cache) throws IOException
228 	{
229 		IRemoteCacheClient<?, ?> rc = cache.getRemoteCache();
230 		if ( log.isDebugEnabled() )
231 		{
232 		    log.debug( "Found cache for [" + cache.getCacheName() + "], deregistering listener." );
233 		}
234 		// could also store the listener for a server in the manager.
235 		IRemoteCacheListener<?, ?> listener = rc.getListener();
236         remoteWatch.removeCacheListener( cache.getCacheName(), listener );
237 	}
238 
239     /**
240      * Gets a RemoteCacheNoWait from the RemoteCacheManager. The RemoteCacheNoWait objects are
241      * identified by the cache name value of the RemoteCacheAttributes object.
242      * <p>
243      * If the client is configured to register a listener, this call results on a listener being
244      * created if one isn't already registered with the remote cache for this region.
245      * <p>
246      * @param cattr
247      * @return The cache value
248      */
249     @SuppressWarnings("unchecked") // Need to cast because of common map for all caches
250     public <K, V> RemoteCacheNoWait<K, V> getCache( IRemoteCacheAttributes cattr )
251     {
252         RemoteCacheNoWait<K, V> remoteCacheNoWait = (RemoteCacheNoWait<K, V>) caches.get( cattr.getCacheName() );
253 
254         if ( remoteCacheNoWait == null )
255         {
256             cacheLock.lock();
257 
258             try
259             {
260                 remoteCacheNoWait = (RemoteCacheNoWait<K, V>) caches.get( cattr.getCacheName() );
261 
262                 if (remoteCacheNoWait == null)
263                 {
264                     remoteCacheNoWait = newRemoteCacheNoWait(cattr);
265                     caches.put( cattr.getCacheName(), remoteCacheNoWait );
266                 }
267             }
268             finally
269             {
270                 cacheLock.unlock();
271             }
272         }
273 
274         // might want to do some listener sanity checking here.
275 
276         return remoteCacheNoWait;
277     }
278 
279     /**
280      * Create new RemoteCacheNoWait instance
281      *
282      * @param cattr the cache configuration
283      * @return the instance
284      */
285     protected <K, V> RemoteCacheNoWait<K, V> newRemoteCacheNoWait(IRemoteCacheAttributes cattr)
286     {
287         RemoteCacheNoWait<K, V> remoteCacheNoWait;
288         // create a listener first and pass it to the remotecache
289         // sender.
290         RemoteCacheListener<K, V> listener = null;
291         try
292         {
293             listener = new RemoteCacheListener<K, V>( cattr, cacheMgr, elementSerializer );
294             addRemoteCacheListener( cattr, listener );
295         }
296         catch ( IOException ioe )
297         {
298             log.error( "Problem adding listener. Message: " + ioe.getMessage()
299                 + " | RemoteCacheListener = " + listener, ioe );
300         }
301         catch ( Exception e )
302         {
303             log.error( "Problem adding listener. Message: " + e.getMessage() + " | RemoteCacheListener = "
304                 + listener, e );
305         }
306 
307         IRemoteCacheClient<K, V> remoteCacheClient =
308             new RemoteCache<K, V>( cattr, (ICacheServiceNonLocal<K, V>) remoteService, listener, monitor );
309         remoteCacheClient.setCacheEventLogger( cacheEventLogger );
310         remoteCacheClient.setElementSerializer( elementSerializer );
311 
312         remoteCacheNoWait = new RemoteCacheNoWait<K, V>( remoteCacheClient );
313         remoteCacheNoWait.setCacheEventLogger( cacheEventLogger );
314         remoteCacheNoWait.setElementSerializer( elementSerializer );
315 
316         return remoteCacheNoWait;
317     }
318 
319     /** Shutdown all. */
320     public void release()
321     {
322         cacheLock.lock();
323 
324         try
325         {
326             for (RemoteCacheNoWait<?, ?> c : caches.values())
327             {
328                 try
329                 {
330                     if ( log.isInfoEnabled() )
331                     {
332                         log.info( "freeCache [" + c.getCacheName() + "]" );
333                     }
334 
335                     removeListenerFromCache(c);
336                     c.dispose();
337                 }
338                 catch ( IOException ex )
339                 {
340                     log.error( "Problem releasing " + c.getCacheName(), ex );
341                 }
342             }
343 
344             caches.clear();
345         }
346         finally
347         {
348             cacheLock.unlock();
349         }
350     }
351 
352     /**
353      * Fixes up all the caches managed by this cache manager.
354      */
355     public void fixCaches()
356     {
357         if ( !canFix )
358         {
359             return;
360         }
361 
362         if ( log.isInfoEnabled() )
363         {
364             log.info( "Fixing caches. ICacheServiceNonLocal " + remoteService + " | IRemoteCacheObserver " + remoteWatch );
365         }
366 
367         for (RemoteCacheNoWait<?, ?> c : caches.values())
368         {
369             if (c.getStatus() == CacheStatus.ERROR)
370             {
371                 c.fixCache( remoteService );
372             }
373         }
374 
375         if ( log.isInfoEnabled() )
376         {
377             String msg = "Remote connection to " + registry + " resumed.";
378             if ( cacheEventLogger != null )
379             {
380                 cacheEventLogger.logApplicationEvent( "RemoteCacheManager", "fix", msg );
381             }
382             log.info( msg );
383         }
384     }
385 
386     /**
387      * Returns true if the connection to the remote host can be
388      * successfully re-established.
389      * <p>
390      * @return true if we found a failover server
391      */
392     public boolean canFixCaches()
393     {
394         try
395         {
396             lookupRemoteService();
397         }
398         catch (IOException e)
399         {
400             log.error("Could not find server", e);
401             canFix = false;
402         }
403 
404         return canFix;
405     }
406 }