View Javadoc
1   package org.apache.commons.jcs3.auxiliary.remote;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.rmi.Naming;
24  import java.util.concurrent.ConcurrentHashMap;
25  import java.util.concurrent.ConcurrentMap;
26  
27  import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheAttributes;
28  import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheClient;
29  import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheListener;
30  import org.apache.commons.jcs3.engine.CacheStatus;
31  import org.apache.commons.jcs3.engine.CacheWatchRepairable;
32  import org.apache.commons.jcs3.engine.ZombieCacheServiceNonLocal;
33  import org.apache.commons.jcs3.engine.ZombieCacheWatch;
34  import org.apache.commons.jcs3.engine.behavior.ICacheObserver;
35  import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal;
36  import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
37  import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
38  import org.apache.commons.jcs3.engine.logging.behavior.ICacheEventLogger;
39  import org.apache.commons.jcs3.log.Log;
40  import org.apache.commons.jcs3.log.LogManager;
41  
42  /**
43   * An instance of RemoteCacheManager corresponds to one remote connection of a specific host and
44   * port. All RemoteCacheManager instances are monitored by the singleton RemoteCacheMonitor
45   * monitoring daemon for error detection and recovery.
46   * <p>
47   * Getting an instance of the remote cache has the effect of getting a handle on the remote server.
48   * Listeners are not registered with the server until a cache is requested from the manager.
49   */
50  public class RemoteCacheManager
51  {
52      /** The logger */
53      private static final Log log = LogManager.getLog( RemoteCacheManager.class );
54  
55      /** Contains instances of RemoteCacheNoWait managed by a RemoteCacheManager instance. */
56      private final ConcurrentMap<String, RemoteCacheNoWait<?, ?>> caches =
57              new ConcurrentHashMap<>();
58  
59      /** The event logger. */
60      private final ICacheEventLogger cacheEventLogger;
61  
62      /** The serializer. */
63      private final IElementSerializer elementSerializer;
64  
65      /** Handle to the remote cache service; or a zombie handle if failed to connect. */
66      private ICacheServiceNonLocal<?, ?> remoteService;
67  
68      /**
69       * Wrapper of the remote cache watch service; or wrapper of a zombie service if failed to
70       * connect.
71       */
72      private final CacheWatchRepairable remoteWatch;
73  
74      /** The cache manager listeners will need to use to get a cache. */
75      private final ICompositeCacheManager cacheMgr;
76  
77      /** For error notification */
78      private final RemoteCacheMonitor monitor;
79  
80      /** The service found through lookup */
81      private final String registry;
82  
83      /** can it be restored */
84      private boolean canFix = true;
85  
86      /**
87       * Constructs an instance to with the given remote connection parameters. If the connection
88       * cannot be made, "zombie" services will be temporarily used until a successful re-connection
89       * is made by the monitoring daemon.
90       * <p>
91       * @param cattr cache attributes
92       * @param cacheMgr the cache hub
93       * @param monitor the cache monitor thread for error notifications
94       * @param cacheEventLogger
95       * @param elementSerializer
96       */
97      protected RemoteCacheManager( final IRemoteCacheAttributes cattr, final ICompositeCacheManager cacheMgr,
98                                  final RemoteCacheMonitor monitor,
99                                  final ICacheEventLogger cacheEventLogger, final IElementSerializer elementSerializer)
100     {
101         this.cacheMgr = cacheMgr;
102         this.monitor = monitor;
103         this.cacheEventLogger = cacheEventLogger;
104         this.elementSerializer = elementSerializer;
105         this.remoteWatch = new CacheWatchRepairable();
106 
107         this.registry = RemoteUtils.getNamingURL(cattr.getRemoteLocation(), cattr.getRemoteServiceName());
108 
109         try
110         {
111             lookupRemoteService();
112         }
113         catch (final IOException e)
114         {
115             log.error("Could not find server", e);
116             // Notify the cache monitor about the error, and kick off the
117             // recovery process.
118             monitor.notifyError();
119         }
120     }
121 
122     /**
123      * Lookup remote service from registry
124      * @throws IOException if the remote service could not be found
125      *
126      */
127     protected void lookupRemoteService() throws IOException
128     {
129         log.info( "Looking up server [{0}]", registry );
130         try
131         {
132             final Object obj = Naming.lookup( registry );
133             log.info( "Server found: {0}", obj );
134 
135             // Successful connection to the remote server.
136             this.remoteService = (ICacheServiceNonLocal<?, ?>) obj;
137             log.debug( "Remote Service = {0}", remoteService );
138             remoteWatch.setCacheWatch( (ICacheObserver) remoteService );
139         }
140         catch ( final Exception ex )
141         {
142             // Failed to connect to the remote server.
143             // Configure this RemoteCacheManager instance to use the "zombie"
144             // services.
145             this.remoteService = new ZombieCacheServiceNonLocal<>();
146             remoteWatch.setCacheWatch( new ZombieCacheWatch() );
147             throw new IOException( "Problem finding server at [" + registry + "]", ex );
148         }
149     }
150 
151     /**
152      * Adds the remote cache listener to the underlying cache-watch service.
153      * <p>
154      * @param cattr The feature to be added to the RemoteCacheListener attribute
155      * @param listener The feature to be added to the RemoteCacheListener attribute
156      * @throws IOException
157      */
158     public <K, V> void addRemoteCacheListener( final IRemoteCacheAttributes cattr, final IRemoteCacheListener<K, V> listener )
159         throws IOException
160     {
161         if ( cattr.isReceive() )
162         {
163             log.info( "The remote cache is configured to receive events from the remote server. "
164                 + "We will register a listener. remoteWatch = {0} | IRemoteCacheListener = {1}"
165                 + " | cacheName ", remoteWatch, listener, cattr.getCacheName() );
166 
167             remoteWatch.addCacheListener( cattr.getCacheName(), listener );
168         }
169         else
170         {
171             log.info( "The remote cache is configured to NOT receive events from the remote server. "
172                     + "We will NOT register a listener." );
173         }
174     }
175 
176     /**
177      * Removes a listener. When the primary recovers the failover must deregister itself for a
178      * region. The failover runner will call this method to de-register. We do not want to deregister
179      * all listeners to a remote server, in case a failover is a primary of another region. Having
180      * one regions failover act as another servers primary is not currently supported.
181      * <p>
182      * @param cattr
183      * @throws IOException
184      */
185     public void removeRemoteCacheListener( final IRemoteCacheAttributes cattr )
186         throws IOException
187     {
188         final RemoteCacheNoWait<?, ?> cache = caches.get( cattr.getCacheName() );
189         if ( cache != null )
190         {
191         	removeListenerFromCache(cache);
192         }
193         else
194         {
195             if ( cattr.isReceive() )
196             {
197                 log.warn( "Trying to deregister Cache Listener that was never registered." );
198             }
199             else
200             {
201                 log.debug( "Since the remote cache is configured to not receive, "
202                     + "there is no listener to deregister." );
203             }
204         }
205     }
206 
207     // common helper method
208 	private void removeListenerFromCache(final RemoteCacheNoWait<?, ?> cache) throws IOException
209 	{
210 		final IRemoteCacheClient<?, ?> rc = cache.getRemoteCache();
211 	    log.debug( "Found cache for [{0}], deregistering listener.", cache::getCacheName);
212 		// could also store the listener for a server in the manager.
213         remoteWatch.removeCacheListener(cache.getCacheName(), rc.getListener());
214 	}
215 
216     /**
217      * Gets a RemoteCacheNoWait from the RemoteCacheManager. The RemoteCacheNoWait objects are
218      * identified by the cache name value of the RemoteCacheAttributes object.
219      * <p>
220      * If the client is configured to register a listener, this call results on a listener being
221      * created if one isn't already registered with the remote cache for this region.
222      * <p>
223      * @param cattr
224      * @return The cache value
225      */
226     @SuppressWarnings("unchecked") // Need to cast because of common map for all caches
227     public <K, V> RemoteCacheNoWait<K, V> getCache( final IRemoteCacheAttributes cattr )
228     {
229         // might want to do some listener sanity checking here.
230         return (RemoteCacheNoWait<K, V>) caches.computeIfAbsent(cattr.getCacheName(),
231                 key -> newRemoteCacheNoWait(cattr));
232     }
233 
234     /**
235      * Create new RemoteCacheNoWait instance
236      *
237      * @param cattr the cache configuration
238      * @return the instance
239      */
240     protected <K, V> RemoteCacheNoWait<K, V> newRemoteCacheNoWait(final IRemoteCacheAttributes cattr)
241     {
242         final RemoteCacheNoWait<K, V> remoteCacheNoWait;
243         // create a listener first and pass it to the remotecache
244         // sender.
245         RemoteCacheListener<K, V> listener = null;
246         try
247         {
248             listener = new RemoteCacheListener<>( cattr, cacheMgr, elementSerializer );
249             addRemoteCacheListener( cattr, listener );
250         }
251         catch ( final IOException e )
252         {
253             log.error( "Problem adding listener. RemoteCacheListener = {0}",
254                     listener, e );
255         }
256 
257         @SuppressWarnings("unchecked")
258         final IRemoteCacheClient<K, V> remoteCacheClient =
259             new RemoteCache<>(cattr, (ICacheServiceNonLocal<K, V>) remoteService, listener, monitor);
260         remoteCacheClient.setCacheEventLogger( cacheEventLogger );
261         remoteCacheClient.setElementSerializer( elementSerializer );
262 
263         remoteCacheNoWait = new RemoteCacheNoWait<>( remoteCacheClient );
264         remoteCacheNoWait.setCacheEventLogger( cacheEventLogger );
265         remoteCacheNoWait.setElementSerializer( elementSerializer );
266 
267         return remoteCacheNoWait;
268     }
269 
270     /** Shutdown all. */
271     public void release()
272     {
273         caches.forEach((name, cache) -> {
274             try
275             {
276                 log.info("freeCache [{0}]", name);
277 
278                 removeListenerFromCache(cache);
279                 cache.dispose();
280             }
281             catch ( final IOException ex )
282             {
283                 log.error("Problem releasing {0}", name, ex);
284             }
285         });
286         caches.clear();
287     }
288 
289     /**
290      * Fixes up all the caches managed by this cache manager.
291      */
292     public void fixCaches()
293     {
294         if ( !canFix )
295         {
296             return;
297         }
298 
299         log.info( "Fixing caches. ICacheServiceNonLocal {0} | IRemoteCacheObserver {1}",
300                 remoteService, remoteWatch );
301 
302         caches.values().stream()
303             .filter(cache -> cache.getStatus() == CacheStatus.ERROR)
304             .forEach(cache -> cache.fixCache(remoteService));
305 
306         if ( log.isInfoEnabled() )
307         {
308             final String msg = "Remote connection to " + registry + " resumed.";
309             if ( cacheEventLogger != null )
310             {
311                 cacheEventLogger.logApplicationEvent( "RemoteCacheManager", "fix", msg );
312             }
313             log.info( msg );
314         }
315     }
316 
317     /**
318      * Returns true if the connection to the remote host can be
319      * successfully re-established.
320      * <p>
321      * @return true if we found a failover server
322      */
323     public boolean canFixCaches()
324     {
325         try
326         {
327             lookupRemoteService();
328         }
329         catch (final IOException e)
330         {
331             log.error("Could not find server", e);
332             canFix = false;
333         }
334 
335         return canFix;
336     }
337 }