View Javadoc

1   package org.apache.jcs.auxiliary.remote;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.rmi.Naming;
24  import java.rmi.registry.Registry;
25  import java.util.HashMap;
26  import java.util.Map;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.jcs.auxiliary.AuxiliaryCache;
31  import org.apache.jcs.auxiliary.AuxiliaryCacheManager;
32  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
33  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheClient;
34  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
35  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheObserver;
36  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheService;
37  import org.apache.jcs.engine.behavior.ICache;
38  import org.apache.jcs.engine.behavior.ICompositeCacheManager;
39  import org.apache.jcs.engine.behavior.IElementSerializer;
40  import org.apache.jcs.engine.behavior.IShutdownObserver;
41  import org.apache.jcs.engine.control.CompositeCacheManager;
42  import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
43  
44  /**
45   * An instance of RemoteCacheManager corresponds to one remote connection of a specific host and
46   * port. All RemoteCacheManager instances are monitored by the singleton RemoteCacheMonitor
47   * monitoring daemon for error detection and recovery.
48   * <p>
49   * Getting an instance of the remote cache has the effect of getting a handle on the remote server.
50   * Listeners are not registered with the server until a cache is requested from the manager.
51   */
52  public class RemoteCacheManager
53      implements AuxiliaryCacheManager, IShutdownObserver
54  {
55      /** Don't change */
56      private static final long serialVersionUID = 798077557166389498L;
57  
58      /** The logger */
59      private final static Log log = LogFactory.getLog( RemoteCacheManager.class );
60  
61      /** Contains mappings of Location instance to RemoteCacheManager instance. */
62      final static Map<Location, RemoteCacheManager> instances = new HashMap<Location, RemoteCacheManager>();
63  
64      /** Monitors connections. */
65      private static RemoteCacheMonitor monitor;
66  
67      /** Not so useful. How many getCaches over releases were called. */
68      private int clients;
69  
70      /** Contains instances of RemoteCacheNoWait managed by a RemoteCacheManager instance. */
71      final Map<String, RemoteCacheNoWait> caches = new HashMap<String, RemoteCacheNoWait>();
72  
73      /** The remote host */
74      final String host;
75  
76      /** The remote port */
77      final int port;
78  
79      /** The service name */
80      final String service;
81  
82      /** The configuration attributes. */
83      private IRemoteCacheAttributes remoteCacheAttributes;
84  
85      /** The event logger. */
86      private final ICacheEventLogger cacheEventLogger;
87  
88      /** The serializer. */
89      private final IElementSerializer elementSerializer;
90  
91      /** Handle to the remote cache service; or a zombie handle if failed to connect. */
92      private IRemoteCacheService remoteService;
93  
94      /**
95       * Wrapper of the remote cache watch service; or wrapper of a zombie service if failed to
96       * connect.
97       */
98      private RemoteCacheWatchRepairable remoteWatch;
99  
100     /** The cache manager listeners will need to use to get a cache. */
101     private final ICompositeCacheManager cacheMgr;
102 
103     /** The service found through lookup */
104     //private String registry;
105 
106     /**
107      * Constructs an instance to with the given remote connection parameters. If the connection
108      * cannot be made, "zombie" services will be temporarily used until a successful re-connection
109      * is made by the monitoring daemon.
110      * <p>
111      * @param host
112      * @param port
113      * @param service
114      * @param cacheMgr
115      * @param cacheEventLogger
116      * @param elementSerializer
117      */
118     private RemoteCacheManager( String host, int port, String service, ICompositeCacheManager cacheMgr,
119                                 ICacheEventLogger cacheEventLogger, IElementSerializer elementSerializer )
120     {
121         this.host = host;
122         this.port = port;
123         this.service = service;
124         this.cacheMgr = cacheMgr;
125         this.cacheEventLogger = cacheEventLogger;
126         this.elementSerializer = elementSerializer;
127 
128         // register shutdown observer
129         // TODO add the shutdown observable methods to the interface
130         if ( this.cacheMgr instanceof CompositeCacheManager )
131         {
132             ( (CompositeCacheManager) this.cacheMgr ).registerShutdownObserver( this );
133         }
134 
135         String registry = "//" + host + ":" + port + "/" + service;
136         if ( log.isInfoEnabled() )
137         {
138             log.info( "Looking up server [" + registry + "]" );
139         }
140         try
141         {
142             Object obj = Naming.lookup( registry );
143             if ( log.isInfoEnabled() )
144             {
145                 log.info( "Server found: " + obj );
146             }
147 
148             // Successful connection to the remote server.
149             remoteService = (IRemoteCacheService) obj;
150             if ( log.isDebugEnabled() )
151             {
152                 log.debug( "remoteService = " + remoteService );
153             }
154 
155             remoteWatch = new RemoteCacheWatchRepairable();
156             remoteWatch.setCacheWatch( (IRemoteCacheObserver) obj );
157         }
158         catch ( Exception ex )
159         {
160             // Failed to connect to the remote server.
161             // Configure this RemoteCacheManager instance to use the "zombie"
162             // services.
163             log.error( "Problem finding server at [" + registry + "]", ex );
164             remoteService = new ZombieRemoteCacheService();
165             remoteWatch = new RemoteCacheWatchRepairable();
166             remoteWatch.setCacheWatch( new ZombieRemoteCacheWatch() );
167 
168             // Notify the cache monitor about the error, and kick off the
169             // recovery process.
170             RemoteCacheMonitor.getInstance().notifyError();
171         }
172     }
173 
174     /**
175      * Gets the defaultCattr attribute of the RemoteCacheManager object.
176      * <p>
177      * @return The defaultCattr value
178      */
179     public IRemoteCacheAttributes getDefaultCattr()
180     {
181         return this.remoteCacheAttributes;
182     }
183 
184     /**
185      * Adds the remote cache listener to the underlying cache-watch service.
186      * <p>
187      * @param cattr The feature to be added to the RemoteCacheListener attribute
188      * @param listener The feature to be added to the RemoteCacheListener attribute
189      * @throws IOException
190      */
191     public void addRemoteCacheListener( IRemoteCacheAttributes cattr, IRemoteCacheListener listener )
192         throws IOException
193     {
194         if ( cattr.isReceive() )
195         {
196             if ( log.isInfoEnabled() )
197             {
198                 log.info( "The remote cache is configured to receive events from the remote server.  "
199                     + "We will register a listener. remoteWatch = " + remoteWatch + " | IRemoteCacheListener = "
200                     + listener + " | cacheName " + cattr.getCacheName() );
201             }
202 
203             synchronized ( caches )
204             {
205                 remoteWatch.addCacheListener( cattr.getCacheName(), listener );
206             }
207         }
208         else
209         {
210             if ( log.isInfoEnabled() )
211             {
212                 log.info( "The remote cache is configured to NOT receive events from the remote server.  "
213                     + "We will NOT register a listener." );
214             }
215         }
216     }
217 
218     /**
219      * Removes a listener. When the primary recovers the failover must deregister itself for a
220      * region. The failover runner will call this method to de-register. We do not want to deregister
221      * all listeners to a remote server, in case a failover is a primary of another region. Having
222      * one regions failover act as another servers primary is not currently supported.
223      * <p>
224      * @param cattr
225      * @param listener
226      * @throws IOException
227      */
228     public void removeRemoteCacheListener( IRemoteCacheAttributes cattr, IRemoteCacheListener listener )
229         throws IOException
230     {
231         synchronized ( caches )
232         {
233             remoteWatch.removeCacheListener( cattr.getCacheName(), listener );
234         }
235     }
236 
237     /**
238      * Stops a listener. This is used to deregister a failover after primary reconnection.
239      * <p>
240      * @param cattr
241      * @throws IOException
242      */
243     public void removeRemoteCacheListener( IRemoteCacheAttributes cattr )
244         throws IOException
245     {
246         synchronized ( caches )
247         {
248             RemoteCacheNoWait cache = caches.get( cattr.getCacheName() );
249             if ( cache != null )
250             {
251                 IRemoteCacheClient rc = cache.getRemoteCache();
252                 if ( log.isDebugEnabled() )
253                 {
254                     log.debug( "Found cache for[ " + cattr.getCacheName() + "], deregistering listener." );
255                 }
256                 // could also store the listener for a server in the manager.
257                 IRemoteCacheListener listener = rc.getListener();
258                 remoteWatch.removeCacheListener( cattr.getCacheName(), listener );
259             }
260             else
261             {
262                 if ( cattr.isReceive() )
263                 {
264                     log.warn( "Trying to deregister Cache Listener that was never registered." );
265                 }
266                 else
267                 {
268                     if ( log.isDebugEnabled() )
269                     {
270                         log.debug( "Since the remote cache is configured to not receive, "
271                             + "there is no listener to deregister." );
272                     }
273                 }
274             }
275         }
276     }
277 
278     /**
279      * Stops a listener. This is used to deregister a failover after primary reconnection.
280      * <p>
281      * @param cacheName
282      * @throws IOException
283      */
284     public void removeRemoteCacheListener( String cacheName )
285         throws IOException
286     {
287         synchronized ( caches )
288         {
289             RemoteCacheNoWait cache = caches.get( cacheName );
290             if ( cache != null )
291             {
292                 IRemoteCacheClient rc = cache.getRemoteCache();
293                 if ( log.isDebugEnabled() )
294                 {
295                     log.debug( "Found cache for [" + cacheName + "], deregistering listener." );
296                 }
297                 // could also store the listener for a server in the manager.
298                 IRemoteCacheListener listener = rc.getListener();
299                 remoteWatch.removeCacheListener( cacheName, listener );
300             }
301         }
302     }
303 
304     /**
305      * Returns an instance of RemoteCacheManager for the given connection parameters.
306      * <p>
307      * Host and Port uniquely identify a manager instance.
308      * <p>
309      * Also starts up the monitoring daemon, if not already started.
310      * <p>
311      * If the connection cannot be established, zombie objects will be used for future recovery
312      * purposes.
313      * <p>
314      * @param cattr
315      * @param cacheMgr
316      * @param cacheEventLogger
317      * @param elementSerializer
318      * @return The instance value
319      */
320     public static RemoteCacheManager getInstance( IRemoteCacheAttributes cattr, ICompositeCacheManager cacheMgr,
321                                                   ICacheEventLogger cacheEventLogger,
322                                                   IElementSerializer elementSerializer )
323     {
324         String host = cattr.getRemoteHost();
325         int port = cattr.getRemotePort();
326         String service = cattr.getRemoteServiceName();
327         if ( host == null )
328         {
329             host = "";
330         }
331         if ( port < 1024 )
332         {
333             port = Registry.REGISTRY_PORT;
334         }
335         Location loc = new Location( host, port );
336 
337         RemoteCacheManager ins = null;
338         synchronized ( instances )
339         {
340             ins = instances.get( loc );
341             if ( ins == null )
342             {
343                 // Change to use cattr and to set defaults
344                 ins = new RemoteCacheManager( host, port, service, cacheMgr, cacheEventLogger, elementSerializer );
345                 ins.remoteCacheAttributes = cattr;
346                 instances.put( loc, ins );
347             }
348         }
349 
350         ins.clients++;
351         // Fires up the monitoring daemon.
352         if ( monitor == null )
353         {
354             monitor = RemoteCacheMonitor.getInstance();
355             // If the returned monitor is null, it means it's already started
356             // elsewhere.
357             if ( monitor != null )
358             {
359                 Thread t = new Thread( monitor );
360                 t.setDaemon( true );
361                 t.start();
362             }
363         }
364         return ins;
365     }
366 
367     /**
368      * Returns a remote cache for the given cache name.
369      * <p>
370      * @param cacheName
371      * @return The cache value
372      */
373     public AuxiliaryCache getCache( String cacheName )
374     {
375         IRemoteCacheAttributes ca = (IRemoteCacheAttributes) remoteCacheAttributes.copy();
376         ca.setCacheName( cacheName );
377         return getCache( ca );
378     }
379 
380     /**
381      * Gets a RemoteCacheNoWait from the RemoteCacheManager. The RemoteCacheNoWait objects are
382      * identified by the cache name value of the RemoteCacheAttributes object.
383      * <p>
384      * If the client is configured to register a listener, this call results on a listener being
385      * created if one isn't already registered with the remote cache for this region.
386      * <p>
387      * @param cattr
388      * @return The cache value
389      */
390     public AuxiliaryCache getCache( IRemoteCacheAttributes cattr )
391     {
392         RemoteCacheNoWait remoteCacheNoWait = null;
393 
394         synchronized ( caches )
395         {
396             remoteCacheNoWait = caches.get( cattr.getCacheName() );
397             if ( remoteCacheNoWait == null )
398             {
399                 // create a listener first and pass it to the remotecache
400                 // sender.
401                 RemoteCacheListener listener = null;
402                 try
403                 {
404                     listener = new RemoteCacheListener( cattr, cacheMgr );
405                     addRemoteCacheListener( cattr, listener );
406                 }
407                 catch ( IOException ioe )
408                 {
409                     log.error( "IOException. Problem adding listener. Message: " + ioe.getMessage()
410                         + " | RemoteCacheListener = " + listener, ioe );
411                 }
412                 catch ( Exception e )
413                 {
414                     log.error( "Problem adding listener. Message: " + e.getMessage() + " | RemoteCacheListener = "
415                         + listener, e );
416                 }
417 
418                 IRemoteCacheClient remoteCacheClient = new RemoteCache( cattr, remoteService, listener );
419                 remoteCacheClient.setCacheEventLogger( cacheEventLogger );
420                 remoteCacheClient.setElementSerializer( elementSerializer );
421 
422                 remoteCacheNoWait = new RemoteCacheNoWait( remoteCacheClient );
423                 remoteCacheNoWait.setCacheEventLogger( cacheEventLogger );
424                 remoteCacheNoWait.setElementSerializer( elementSerializer );
425 
426                 caches.put( cattr.getCacheName(), remoteCacheNoWait );
427             }
428 
429             // might want to do some listener sanity checking here.
430         }
431 
432         return remoteCacheNoWait;
433     }
434 
435     /**
436      * Releases.
437      * <p>
438      * @param name
439      * @throws IOException
440      */
441     public void freeCache( String name )
442         throws IOException
443     {
444         if ( log.isInfoEnabled() )
445         {
446             log.info( "freeCache [" + name + "]" );
447         }
448         ICache c = null;
449         synchronized ( caches )
450         {
451             c = caches.get( name );
452         }
453         if ( c != null )
454         {
455             this.removeRemoteCacheListener( name );
456             c.dispose();
457         }
458     }
459 
460     /**
461      * Gets the stats attribute of the RemoteCacheManager object
462      * <p>
463      * @return The stats value
464      */
465     public String getStats()
466     {
467         StringBuffer stats = new StringBuffer();
468         for (RemoteCacheNoWait c : caches.values())
469         {
470             if ( c != null )
471             {
472                 stats.append( c.getCacheName() );
473             }
474         }
475         return stats.toString();
476     }
477 
478     /** Shutdown all. */
479     public void release()
480     {
481         // Wait until called by the last client
482         if ( --clients != 0 )
483         {
484             return;
485         }
486         synchronized ( caches )
487         {
488             for (RemoteCacheNoWait c : caches.values())
489             {
490                 if ( c != null )
491                 {
492                     try
493                     {
494                         // c.dispose();
495                         freeCache( c.getCacheName() );
496                     }
497                     catch ( IOException ex )
498                     {
499                         log.error( "Problem in release.", ex );
500                     }
501                 }
502             }
503         }
504     }
505 
506     /**
507      * Fixes up all the caches managed by this cache manager.
508      * <p>
509      * @param remoteService
510      * @param remoteWatch
511      */
512     public void fixCaches( IRemoteCacheService remoteService, IRemoteCacheObserver remoteWatch )
513     {
514         if ( log.isInfoEnabled() )
515         {
516             log.info( "Fixing caches. IRemoteCacheService " + remoteService + " | IRemoteCacheObserver " + remoteWatch );
517         }
518         synchronized ( caches )
519         {
520             this.remoteService = remoteService;
521             this.remoteWatch.setCacheWatch( remoteWatch );
522             for (RemoteCacheNoWait c : caches.values())
523             {
524                 c.fixCache( this.remoteService );
525             }
526         }
527     }
528 
529     /**
530      * Gets the cacheType attribute of the RemoteCacheManager object
531      * @return The cacheType value
532      */
533     public int getCacheType()
534     {
535         return REMOTE_CACHE;
536     }
537 
538     /**
539      * Location of the RMI registry.
540      */
541     private final static class Location
542     {
543         /** Description of the Field */
544         public final String host;
545 
546         /** Description of the Field */
547         public final int port;
548 
549         /**
550          * Constructor for the Location object
551          * <p>
552          * @param host
553          * @param port
554          */
555         public Location( String host, int port )
556         {
557             this.host = host;
558             this.port = port;
559         }
560 
561         /**
562          * @param obj
563          * @return true if the host and port are equal
564          */
565         @Override
566         public boolean equals( Object obj )
567         {
568             if ( obj == this )
569             {
570                 return true;
571             }
572             if ( obj == null || !( obj instanceof Location ) )
573             {
574                 return false;
575             }
576             Location l = (Location) obj;
577             if ( this.host == null && l.host != null )
578             {
579                 return false;
580             }
581             return host.equals( l.host ) && port == l.port;
582         }
583 
584         /**
585          * @return int
586          */
587         @Override
588         public int hashCode()
589         {
590             return host == null ? port : host.hashCode() ^ port;
591         }
592     }
593 
594     /**
595      * Shutdown callback from composite cache manager.
596      * <p>
597      * @see org.apache.jcs.engine.behavior.IShutdownObserver#shutdown()
598      */
599     public void shutdown()
600     {
601         if ( log.isInfoEnabled() )
602         {
603             log.info( "Observed shutdown request." );
604         }
605         release();
606     }
607 
608     /**
609      * Logs an event if an event logger is configured.
610      * <p>
611      * @param source
612      * @param eventName
613      * @param optionalDetails
614      */
615     protected void logApplicationEvent( String source, String eventName, String optionalDetails )
616     {
617         if ( cacheEventLogger != null )
618         {
619             cacheEventLogger.logApplicationEvent( source, eventName, optionalDetails );
620         }
621     }
622 }