001package org.apache.commons.jcs.auxiliary.remote;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.IOException;
023import java.rmi.Naming;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.ConcurrentMap;
026import java.util.concurrent.locks.ReentrantLock;
027
028import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
029import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheClient;
030import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
031import org.apache.commons.jcs.engine.CacheStatus;
032import org.apache.commons.jcs.engine.CacheWatchRepairable;
033import org.apache.commons.jcs.engine.ZombieCacheServiceNonLocal;
034import org.apache.commons.jcs.engine.ZombieCacheWatch;
035import org.apache.commons.jcs.engine.behavior.ICacheObserver;
036import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal;
037import org.apache.commons.jcs.engine.behavior.ICompositeCacheManager;
038import org.apache.commons.jcs.engine.behavior.IElementSerializer;
039import org.apache.commons.jcs.engine.logging.behavior.ICacheEventLogger;
040import org.apache.commons.logging.Log;
041import org.apache.commons.logging.LogFactory;
042
043/**
044 * An instance of RemoteCacheManager corresponds to one remote connection of a specific host and
045 * port. All RemoteCacheManager instances are monitored by the singleton RemoteCacheMonitor
046 * monitoring daemon for error detection and recovery.
047 * <p>
048 * Getting an instance of the remote cache has the effect of getting a handle on the remote server.
049 * Listeners are not registered with the server until a cache is requested from the manager.
050 */
051public class RemoteCacheManager
052{
053    /** The logger */
054    private static final Log log = LogFactory.getLog( RemoteCacheManager.class );
055
056    /** Contains instances of RemoteCacheNoWait managed by a RemoteCacheManager instance. */
057    private final ConcurrentMap<String, RemoteCacheNoWait<?, ?>> caches =
058            new ConcurrentHashMap<String, RemoteCacheNoWait<?, ?>>();
059
060    /** Lock for initialization of caches */
061    private ReentrantLock cacheLock = new ReentrantLock();
062
063    /** The event logger. */
064    private final ICacheEventLogger cacheEventLogger;
065
066    /** The serializer. */
067    private final IElementSerializer elementSerializer;
068
069    /** Handle to the remote cache service; or a zombie handle if failed to connect. */
070    private ICacheServiceNonLocal<?, ?> remoteService;
071
072    /**
073     * Wrapper of the remote cache watch service; or wrapper of a zombie service if failed to
074     * connect.
075     */
076    private CacheWatchRepairable remoteWatch;
077
078    /** The cache manager listeners will need to use to get a cache. */
079    private final ICompositeCacheManager cacheMgr;
080
081    /** For error notification */
082    private final RemoteCacheMonitor monitor;
083
084    /** The service found through lookup */
085    private final String registry;
086
087    /** can it be restored */
088    private boolean canFix = true;
089
090    /**
091     * Constructs an instance to with the given remote connection parameters. If the connection
092     * cannot be made, "zombie" services will be temporarily used until a successful re-connection
093     * is made by the monitoring daemon.
094     * <p>
095     * @param cattr cache attributes
096     * @param cacheMgr the cache hub
097     * @param monitor the cache monitor thread for error notifications
098     * @param cacheEventLogger
099     * @param elementSerializer
100     */
101    protected RemoteCacheManager( IRemoteCacheAttributes cattr, ICompositeCacheManager cacheMgr,
102                                RemoteCacheMonitor monitor,
103                                ICacheEventLogger cacheEventLogger, IElementSerializer elementSerializer)
104    {
105        this.cacheMgr = cacheMgr;
106        this.monitor = monitor;
107        this.cacheEventLogger = cacheEventLogger;
108        this.elementSerializer = elementSerializer;
109        this.remoteWatch = new CacheWatchRepairable();
110
111        this.registry = RemoteUtils.getNamingURL(cattr.getRemoteLocation(), cattr.getRemoteServiceName());
112
113        try
114        {
115            lookupRemoteService();
116        }
117        catch (IOException e)
118        {
119            log.error("Could not find server", e);
120            // Notify the cache monitor about the error, and kick off the
121            // recovery process.
122            monitor.notifyError();
123        }
124    }
125
126    /**
127     * Lookup remote service from registry
128     * @throws IOException if the remote service could not be found
129     *
130     */
131    protected void lookupRemoteService() throws IOException
132    {
133        if ( log.isInfoEnabled() )
134        {
135            log.info( "Looking up server [" + registry + "]" );
136        }
137        try
138        {
139            Object obj = Naming.lookup( registry );
140            if ( log.isInfoEnabled() )
141            {
142                log.info( "Server found: " + obj );
143            }
144
145            // Successful connection to the remote server.
146            this.remoteService = (ICacheServiceNonLocal<?, ?>) obj;
147            if ( log.isDebugEnabled() )
148            {
149                log.debug( "Remote Service = " + remoteService );
150            }
151            remoteWatch.setCacheWatch( (ICacheObserver) remoteService );
152        }
153        catch ( Exception ex )
154        {
155            // Failed to connect to the remote server.
156            // Configure this RemoteCacheManager instance to use the "zombie"
157            // services.
158            this.remoteService = new ZombieCacheServiceNonLocal<String, String>();
159            remoteWatch.setCacheWatch( new ZombieCacheWatch() );
160            throw new IOException( "Problem finding server at [" + registry + "]", ex );
161        }
162    }
163
164    /**
165     * Adds the remote cache listener to the underlying cache-watch service.
166     * <p>
167     * @param cattr The feature to be added to the RemoteCacheListener attribute
168     * @param listener The feature to be added to the RemoteCacheListener attribute
169     * @throws IOException
170     */
171    public <K, V> void addRemoteCacheListener( IRemoteCacheAttributes cattr, IRemoteCacheListener<K, V> listener )
172        throws IOException
173    {
174        if ( cattr.isReceive() )
175        {
176            if ( log.isInfoEnabled() )
177            {
178                log.info( "The remote cache is configured to receive events from the remote server.  "
179                    + "We will register a listener. remoteWatch = " + remoteWatch + " | IRemoteCacheListener = "
180                    + listener + " | cacheName " + cattr.getCacheName() );
181            }
182
183            remoteWatch.addCacheListener( cattr.getCacheName(), listener );
184        }
185        else
186        {
187            if ( log.isInfoEnabled() )
188            {
189                log.info( "The remote cache is configured to NOT receive events from the remote server.  "
190                    + "We will NOT register a listener." );
191            }
192        }
193    }
194
195    /**
196     * Removes a listener. When the primary recovers the failover must deregister itself for a
197     * region. The failover runner will call this method to de-register. We do not want to deregister
198     * all listeners to a remote server, in case a failover is a primary of another region. Having
199     * one regions failover act as another servers primary is not currently supported.
200     * <p>
201     * @param cattr
202     * @throws IOException
203     */
204    public void removeRemoteCacheListener( IRemoteCacheAttributes cattr )
205        throws IOException
206    {
207        RemoteCacheNoWait<?, ?> cache = caches.get( cattr.getCacheName() );
208        if ( cache != null )
209        {
210                removeListenerFromCache(cache);
211        }
212        else
213        {
214            if ( cattr.isReceive() )
215            {
216                log.warn( "Trying to deregister Cache Listener that was never registered." );
217            }
218            else if ( log.isDebugEnabled() )
219            {
220                log.debug( "Since the remote cache is configured to not receive, "
221                    + "there is no listener to deregister." );
222            }
223        }
224    }
225
226    // common helper method
227        private void removeListenerFromCache(RemoteCacheNoWait<?, ?> cache) throws IOException
228        {
229                IRemoteCacheClient<?, ?> rc = cache.getRemoteCache();
230                if ( log.isDebugEnabled() )
231                {
232                    log.debug( "Found cache for [" + cache.getCacheName() + "], deregistering listener." );
233                }
234                // could also store the listener for a server in the manager.
235                IRemoteCacheListener<?, ?> listener = rc.getListener();
236        remoteWatch.removeCacheListener( cache.getCacheName(), listener );
237        }
238
239    /**
240     * Gets a RemoteCacheNoWait from the RemoteCacheManager. The RemoteCacheNoWait objects are
241     * identified by the cache name value of the RemoteCacheAttributes object.
242     * <p>
243     * If the client is configured to register a listener, this call results on a listener being
244     * created if one isn't already registered with the remote cache for this region.
245     * <p>
246     * @param cattr
247     * @return The cache value
248     */
249    @SuppressWarnings("unchecked") // Need to cast because of common map for all caches
250    public <K, V> RemoteCacheNoWait<K, V> getCache( IRemoteCacheAttributes cattr )
251    {
252        RemoteCacheNoWait<K, V> remoteCacheNoWait = (RemoteCacheNoWait<K, V>) caches.get( cattr.getCacheName() );
253
254        if ( remoteCacheNoWait == null )
255        {
256            cacheLock.lock();
257
258            try
259            {
260                remoteCacheNoWait = (RemoteCacheNoWait<K, V>) caches.get( cattr.getCacheName() );
261
262                if (remoteCacheNoWait == null)
263                {
264                    remoteCacheNoWait = newRemoteCacheNoWait(cattr);
265                    caches.put( cattr.getCacheName(), remoteCacheNoWait );
266                }
267            }
268            finally
269            {
270                cacheLock.unlock();
271            }
272        }
273
274        // might want to do some listener sanity checking here.
275
276        return remoteCacheNoWait;
277    }
278
279    /**
280     * Create new RemoteCacheNoWait instance
281     *
282     * @param cattr the cache configuration
283     * @return the instance
284     */
285    protected <K, V> RemoteCacheNoWait<K, V> newRemoteCacheNoWait(IRemoteCacheAttributes cattr)
286    {
287        RemoteCacheNoWait<K, V> remoteCacheNoWait;
288        // create a listener first and pass it to the remotecache
289        // sender.
290        RemoteCacheListener<K, V> listener = null;
291        try
292        {
293            listener = new RemoteCacheListener<K, V>( cattr, cacheMgr, elementSerializer );
294            addRemoteCacheListener( cattr, listener );
295        }
296        catch ( IOException ioe )
297        {
298            log.error( "Problem adding listener. Message: " + ioe.getMessage()
299                + " | RemoteCacheListener = " + listener, ioe );
300        }
301        catch ( Exception e )
302        {
303            log.error( "Problem adding listener. Message: " + e.getMessage() + " | RemoteCacheListener = "
304                + listener, e );
305        }
306
307        IRemoteCacheClient<K, V> remoteCacheClient =
308            new RemoteCache<K, V>( cattr, (ICacheServiceNonLocal<K, V>) remoteService, listener, monitor );
309        remoteCacheClient.setCacheEventLogger( cacheEventLogger );
310        remoteCacheClient.setElementSerializer( elementSerializer );
311
312        remoteCacheNoWait = new RemoteCacheNoWait<K, V>( remoteCacheClient );
313        remoteCacheNoWait.setCacheEventLogger( cacheEventLogger );
314        remoteCacheNoWait.setElementSerializer( elementSerializer );
315
316        return remoteCacheNoWait;
317    }
318
319    /** Shutdown all. */
320    public void release()
321    {
322        cacheLock.lock();
323
324        try
325        {
326            for (RemoteCacheNoWait<?, ?> c : caches.values())
327            {
328                try
329                {
330                    if ( log.isInfoEnabled() )
331                    {
332                        log.info( "freeCache [" + c.getCacheName() + "]" );
333                    }
334
335                    removeListenerFromCache(c);
336                    c.dispose();
337                }
338                catch ( IOException ex )
339                {
340                    log.error( "Problem releasing " + c.getCacheName(), ex );
341                }
342            }
343
344            caches.clear();
345        }
346        finally
347        {
348            cacheLock.unlock();
349        }
350    }
351
352    /**
353     * Fixes up all the caches managed by this cache manager.
354     */
355    public void fixCaches()
356    {
357        if ( !canFix )
358        {
359            return;
360        }
361
362        if ( log.isInfoEnabled() )
363        {
364            log.info( "Fixing caches. ICacheServiceNonLocal " + remoteService + " | IRemoteCacheObserver " + remoteWatch );
365        }
366
367        for (RemoteCacheNoWait<?, ?> c : caches.values())
368        {
369            if (c.getStatus() == CacheStatus.ERROR)
370            {
371                c.fixCache( remoteService );
372            }
373        }
374
375        if ( log.isInfoEnabled() )
376        {
377            String msg = "Remote connection to " + registry + " resumed.";
378            if ( cacheEventLogger != null )
379            {
380                cacheEventLogger.logApplicationEvent( "RemoteCacheManager", "fix", msg );
381            }
382            log.info( msg );
383        }
384    }
385
386    /**
387     * Returns true if the connection to the remote host can be
388     * successfully re-established.
389     * <p>
390     * @return true if we found a failover server
391     */
392    public boolean canFixCaches()
393    {
394        try
395        {
396            lookupRemoteService();
397        }
398        catch (IOException e)
399        {
400            log.error("Could not find server", e);
401            canFix = false;
402        }
403
404        return canFix;
405    }
406}