001package org.apache.commons.jcs.auxiliary.remote.server;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.IOException;
023import java.io.Serializable;
024import java.rmi.RemoteException;
025import java.rmi.registry.Registry;
026import java.rmi.server.RMISocketFactory;
027import java.rmi.server.UnicastRemoteObject;
028import java.rmi.server.Unreferenced;
029import java.util.Collections;
030import java.util.Iterator;
031import java.util.Map;
032import java.util.Properties;
033import java.util.Set;
034import java.util.concurrent.ConcurrentHashMap;
035import java.util.concurrent.ConcurrentMap;
036import java.util.concurrent.locks.ReentrantLock;
037
038import org.apache.commons.jcs.access.exception.CacheException;
039import org.apache.commons.jcs.auxiliary.remote.RemoteUtils;
040import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
041import org.apache.commons.jcs.auxiliary.remote.server.behavior.IRemoteCacheServer;
042import org.apache.commons.jcs.auxiliary.remote.server.behavior.IRemoteCacheServerAttributes;
043import org.apache.commons.jcs.auxiliary.remote.server.behavior.RemoteType;
044import org.apache.commons.jcs.engine.CacheEventQueueFactory;
045import org.apache.commons.jcs.engine.CacheListeners;
046import org.apache.commons.jcs.engine.behavior.ICacheElement;
047import org.apache.commons.jcs.engine.behavior.ICacheEventQueue;
048import org.apache.commons.jcs.engine.behavior.ICacheListener;
049import org.apache.commons.jcs.engine.control.CompositeCache;
050import org.apache.commons.jcs.engine.control.CompositeCacheManager;
051import org.apache.commons.jcs.engine.logging.CacheEvent;
052import org.apache.commons.jcs.engine.logging.behavior.ICacheEvent;
053import org.apache.commons.jcs.engine.logging.behavior.ICacheEventLogger;
054import org.apache.commons.logging.Log;
055import org.apache.commons.logging.LogFactory;
056
057/**
058 * This class provides remote cache services. The remote cache server propagates events from local
059 * caches to other local caches. It can also store cached data, making it available to new clients.
060 * <p>
061 * Remote cache servers can be clustered. If the cache used by this remote cache is configured to
062 * use a remote cache of type cluster, the two remote caches will communicate with each other.
063 * Remote and put requests can be sent from one remote to another. If they are configured to
064 * broadcast such event to their client, then remove an puts can be sent to all locals in the
065 * cluster.
066 * <p>
067 * Get requests are made between clustered servers if AllowClusterGet is true. You can setup several
068 * clients to use one remote server and several to use another. The get local will be distributed
069 * between the two servers. Since caches are usually high get and low put, this should allow you to
070 * scale.
071 */
072public class RemoteCacheServer<K, V>
073    extends UnicastRemoteObject
074    implements IRemoteCacheServer<K, V>, Unreferenced
075{
076    public static final String DFEAULT_REMOTE_CONFIGURATION_FILE = "/remote.cache.ccf";
077
078    /** For serialization. Don't change. */
079    private static final long serialVersionUID = -8072345435941473116L;
080
081    /** log instance */
082    private static final Log log = LogFactory.getLog( RemoteCacheServer.class );
083
084    /** timing -- if we should record operation times. */
085    private static final boolean timing = true;
086
087    /** Number of puts into the cache. */
088    private int puts = 0;
089
090    /** Maps cache name to CacheListeners object. association of listeners (regions). */
091    private final transient ConcurrentMap<String, CacheListeners<K, V>> cacheListenersMap =
092        new ConcurrentHashMap<String, CacheListeners<K, V>>();
093
094    /** maps cluster listeners to regions. */
095    private final transient ConcurrentMap<String, CacheListeners<K, V>> clusterListenersMap =
096        new ConcurrentHashMap<String, CacheListeners<K, V>>();
097
098    /** The central hub */
099    private transient CompositeCacheManager cacheManager;
100
101    /** relates listener id with a type */
102    private final ConcurrentMap<Long, RemoteType> idTypeMap = new ConcurrentHashMap<Long, RemoteType>();
103
104    /** relates listener id with an ip address */
105    private final ConcurrentMap<Long, String> idIPMap = new ConcurrentHashMap<Long, String>();
106
107    /** Used to get the next listener id. */
108    private final int[] listenerId = new int[1];
109
110    /** Configuration settings. */
111    // package protected for access by unit test code
112    final IRemoteCacheServerAttributes remoteCacheServerAttributes;
113
114    /** The interval at which we will log updates. */
115    private final int logInterval = 100;
116
117    /** An optional event logger */
118    private transient ICacheEventLogger cacheEventLogger;
119
120    /** Lock for Cache listener initialization */
121    private ReentrantLock cacheListenersLock = new ReentrantLock();
122
123    /** Lock for Cluster listener initialization */
124    private ReentrantLock clusterListenersLock = new ReentrantLock();
125
126    /**
127     * Constructor for the RemoteCacheServer object. This initializes the server with the values
128     * from the properties object.
129     * <p>
130     * @param rcsa 
131     * @param config cache hub configuration
132     * @throws RemoteException
133     */
134    protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa, Properties config )
135        throws RemoteException
136    {
137        super( rcsa.getServicePort() );
138        this.remoteCacheServerAttributes = rcsa;
139        init( config );
140    }
141
142    /**
143     * Constructor for the RemoteCacheServer object. This initializes the server with the values
144     * from the properties object.
145     * <p>
146     * @param rcsa
147     * @param config cache hub configuration
148     * @param customRMISocketFactory
149     * @throws RemoteException
150     */
151    protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa, Properties config, RMISocketFactory customRMISocketFactory )
152        throws RemoteException
153    {
154        super( rcsa.getServicePort(), customRMISocketFactory, customRMISocketFactory );
155        this.remoteCacheServerAttributes = rcsa;
156        init( config );
157    }
158
159    /**
160     * Constructor for the RemoteCacheServer object. This initializes the server with the values
161     * from the config file.
162     * <p>
163     * @param rcsa
164     * @throws RemoteException
165     * 
166     * @deprecated Use version with Properties object instead
167     */
168    @Deprecated
169    protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa )
170        throws RemoteException
171    {
172        super( rcsa.getServicePort() );
173        this.remoteCacheServerAttributes = rcsa;
174        init( rcsa.getConfigFileName() );
175    }
176
177    /**
178     * Constructor for the RemoteCacheServer object. This initializes the server with the values
179     * from the config file.
180     * <p>
181     * @param rcsa
182     * @param customRMISocketFactory
183     * @throws RemoteException
184     * 
185     * @deprecated Use version with Properties object instead
186     */
187    @Deprecated
188    protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa, RMISocketFactory customRMISocketFactory )
189        throws RemoteException
190    {
191        super( rcsa.getServicePort(), customRMISocketFactory, customRMISocketFactory );
192        this.remoteCacheServerAttributes = rcsa;
193        init( rcsa.getConfigFileName() );
194    }
195
196    /**
197     * Initialize the RMI Cache Server from a properties file.
198     * <p>
199     * @param prop
200     * @throws RemoteException if the configuration of the cache manager instance fails
201     * 
202     * @deprecated Use version with Properties parameter instead
203     */
204    @Deprecated
205    private void init( String propFile ) throws RemoteException
206    {
207        String propFileName = propFile == null ? DFEAULT_REMOTE_CONFIGURATION_FILE : propFile;
208        
209        Properties prop = null;
210        try
211        {
212            prop = RemoteUtils.loadProps(propFileName);
213        }
214        catch (IOException e)
215        {
216            throw new RemoteException(e.getMessage(), e);
217        }
218        
219        init(prop);
220    }
221    
222    /**
223     * Initialize the RMI Cache Server from a properties object.
224     * <p>
225     * @param prop the configuration properties
226     * @throws RemoteException if the configuration of the cache manager instance fails
227     */
228    private void init( Properties prop ) throws RemoteException
229    {
230        try
231        {
232            cacheManager = createCacheManager( prop );
233        }
234        catch (CacheException e)
235        {
236            throw new RemoteException(e.getMessage(), e);
237        }
238
239        // cacheManager would have created a number of ICache objects.
240        // Use these objects to set up the cacheListenersMap.
241        String[] list = cacheManager.getCacheNames();
242        for ( int i = 0; i < list.length; i++ )
243        {
244            String name = list[i];
245            CompositeCache<K, V> cache = cacheManager.getCache( name );
246            cacheListenersMap.put( name, new CacheListeners<K, V>( cache ) );
247        }
248    }
249
250    /**
251     * Subclass can override this method to create the specific cache manager.
252     * <p>
253     * @param prop the configuration object.
254     * @return The cache hub configured with this configuration.
255     *
256     * @throws CacheException if the configuration cannot be loaded
257     */
258    private CompositeCacheManager createCacheManager( Properties prop ) throws CacheException
259    {
260        CompositeCacheManager hub = CompositeCacheManager.getUnconfiguredInstance();
261        hub.configure( prop );
262        return hub;
263    }
264
265    /**
266     * Puts a cache bean to the remote cache and notifies all listeners which <br>
267     * <ol>
268     * <li>have a different listener id than the originating host;</li>
269     * <li>are currently subscribed to the related cache.</li>
270     * </ol>
271     * <p>
272     * @param item
273     * @throws IOException
274     */
275    public void put( ICacheElement<K, V> item )
276        throws IOException
277    {
278        update( item );
279    }
280
281    /**
282     * @param item
283     * @throws IOException
284     */
285    @Override
286    public void update( ICacheElement<K, V> item )
287        throws IOException
288    {
289        update( item, 0 );
290    }
291
292    /**
293     * The internal processing is wrapped in event logging calls.
294     * <p>
295     * @param item
296     * @param requesterId
297     * @throws IOException
298     */
299    @Override
300    public void update( ICacheElement<K, V> item, long requesterId )
301        throws IOException
302    {
303        ICacheEvent<ICacheElement<K, V>> cacheEvent = createICacheEvent( item, requesterId, ICacheEventLogger.UPDATE_EVENT );
304        try
305        {
306            processUpdate( item, requesterId );
307        }
308        finally
309        {
310            logICacheEvent( cacheEvent );
311        }
312    }
313
314    /**
315     * An update can come from either a local cache's remote auxiliary, or it can come from a remote
316     * server. A remote server is considered a a source of type cluster.
317     * <p>
318     * If the update came from a cluster, then we should tell the cache manager that this was a
319     * remote put. This way, any lateral and remote auxiliaries configured for the region will not
320     * be updated. This is basically how a remote listener works when plugged into a local cache.
321     * <p>
322     * If the cluster is configured to keep local cluster consistency, then all listeners will be
323     * updated. This allows cluster server A to update cluster server B and then B to update its
324     * clients if it is told to keep local cluster consistency. Otherwise, server A will update
325     * server B and B will not tell its clients. If you cluster using lateral caches for instance,
326     * this is how it will work. Updates to a cluster node, will never get to the leaves. The remote
327     * cluster, with local cluster consistency, allows you to update leaves. This basically allows
328     * you to have a failover remote server.
329     * <p>
330     * Since currently a cluster will not try to get from other cluster servers, you can scale a bit
331     * with a cluster configuration. Puts and removes will be broadcasted to all clients, but the
332     * get load on a remote server can be reduced.
333     * <p>
334     * @param item
335     * @param requesterId
336     */
337    private void processUpdate( ICacheElement<K, V> item, long requesterId )
338    {
339        long start = 0;
340        if ( timing )
341        {
342            start = System.currentTimeMillis();
343        }
344
345        logUpdateInfo( item );
346
347        try
348        {
349            CacheListeners<K, V> cacheDesc = getCacheListeners( item.getCacheName() );
350            /* Object val = */item.getVal();
351
352            boolean fromCluster = isRequestFromCluster( requesterId );
353
354            if ( log.isDebugEnabled() )
355            {
356                log.debug( "In update, requesterId = [" + requesterId + "] fromCluster = " + fromCluster );
357            }
358
359            // ordered cache item update and notification.
360            synchronized ( cacheDesc )
361            {
362                try
363                {
364                    CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
365
366                    // If the source of this request was not from a cluster,
367                    // then consider it a local update. The cache manager will
368                    // try to
369                    // update all auxiliaries.
370                    //
371                    // This requires that two local caches not be connected to
372                    // two clustered remote caches. The failover runner will
373                    // have to make sure of this. ALos, the local cache needs
374                    // avoid updating this source. Will need to pass the source
375                    // id somehow. The remote cache should update all local
376                    // caches
377                    // but not update the cluster source. Cluster remote caches
378                    // should only be updated by the server and not the
379                    // RemoteCache.
380                    if ( fromCluster )
381                    {
382                        if ( log.isDebugEnabled() )
383                        {
384                            log.debug( "Put FROM cluster, NOT updating other auxiliaries for region. "
385                                + " requesterId [" + requesterId + "]" );
386                        }
387                        c.localUpdate( item );
388                    }
389                    else
390                    {
391                        if ( log.isDebugEnabled() )
392                        {
393                            log.debug( "Put NOT from cluster, updating other auxiliaries for region. "
394                                + " requesterId [" + requesterId + "]" );
395                        }
396                        c.update( item );
397                    }
398                }
399                catch ( Exception ce )
400                {
401                    // swallow
402                    if ( log.isInfoEnabled() )
403                    {
404                        log.info( "Exception caught updating item. requesterId [" + requesterId + "] "
405                            + ce.getMessage() );
406                    }
407                }
408
409                // UPDATE LOCALS IF A REQUEST COMES FROM A CLUSTER
410                // IF LOCAL CLUSTER CONSISTENCY IS CONFIGURED
411                if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency() ) )
412                {
413                    ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
414                    if ( log.isDebugEnabled() )
415                    {
416                        log.debug( "qlist.length = " + qlist.length );
417                    }
418                    for ( int i = 0; i < qlist.length; i++ )
419                    {
420                        qlist[i].addPutEvent( item );
421                    }
422                }
423            }
424        }
425        catch ( IOException e )
426        {
427            if ( cacheEventLogger != null )
428            {
429                cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.UPDATE_EVENT, e.getMessage()
430                    + " REGION: " + item.getCacheName() + " ITEM: " + item );
431            }
432
433            log.error( "Trouble in Update. requesterId [" + requesterId + "]", e );
434        }
435
436        // TODO use JAMON for timing
437        if ( timing )
438        {
439            long end = System.currentTimeMillis();
440            if ( log.isDebugEnabled() )
441            {
442                log.debug( "put took " + String.valueOf( end - start ) + " ms." );
443            }
444        }
445    }
446
447    /**
448     * Log some details.
449     * <p>
450     * @param item
451     */
452    private void logUpdateInfo( ICacheElement<K, V> item )
453    {
454        // not thread safe, but it doesn't have to be 100% accurate
455        puts++;
456
457        if ( log.isInfoEnabled() )
458        {
459            if ( puts % logInterval == 0 )
460            {
461                log.info( "puts = " + puts );
462            }
463        }
464
465        if ( log.isDebugEnabled() )
466        {
467            log.debug( "In update, put [" + item.getKey() + "] in [" + item.getCacheName() + "]" );
468        }
469    }
470
471    /**
472     * Returns a cache value from the specified remote cache; or null if the cache or key does not
473     * exist.
474     * <p>
475     * @param cacheName
476     * @param key
477     * @return ICacheElement
478     * @throws IOException
479     */
480    @Override
481    public ICacheElement<K, V> get( String cacheName, K key )
482        throws IOException
483    {
484        return this.get( cacheName, key, 0 );
485    }
486
487    /**
488     * Returns a cache bean from the specified cache; or null if the key does not exist.
489     * <p>
490     * Adding the requestor id, allows the cache to determine the source of the get.
491     * <p>
492     * The internal processing is wrapped in event logging calls.
493     * <p>
494     * @param cacheName
495     * @param key
496     * @param requesterId
497     * @return ICacheElement
498     * @throws IOException
499     */
500    @Override
501    public ICacheElement<K, V> get( String cacheName, K key, long requesterId )
502        throws IOException
503    {
504        ICacheElement<K, V> element = null;
505        ICacheEvent<K> cacheEvent = createICacheEvent( cacheName, key, requesterId, ICacheEventLogger.GET_EVENT );
506        try
507        {
508            element = processGet( cacheName, key, requesterId );
509        }
510        finally
511        {
512            logICacheEvent( cacheEvent );
513        }
514        return element;
515    }
516
517    /**
518     * Returns a cache bean from the specified cache; or null if the key does not exist.
519     * <p>
520     * Adding the requester id, allows the cache to determine the source of the get.
521     * <p>
522     * @param cacheName
523     * @param key
524     * @param requesterId
525     * @return ICacheElement
526     */
527    private ICacheElement<K, V> processGet( String cacheName, K key, long requesterId )
528    {
529        boolean fromCluster = isRequestFromCluster( requesterId );
530
531        if ( log.isDebugEnabled() )
532        {
533            log.debug( "get [" + key + "] from cache [" + cacheName + "] requesterId = [" + requesterId
534                + "] fromCluster = " + fromCluster );
535        }
536
537        CacheListeners<K, V> cacheDesc = null;
538        try
539        {
540            cacheDesc = getCacheListeners( cacheName );
541        }
542        catch ( Exception e )
543        {
544            log.error( "Problem getting listeners.", e );
545
546            if ( cacheEventLogger != null )
547            {
548                cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.GET_EVENT, e.getMessage() + cacheName
549                    + " KEY: " + key );
550            }
551        }
552
553        ICacheElement<K, V> element = getFromCacheListeners( key, fromCluster, cacheDesc, null );
554        return element;
555    }
556
557    /**
558     * Gets the item from the associated cache listeners.
559     * <p>
560     * @param key
561     * @param fromCluster
562     * @param cacheDesc
563     * @param element
564     * @return ICacheElement
565     */
566    private ICacheElement<K, V> getFromCacheListeners( K key, boolean fromCluster, CacheListeners<K, V> cacheDesc,
567                                                 ICacheElement<K, V> element )
568    {
569        ICacheElement<K, V> returnElement = element;
570
571        if ( cacheDesc != null )
572        {
573            CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
574
575            // If we have a get come in from a client and we don't have the item
576            // locally, we will allow the cache to look in other non local sources,
577            // such as a remote cache or a lateral.
578            //
579            // Since remote servers never get from clients and clients never go
580            // remote from a remote call, this
581            // will not result in any loops.
582            //
583            // This is the only instance I can think of where we allow a remote get
584            // from a remote call. The purpose is to allow remote cache servers to
585            // talk to each other. If one goes down, you want it to be able to get
586            // data from those that were up when the failed server comes back o
587            // line.
588
589            if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() )
590            {
591                if ( log.isDebugEnabled() )
592                {
593                    log.debug( "NonLocalGet. fromCluster [" + fromCluster + "] AllowClusterGet ["
594                        + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
595                }
596                returnElement = c.get( key );
597            }
598            else
599            {
600                // Gets from cluster type remote will end up here.
601                // Gets from all clients will end up here if allow cluster get is
602                // false.
603
604                if ( log.isDebugEnabled() )
605                {
606                    log.debug( "LocalGet.  fromCluster [" + fromCluster + "] AllowClusterGet ["
607                        + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
608                }
609                returnElement = c.localGet( key );
610            }
611        }
612
613        return returnElement;
614    }
615
616    /**
617     * Gets all matching items.
618     * <p>
619     * @param cacheName
620     * @param pattern
621     * @return Map of keys and wrapped objects
622     * @throws IOException
623     */
624    @Override
625    public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern )
626        throws IOException
627    {
628        return getMatching( cacheName, pattern, 0 );
629    }
630
631    /**
632     * Retrieves all matching keys.
633     * <p>
634     * @param cacheName
635     * @param pattern
636     * @param requesterId
637     * @return Map of keys and wrapped objects
638     * @throws IOException
639     */
640    @Override
641    public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern, long requesterId )
642        throws IOException
643    {
644        ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, pattern, requesterId,
645                                                    ICacheEventLogger.GETMATCHING_EVENT );
646        try
647        {
648            return processGetMatching( cacheName, pattern, requesterId );
649        }
650        finally
651        {
652            logICacheEvent( cacheEvent );
653        }
654    }
655
656    /**
657     * Retrieves all matching keys.
658     * <p>
659     * @param cacheName
660     * @param pattern
661     * @param requesterId
662     * @return Map of keys and wrapped objects
663     */
664    protected Map<K, ICacheElement<K, V>> processGetMatching( String cacheName, String pattern, long requesterId )
665    {
666        boolean fromCluster = isRequestFromCluster( requesterId );
667
668        if ( log.isDebugEnabled() )
669        {
670            log.debug( "getMatching [" + pattern + "] from cache [" + cacheName + "] requesterId = [" + requesterId
671                + "] fromCluster = " + fromCluster );
672        }
673
674        CacheListeners<K, V> cacheDesc = null;
675        try
676        {
677            cacheDesc = getCacheListeners( cacheName );
678        }
679        catch ( Exception e )
680        {
681            log.error( "Problem getting listeners.", e );
682
683            if ( cacheEventLogger != null )
684            {
685                cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.GETMATCHING_EVENT, e.getMessage()
686                    + cacheName + " pattern: " + pattern );
687            }
688        }
689
690        return getMatchingFromCacheListeners( pattern, fromCluster, cacheDesc );
691    }
692
693    /**
694     * Gets the item from the associated cache listeners.
695     * <p>
696     * @param pattern
697     * @param fromCluster
698     * @param cacheDesc
699     * @return Map of keys to results
700     */
701    private Map<K, ICacheElement<K, V>> getMatchingFromCacheListeners( String pattern, boolean fromCluster, CacheListeners<K, V> cacheDesc )
702    {
703        Map<K, ICacheElement<K, V>> elements = null;
704        if ( cacheDesc != null )
705        {
706            CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
707
708            // We always want to go remote and then merge the items.  But this can lead to inconsistencies after
709            // failover recovery.  Removed items may show up.  There is no good way to prevent this.
710            // We should make it configurable.
711
712            if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() )
713            {
714                if ( log.isDebugEnabled() )
715                {
716                    log.debug( "NonLocalGetMatching. fromCluster [" + fromCluster + "] AllowClusterGet ["
717                        + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
718                }
719                elements = c.getMatching( pattern );
720            }
721            else
722            {
723                // Gets from cluster type remote will end up here.
724                // Gets from all clients will end up here if allow cluster get is
725                // false.
726
727                if ( log.isDebugEnabled() )
728                {
729                    log.debug( "LocalGetMatching.  fromCluster [" + fromCluster + "] AllowClusterGet ["
730                        + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
731                }
732                elements = c.localGetMatching( pattern );
733            }
734        }
735        return elements;
736    }
737
738    /**
739     * Gets multiple items from the cache based on the given set of keys.
740     * <p>
741     * @param cacheName
742     * @param keys
743     * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
744     *         data in cache for any of these keys
745     * @throws IOException
746     */
747    @Override
748    public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys )
749        throws IOException
750    {
751        return this.getMultiple( cacheName, keys, 0 );
752    }
753
754    /**
755     * Gets multiple items from the cache based on the given set of keys.
756     * <p>
757     * The internal processing is wrapped in event logging calls.
758     * <p>
759     * @param cacheName
760     * @param keys
761     * @param requesterId
762     * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
763     *         data in cache for any of these keys
764     * @throws IOException
765     */
766    @Override
767    public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys, long requesterId )
768        throws IOException
769    {
770        ICacheEvent<Serializable> cacheEvent = createICacheEvent( cacheName, (Serializable) keys, requesterId,
771                                                    ICacheEventLogger.GETMULTIPLE_EVENT );
772        try
773        {
774            return processGetMultiple( cacheName, keys, requesterId );
775        }
776        finally
777        {
778            logICacheEvent( cacheEvent );
779        }
780    }
781
782    /**
783     * Gets multiple items from the cache based on the given set of keys.
784     * <p>
785     * @param cacheName
786     * @param keys
787     * @param requesterId
788     * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
789     *         data in cache for any of these keys
790     */
791    private Map<K, ICacheElement<K, V>> processGetMultiple( String cacheName, Set<K> keys, long requesterId )
792    {
793        boolean fromCluster = isRequestFromCluster( requesterId );
794
795        if ( log.isDebugEnabled() )
796        {
797            log.debug( "getMultiple [" + keys + "] from cache [" + cacheName + "] requesterId = [" + requesterId
798                + "] fromCluster = " + fromCluster );
799        }
800
801        CacheListeners<K, V> cacheDesc = getCacheListeners( cacheName );
802        Map<K, ICacheElement<K, V>> elements = getMultipleFromCacheListeners( keys, null, fromCluster, cacheDesc );
803        return elements;
804    }
805
806    /**
807     * Since a non-receiving remote cache client will not register a listener, it will not have a
808     * listener id assigned from the server. As such the remote server cannot determine if it is a
809     * cluster or a normal client. It will assume that it is a normal client.
810     * <p>
811     * @param requesterId
812     * @return true is from a cluster.
813     */
814    private boolean isRequestFromCluster( long requesterId )
815    {
816        RemoteType remoteTypeL = idTypeMap.get( Long.valueOf( requesterId ) );
817        return remoteTypeL == RemoteType.CLUSTER;
818    }
819
820    /**
821     * Gets the items from the associated cache listeners.
822     * <p>
823     * @param keys
824     * @param elements
825     * @param fromCluster
826     * @param cacheDesc
827     * @return Map
828     */
829    private Map<K, ICacheElement<K, V>> getMultipleFromCacheListeners( Set<K> keys, Map<K, ICacheElement<K, V>> elements, boolean fromCluster, CacheListeners<K, V> cacheDesc )
830    {
831        Map<K, ICacheElement<K, V>> returnElements = elements;
832
833        if ( cacheDesc != null )
834        {
835            CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
836
837            // If we have a getMultiple come in from a client and we don't have the item
838            // locally, we will allow the cache to look in other non local sources,
839            // such as a remote cache or a lateral.
840            //
841            // Since remote servers never get from clients and clients never go
842            // remote from a remote call, this
843            // will not result in any loops.
844            //
845            // This is the only instance I can think of where we allow a remote get
846            // from a remote call. The purpose is to allow remote cache servers to
847            // talk to each other. If one goes down, you want it to be able to get
848            // data from those that were up when the failed server comes back on
849            // line.
850
851            if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() )
852            {
853                if ( log.isDebugEnabled() )
854                {
855                    log.debug( "NonLocalGetMultiple. fromCluster [" + fromCluster + "] AllowClusterGet ["
856                        + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
857                }
858
859                returnElements = c.getMultiple( keys );
860            }
861            else
862            {
863                // Gets from cluster type remote will end up here.
864                // Gets from all clients will end up here if allow cluster get is
865                // false.
866
867                if ( log.isDebugEnabled() )
868                {
869                    log.debug( "LocalGetMultiple.  fromCluster [" + fromCluster + "] AllowClusterGet ["
870                        + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
871                }
872
873                returnElements = c.localGetMultiple( keys );
874            }
875        }
876
877        return returnElements;
878    }
879
880    /**
881     * Return the keys in the cache.
882     * <p>
883     * @param cacheName the name of the cache region
884     * @see org.apache.commons.jcs.auxiliary.AuxiliaryCache#getKeySet()
885     */
886    @Override
887    public Set<K> getKeySet(String cacheName) throws IOException
888    {
889        return processGetKeySet( cacheName );
890    }
891
892    /**
893     * Gets the set of keys of objects currently in the cache.
894     * <p>
895     * @param cacheName
896     * @return Set
897     */
898    protected Set<K> processGetKeySet( String cacheName )
899    {
900        CacheListeners<K, V> cacheDesc = null;
901        try
902        {
903            cacheDesc = getCacheListeners( cacheName );
904        }
905        catch ( Exception e )
906        {
907            log.error( "Problem getting listeners.", e );
908        }
909
910        if ( cacheDesc == null )
911        {
912            return Collections.emptySet();
913        }
914
915        CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
916        return c.getKeySet();
917    }
918
919    /**
920     * Removes the given key from the specified remote cache. Defaults the listener id to 0.
921     * <p>
922     * @param cacheName
923     * @param key
924     * @throws IOException
925     */
926    @Override
927    public void remove( String cacheName, K key )
928        throws IOException
929    {
930        remove( cacheName, key, 0 );
931    }
932
933    /**
934     * Remove the key from the cache region and don't tell the source listener about it.
935     * <p>
936     * The internal processing is wrapped in event logging calls.
937     * <p>
938     * @param cacheName
939     * @param key
940     * @param requesterId
941     * @throws IOException
942     */
943    @Override
944    public void remove( String cacheName, K key, long requesterId )
945        throws IOException
946    {
947        ICacheEvent<K> cacheEvent = createICacheEvent( cacheName, key, requesterId, ICacheEventLogger.REMOVE_EVENT );
948        try
949        {
950            processRemove( cacheName, key, requesterId );
951        }
952        finally
953        {
954            logICacheEvent( cacheEvent );
955        }
956    }
957
958    /**
959     * Remove the key from the cache region and don't tell the source listener about it.
960     * <p>
961     * @param cacheName
962     * @param key
963     * @param requesterId
964     * @throws IOException
965     */
966    private void processRemove( String cacheName, K key, long requesterId )
967        throws IOException
968    {
969        if ( log.isDebugEnabled() )
970        {
971            log.debug( "remove [" + key + "] from cache [" + cacheName + "]" );
972        }
973
974        CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName );
975
976        boolean fromCluster = isRequestFromCluster( requesterId );
977
978        if ( cacheDesc != null )
979        {
980            // best attempt to achieve ordered cache item removal and
981            // notification.
982            synchronized ( cacheDesc )
983            {
984                boolean removeSuccess = false;
985
986                // No need to notify if it was not cached.
987                CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
988
989                if ( fromCluster )
990                {
991                    if ( log.isDebugEnabled() )
992                    {
993                        log.debug( "Remove FROM cluster, NOT updating other auxiliaries for region" );
994                    }
995                    removeSuccess = c.localRemove( key );
996                }
997                else
998                {
999                    if ( log.isDebugEnabled() )
1000                    {
1001                        log.debug( "Remove NOT from cluster, updating other auxiliaries for region" );
1002                    }
1003                    removeSuccess = c.remove( key );
1004                }
1005
1006                if ( log.isDebugEnabled() )
1007                {
1008                    log.debug( "remove [" + key + "] from cache [" + cacheName + "] success (was it found) = "
1009                        + removeSuccess );
1010                }
1011
1012                // UPDATE LOCALS IF A REQUEST COMES FROM A CLUSTER
1013                // IF LOCAL CLUSTER CONSISTENCY IS CONFIGURED
1014                if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency() ) )
1015                {
1016                    ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
1017
1018                    for ( int i = 0; i < qlist.length; i++ )
1019                    {
1020                        qlist[i].addRemoveEvent( key );
1021                    }
1022                }
1023            }
1024        }
1025    }
1026
1027    /**
1028     * Remove all keys from the specified remote cache.
1029     * <p>
1030     * @param cacheName
1031     * @throws IOException
1032     */
1033    @Override
1034    public void removeAll( String cacheName )
1035        throws IOException
1036    {
1037        removeAll( cacheName, 0 );
1038    }
1039
1040    /**
1041     * Remove all keys from the specified remote cache.
1042     * <p>
1043     * The internal processing is wrapped in event logging calls.
1044     * <p>
1045     * @param cacheName
1046     * @param requesterId
1047     * @throws IOException
1048     */
1049    @Override
1050    public void removeAll( String cacheName, long requesterId )
1051        throws IOException
1052    {
1053        ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, "all", requesterId, ICacheEventLogger.REMOVEALL_EVENT );
1054        try
1055        {
1056            processRemoveAll( cacheName, requesterId );
1057        }
1058        finally
1059        {
1060            logICacheEvent( cacheEvent );
1061        }
1062    }
1063
1064    /**
1065     * Remove all keys from the specified remote cache.
1066     * <p>
1067     * @param cacheName
1068     * @param requesterId
1069     * @throws IOException
1070     */
1071    private void processRemoveAll( String cacheName, long requesterId )
1072        throws IOException
1073    {
1074        CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName );
1075
1076        boolean fromCluster = isRequestFromCluster( requesterId );
1077
1078        if ( cacheDesc != null )
1079        {
1080            // best attempt to achieve ordered cache item removal and
1081            // notification.
1082            synchronized ( cacheDesc )
1083            {
1084                // No need to broadcast, or notify if it was not cached.
1085                CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
1086
1087                if ( fromCluster )
1088                {
1089                    if ( log.isDebugEnabled() )
1090                    {
1091                        log.debug( "RemoveALL FROM cluster, NOT updating other auxiliaries for region" );
1092                    }
1093                    c.localRemoveAll();
1094                }
1095                else
1096                {
1097                    if ( log.isDebugEnabled() )
1098                    {
1099                        log.debug( "RemoveALL NOT from cluster, updating other auxiliaries for region" );
1100                    }
1101                    c.removeAll();
1102                }
1103
1104                // update registered listeners
1105                if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency() ) )
1106                {
1107                    ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
1108
1109                    for ( int i = 0; i < qlist.length; i++ )
1110                    {
1111                        qlist[i].addRemoveAllEvent();
1112                    }
1113                }
1114            }
1115        }
1116    }
1117
1118    /**
1119     * How many put events have we received.
1120     * <p>
1121     * @return puts
1122     */
1123    // Currently only intended for use by unit tests
1124    int getPutCount()
1125    {
1126        return puts;
1127    }
1128
1129    /**
1130     * Frees the specified remote cache.
1131     * <p>
1132     * @param cacheName
1133     * @throws IOException
1134     */
1135    @Override
1136    public void dispose( String cacheName )
1137        throws IOException
1138    {
1139        dispose( cacheName, 0 );
1140    }
1141
1142    /**
1143     * Frees the specified remote cache.
1144     * <p>
1145     * @param cacheName
1146     * @param requesterId
1147     * @throws IOException
1148     */
1149    public void dispose( String cacheName, long requesterId )
1150        throws IOException
1151    {
1152        ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, "none", requesterId, ICacheEventLogger.DISPOSE_EVENT );
1153        try
1154        {
1155            processDispose( cacheName, requesterId );
1156        }
1157        finally
1158        {
1159            logICacheEvent( cacheEvent );
1160        }
1161    }
1162
1163    /**
1164     * @param cacheName
1165     * @param requesterId
1166     * @throws IOException
1167     */
1168    private void processDispose( String cacheName, long requesterId )
1169        throws IOException
1170    {
1171        if ( log.isInfoEnabled() )
1172        {
1173            log.info( "Dispose request received from listener [" + requesterId + "]" );
1174        }
1175
1176        CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName );
1177
1178        // this is dangerous
1179        if ( cacheDesc != null )
1180        {
1181            // best attempt to achieve ordered free-cache-op and notification.
1182            synchronized ( cacheDesc )
1183            {
1184                ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
1185
1186                for ( int i = 0; i < qlist.length; i++ )
1187                {
1188                    qlist[i].addDisposeEvent();
1189                }
1190                cacheManager.freeCache( cacheName );
1191            }
1192        }
1193    }
1194
1195    /**
1196     * Frees all remote caches.
1197     * <p>
1198     * @throws IOException
1199     */
1200    @Override
1201    public void release()
1202        throws IOException
1203    {
1204        for (CacheListeners<K, V> cacheDesc : cacheListenersMap.values())
1205        {
1206            ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, 0 );
1207
1208            for ( int i = 0; i < qlist.length; i++ )
1209            {
1210                qlist[i].addDisposeEvent();
1211            }
1212        }
1213        cacheManager.release();
1214    }
1215
1216    /**
1217     * Returns the cache listener for the specified cache. Creates the cache and the cache
1218     * descriptor if they do not already exist.
1219     * <p>
1220     * @param cacheName
1221     * @return The cacheListeners value
1222     */
1223    protected CacheListeners<K, V> getCacheListeners( String cacheName )
1224    {
1225        CacheListeners<K, V> cacheListeners = cacheListenersMap.get( cacheName );
1226
1227        if ( cacheListeners == null )
1228        {
1229            cacheListenersLock.lock();
1230
1231            try
1232            {
1233                // double check
1234                cacheListeners = cacheListenersMap.get( cacheName );
1235                if ( cacheListeners == null )
1236                {
1237                    CompositeCache<K, V> cache = cacheManager.getCache( cacheName );
1238                    cacheListeners = new CacheListeners<K, V>( cache );
1239                    cacheListenersMap.put( cacheName, cacheListeners );
1240                }
1241            }
1242            finally
1243            {
1244                cacheListenersLock.unlock();
1245            }
1246        }
1247
1248        return cacheListeners;
1249    }
1250
1251    /**
1252     * Gets the clusterListeners attribute of the RemoteCacheServer object.
1253     * <p>
1254     * TODO may be able to remove this
1255     * @param cacheName
1256     * @return The clusterListeners value
1257     */
1258    protected CacheListeners<K, V> getClusterListeners( String cacheName )
1259    {
1260        CacheListeners<K, V> cacheListeners = clusterListenersMap.get( cacheName );
1261
1262        if ( cacheListeners == null )
1263        {
1264            clusterListenersLock.lock();
1265
1266            try
1267            {
1268                cacheListeners = clusterListenersMap.get( cacheName );
1269                if ( cacheListeners == null )
1270                {
1271                    CompositeCache<K, V> cache = cacheManager.getCache( cacheName );
1272                    cacheListeners = new CacheListeners<K, V>( cache );
1273                    clusterListenersMap.put( cacheName, cacheListeners );
1274                }
1275            }
1276            finally
1277            {
1278                clusterListenersLock.unlock();
1279            }
1280        }
1281
1282        return cacheListeners;
1283    }
1284
1285    /**
1286     * Gets the eventQList attribute of the RemoteCacheServer object. This returns the event queues
1287     * stored in the cacheListeners object for a particular region, if the queue is not for this
1288     * requester.
1289     * <p>
1290     * Basically, this makes sure that a request from a particular local cache, identified by its
1291     * listener id, does not result in a call to that same listener.
1292     * <p>
1293     * @param cacheListeners
1294     * @param requesterId
1295     * @return The eventQList value
1296     */
1297    @SuppressWarnings("unchecked") // No generic arrays in java
1298    private ICacheEventQueue<K, V>[] getEventQList( CacheListeners<K, V> cacheListeners, long requesterId )
1299    {
1300        ICacheEventQueue<K, V>[] list = cacheListeners.eventQMap.values().toArray( new ICacheEventQueue[0] );
1301        int count = 0;
1302        // Set those not qualified to null; Count those qualified.
1303        for ( int i = 0; i < list.length; i++ )
1304        {
1305            ICacheEventQueue<K, V> q = list[i];
1306            if ( q.isWorking() && q.getListenerId() != requesterId )
1307            {
1308                count++;
1309            }
1310            else
1311            {
1312                list[i] = null;
1313            }
1314        }
1315        if ( count == list.length )
1316        {
1317            // All qualified.
1318            return list;
1319        }
1320
1321        // Returns only the qualified.
1322        ICacheEventQueue<K, V>[] qq = new ICacheEventQueue[count];
1323        count = 0;
1324        for ( int i = 0; i < list.length; i++ )
1325        {
1326            if ( list[i] != null )
1327            {
1328                qq[count++] = list[i];
1329            }
1330        }
1331        return qq;
1332    }
1333
1334    /**
1335     * Removes dead event queues. Should clean out deregistered listeners.
1336     * <p>
1337     * @param eventQMap
1338     */
1339    private static <KK, VV> void cleanupEventQMap( Map<Long, ICacheEventQueue<KK, VV>> eventQMap )
1340    {
1341        synchronized ( eventQMap )
1342        {
1343            for (Iterator<Map.Entry<Long, ICacheEventQueue<KK, VV>>> itr = eventQMap.entrySet().iterator(); itr.hasNext(); )
1344            {
1345                Map.Entry<Long, ICacheEventQueue<KK, VV>> e = itr.next();
1346                ICacheEventQueue<KK, VV> q = e.getValue();
1347
1348                // this does not care if the q is alive (i.e. if
1349                // there are active threads; it cares if the queue
1350                // is working -- if it has not encountered errors
1351                // above the failure threshold
1352                if ( !q.isWorking() )
1353                {
1354                    itr.remove();
1355                    log.warn( "Cache event queue " + q + " is not working and removed from cache server." );
1356                }
1357            }
1358        }
1359    }
1360
1361    /**
1362     * Subscribes to the specified remote cache.
1363     * <p>
1364     * If the client id is 0, then the remote cache server will increment it's local count and
1365     * assign an id to the client.
1366     * <p>
1367     * @param cacheName the specified remote cache.
1368     * @param listener object to notify for cache changes. must be synchronized since there are
1369     *            remote calls involved.
1370     * @throws IOException
1371     */
1372    @Override
1373    @SuppressWarnings("unchecked") // Need to cast to specific return type from getClusterListeners()
1374    public <KK, VV> void addCacheListener( String cacheName, ICacheListener<KK, VV> listener )
1375        throws IOException
1376    {
1377        if ( cacheName == null || listener == null )
1378        {
1379            throw new IllegalArgumentException( "cacheName and listener must not be null" );
1380        }
1381        CacheListeners<KK, VV> cacheListeners;
1382
1383        IRemoteCacheListener<KK, VV> ircl = (IRemoteCacheListener<KK, VV>) listener;
1384
1385        String listenerAddress = ircl.getLocalHostAddress();
1386
1387        RemoteType remoteType = ircl.getRemoteType();
1388        if ( remoteType == RemoteType.CLUSTER )
1389        {
1390            log.debug( "adding cluster listener, listenerAddress [" + listenerAddress + "]" );
1391            cacheListeners = (CacheListeners<KK, VV>)getClusterListeners( cacheName );
1392        }
1393        else
1394        {
1395            log.debug( "adding normal listener, listenerAddress [" + listenerAddress + "]" );
1396            cacheListeners = (CacheListeners<KK, VV>)getCacheListeners( cacheName );
1397        }
1398        Map<Long, ICacheEventQueue<KK, VV>> eventQMap = cacheListeners.eventQMap;
1399        cleanupEventQMap( eventQMap );
1400
1401        // synchronized ( listenerId )
1402        synchronized ( ICacheListener.class )
1403        {
1404            long id = 0;
1405            try
1406            {
1407                id = listener.getListenerId();
1408                // clients probably shouldn't do this.
1409                if ( id == 0 )
1410                {
1411                    // must start at one so the next gets recognized
1412                    long listenerIdB = nextListenerId();
1413                    if ( log.isDebugEnabled() )
1414                    {
1415                        log.debug( "listener id=" + ( listenerIdB & 0xff ) + " addded for cache [" + cacheName
1416                            + "], listenerAddress [" + listenerAddress + "]" );
1417                    }
1418                    listener.setListenerId( listenerIdB );
1419                    id = listenerIdB;
1420
1421                    // in case it needs synchronization
1422                    String message = "Adding vm listener under new id = [" + listenerIdB + "], listenerAddress ["
1423                        + listenerAddress + "]";
1424                    logApplicationEvent( "RemoteCacheServer", "addCacheListener", message );
1425                    if ( log.isInfoEnabled() )
1426                    {
1427                        log.info( message );
1428                    }
1429                }
1430                else
1431                {
1432                    String message = "Adding listener under existing id = [" + id + "], listenerAddress ["
1433                        + listenerAddress + "]";
1434                    logApplicationEvent( "RemoteCacheServer", "addCacheListener", message );
1435                    if ( log.isInfoEnabled() )
1436                    {
1437                        log.info( message );
1438                    }
1439                    // should confirm the the host is the same as we have on
1440                    // record, just in case a client has made a mistake.
1441                }
1442
1443                // relate the type to an id
1444                this.idTypeMap.put( Long.valueOf( id ), remoteType);
1445                if ( listenerAddress != null )
1446                {
1447                    this.idIPMap.put( Long.valueOf( id ), listenerAddress );
1448                }
1449            }
1450            catch ( IOException ioe )
1451            {
1452                String message = "Problem setting listener id, listenerAddress [" + listenerAddress + "]";
1453                log.error( message, ioe );
1454
1455                if ( cacheEventLogger != null )
1456                {
1457                    cacheEventLogger.logError( "RemoteCacheServer", "addCacheListener", message + " - "
1458                        + ioe.getMessage() );
1459                }
1460            }
1461
1462            CacheEventQueueFactory<KK, VV> fact = new CacheEventQueueFactory<KK, VV>();
1463            ICacheEventQueue<KK, VV> q = fact.createCacheEventQueue( listener, id, cacheName, remoteCacheServerAttributes
1464                .getEventQueuePoolName(), remoteCacheServerAttributes.getEventQueueType() );
1465
1466            eventQMap.put(Long.valueOf(listener.getListenerId()), q);
1467
1468            if ( log.isInfoEnabled() )
1469            {
1470                log.info( cacheListeners );
1471            }
1472        }
1473    }
1474
1475    /**
1476     * Subscribes to all remote caches.
1477     * <p>
1478     * @param listener The feature to be added to the CacheListener attribute
1479     * @throws IOException
1480     */
1481    @Override
1482    public <KK, VV> void addCacheListener( ICacheListener<KK, VV> listener )
1483        throws IOException
1484    {
1485        for (String cacheName : cacheListenersMap.keySet())
1486        {
1487            addCacheListener( cacheName, listener );
1488
1489            if ( log.isDebugEnabled() )
1490            {
1491                log.debug( "Adding listener for cache [" + cacheName + "]" );
1492            }
1493        }
1494    }
1495
1496    /**
1497     * Unsubscribe this listener from this region. If the listener is registered, it will be removed
1498     * from the event queue map list.
1499     * <p>
1500     * @param cacheName
1501     * @param listener
1502     * @throws IOException
1503     */
1504    @Override
1505    public <KK, VV> void removeCacheListener( String cacheName, ICacheListener<KK, VV> listener )
1506        throws IOException
1507    {
1508        removeCacheListener( cacheName, listener.getListenerId() );
1509    }
1510
1511    /**
1512     * Unsubscribe this listener from this region. If the listener is registered, it will be removed
1513     * from the event queue map list.
1514     * <p>
1515     * @param cacheName
1516     * @param listenerId
1517     */
1518    public void removeCacheListener( String cacheName, long listenerId )
1519    {
1520        String message = "Removing listener for cache region = [" + cacheName + "] and listenerId [" + listenerId + "]";
1521        logApplicationEvent( "RemoteCacheServer", "removeCacheListener", message );
1522        if ( log.isInfoEnabled() )
1523        {
1524            log.info( message );
1525        }
1526
1527        boolean isClusterListener = isRequestFromCluster( listenerId );
1528
1529        CacheListeners<K, V> cacheDesc = null;
1530
1531        if ( isClusterListener )
1532        {
1533            cacheDesc = getClusterListeners( cacheName );
1534        }
1535        else
1536        {
1537            cacheDesc = getCacheListeners( cacheName );
1538        }
1539        Map<Long, ICacheEventQueue<K, V>> eventQMap = cacheDesc.eventQMap;
1540        cleanupEventQMap( eventQMap );
1541        ICacheEventQueue<K, V> q = eventQMap.remove( Long.valueOf( listenerId ) );
1542
1543        if ( q != null )
1544        {
1545            if ( log.isDebugEnabled() )
1546            {
1547                log.debug( "Found queue for cache region = [" + cacheName + "] and listenerId  [" + listenerId + "]" );
1548            }
1549            q.destroy();
1550            cleanupEventQMap( eventQMap );
1551        }
1552        else
1553        {
1554            if ( log.isDebugEnabled() )
1555            {
1556                log.debug( "Did not find queue for cache region = [" + cacheName + "] and listenerId [" + listenerId
1557                    + "]" );
1558            }
1559        }
1560
1561        // cleanup
1562        idTypeMap.remove( Long.valueOf( listenerId ) );
1563        idIPMap.remove( Long.valueOf( listenerId ) );
1564
1565        if ( log.isInfoEnabled() )
1566        {
1567            log.info( "After removing listener [" + listenerId + "] cache region " + cacheName + "'s listener size ["
1568                + cacheDesc.eventQMap.size() + "]" );
1569        }
1570    }
1571
1572    /**
1573     * Unsubscribes from all remote caches.
1574     * <p>
1575     * @param listener
1576     * @throws IOException
1577     */
1578    @Override
1579    public <KK, VV> void removeCacheListener( ICacheListener<KK, VV> listener )
1580        throws IOException
1581    {
1582        for (String cacheName : cacheListenersMap.keySet())
1583        {
1584            removeCacheListener( cacheName, listener );
1585
1586            if ( log.isInfoEnabled() )
1587            {
1588                log.info( "Removing listener for cache [" + cacheName + "]" );
1589            }
1590        }
1591    }
1592
1593    /**
1594     * Shuts down the remote server.
1595     * <p>
1596     * @throws IOException
1597     */
1598    @Override
1599    public void shutdown()
1600        throws IOException
1601    {
1602        shutdown("", Registry.REGISTRY_PORT);
1603    }
1604
1605    /**
1606     * Shuts down a server at a particular host and port. Then it calls shutdown on the cache
1607     * itself.
1608     * <p>
1609     * @param host
1610     * @param port
1611     * @throws IOException
1612     */
1613    @Override
1614    public void shutdown( String host, int port )
1615        throws IOException
1616    {
1617        if ( log.isInfoEnabled() )
1618        {
1619            log.info( "Received shutdown request. Shutting down server." );
1620        }
1621
1622        synchronized (listenerId)
1623        {
1624            for (String cacheName : cacheListenersMap.keySet())
1625            {
1626                for (int i = 0; i <= listenerId[0]; i++)
1627                {
1628                    removeCacheListener( cacheName, i );
1629                }
1630
1631                if ( log.isInfoEnabled() )
1632                {
1633                    log.info( "Removing listener for cache [" + cacheName + "]" );
1634                }
1635            }
1636
1637            cacheListenersMap.clear();
1638            clusterListenersMap.clear();
1639        }
1640        RemoteCacheServerFactory.shutdownImpl( host, port );
1641        this.cacheManager.shutDown();
1642    }
1643
1644    /**
1645     * Called by the RMI runtime sometime after the runtime determines that the reference list, the
1646     * list of clients referencing the remote object, becomes empty.
1647     */
1648    // TODO: test out the DGC.
1649    @Override
1650    public void unreferenced()
1651    {
1652        if ( log.isInfoEnabled() )
1653        {
1654            log.info( "*** Server now unreferenced and subject to GC. ***" );
1655        }
1656    }
1657
1658    /**
1659     * Returns the next generated listener id [0,255].
1660     * <p>
1661     * @return the listener id of a client. This should be unique for this server.
1662     */
1663    private long nextListenerId()
1664    {
1665        long id = 0;
1666        if ( listenerId[0] == Integer.MAX_VALUE )
1667        {
1668            synchronized ( listenerId )
1669            {
1670                id = listenerId[0];
1671                listenerId[0] = 0;
1672                // TODO: record & check if the generated id is currently being
1673                // used by a valid listener. Currently if the id wraps after
1674                // Long.MAX_VALUE,
1675                // we just assume it won't collide with an existing listener who
1676                // is live.
1677            }
1678        }
1679        else
1680        {
1681            synchronized ( listenerId )
1682            {
1683                id = ++listenerId[0];
1684            }
1685        }
1686        return id;
1687    }
1688
1689    /**
1690     * Gets the stats attribute of the RemoteCacheServer object.
1691     * <p>
1692     * @return The stats value
1693     * @throws IOException
1694     */
1695    @Override
1696    public String getStats()
1697        throws IOException
1698    {
1699        return cacheManager.getStats();
1700    }
1701
1702    /**
1703     * Logs an event if an event logger is configured.
1704     * <p>
1705     * @param item
1706     * @param requesterId
1707     * @param eventName
1708     * @return ICacheEvent
1709     */
1710    private ICacheEvent<ICacheElement<K, V>> createICacheEvent( ICacheElement<K, V> item, long requesterId, String eventName )
1711    {
1712        if ( cacheEventLogger == null )
1713        {
1714            return new CacheEvent<ICacheElement<K, V>>();
1715        }
1716        String ipAddress = getExtraInfoForRequesterId( requesterId );
1717        return cacheEventLogger
1718            .createICacheEvent( "RemoteCacheServer", item.getCacheName(), eventName, ipAddress, item );
1719    }
1720
1721    /**
1722     * Logs an event if an event logger is configured.
1723     * <p>
1724     * @param cacheName
1725     * @param key
1726     * @param requesterId
1727     * @param eventName
1728     * @return ICacheEvent
1729     */
1730    private <T> ICacheEvent<T> createICacheEvent( String cacheName, T key, long requesterId, String eventName )
1731    {
1732        if ( cacheEventLogger == null )
1733        {
1734            return new CacheEvent<T>();
1735        }
1736        String ipAddress = getExtraInfoForRequesterId( requesterId );
1737        return cacheEventLogger.createICacheEvent( "RemoteCacheServer", cacheName, eventName, ipAddress, key );
1738    }
1739
1740    /**
1741     * Logs an event if an event logger is configured.
1742     * <p>
1743     * @param source
1744     * @param eventName
1745     * @param optionalDetails
1746     */
1747    protected void logApplicationEvent( String source, String eventName, String optionalDetails )
1748    {
1749        if ( cacheEventLogger != null )
1750        {
1751            cacheEventLogger.logApplicationEvent( source, eventName, optionalDetails );
1752        }
1753    }
1754
1755    /**
1756     * Logs an event if an event logger is configured.
1757     * <p>
1758     * @param cacheEvent
1759     */
1760    protected <T> void logICacheEvent( ICacheEvent<T> cacheEvent )
1761    {
1762        if ( cacheEventLogger != null )
1763        {
1764            cacheEventLogger.logICacheEvent( cacheEvent );
1765        }
1766    }
1767
1768    /**
1769     * Ip address for the client, if one is stored.
1770     * <p>
1771     * Protected for testing.
1772     * <p>
1773     * @param requesterId
1774     * @return String
1775     */
1776    protected String getExtraInfoForRequesterId( long requesterId )
1777    {
1778        String ipAddress = idIPMap.get( Long.valueOf( requesterId ) );
1779        return ipAddress;
1780    }
1781
1782    /**
1783     * Allows it to be injected.
1784     * <p>
1785     * @param cacheEventLogger
1786     */
1787    public void setCacheEventLogger( ICacheEventLogger cacheEventLogger )
1788    {
1789        this.cacheEventLogger = cacheEventLogger;
1790    }
1791}