View Javadoc
1   package org.apache.commons.jcs.auxiliary.remote.server;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.io.Serializable;
24  import java.rmi.RemoteException;
25  import java.rmi.registry.Registry;
26  import java.rmi.server.RMISocketFactory;
27  import java.rmi.server.UnicastRemoteObject;
28  import java.rmi.server.Unreferenced;
29  import java.util.Collections;
30  import java.util.Iterator;
31  import java.util.Map;
32  import java.util.Properties;
33  import java.util.Set;
34  import java.util.concurrent.ConcurrentHashMap;
35  import java.util.concurrent.ConcurrentMap;
36  import java.util.concurrent.locks.ReentrantLock;
37  
38  import org.apache.commons.jcs.access.exception.CacheException;
39  import org.apache.commons.jcs.auxiliary.remote.RemoteUtils;
40  import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
41  import org.apache.commons.jcs.auxiliary.remote.server.behavior.IRemoteCacheServer;
42  import org.apache.commons.jcs.auxiliary.remote.server.behavior.IRemoteCacheServerAttributes;
43  import org.apache.commons.jcs.auxiliary.remote.server.behavior.RemoteType;
44  import org.apache.commons.jcs.engine.CacheEventQueueFactory;
45  import org.apache.commons.jcs.engine.CacheListeners;
46  import org.apache.commons.jcs.engine.behavior.ICacheElement;
47  import org.apache.commons.jcs.engine.behavior.ICacheEventQueue;
48  import org.apache.commons.jcs.engine.behavior.ICacheListener;
49  import org.apache.commons.jcs.engine.control.CompositeCache;
50  import org.apache.commons.jcs.engine.control.CompositeCacheManager;
51  import org.apache.commons.jcs.engine.logging.CacheEvent;
52  import org.apache.commons.jcs.engine.logging.behavior.ICacheEvent;
53  import org.apache.commons.jcs.engine.logging.behavior.ICacheEventLogger;
54  import org.apache.commons.logging.Log;
55  import org.apache.commons.logging.LogFactory;
56  
57  /**
58   * This class provides remote cache services. The remote cache server propagates events from local
59   * caches to other local caches. It can also store cached data, making it available to new clients.
60   * <p>
61   * Remote cache servers can be clustered. If the cache used by this remote cache is configured to
62   * use a remote cache of type cluster, the two remote caches will communicate with each other.
63   * Remove and put requests can be sent from one remote to another. If they are configured to
64   * broadcast such events to their clients, then removes and puts can be sent to all locals in the
65   * cluster.
66   * <p>
67   * Get requests are made between clustered servers if AllowClusterGet is true. You can setup several
68   * clients to use one remote server and several to use another. The get local will be distributed
69   * between the two servers. Since caches are usually high get and low put, this should allow you to
70   * scale.
71   */
72  public class RemoteCacheServer<K, V>
73      extends UnicastRemoteObject
74      implements IRemoteCacheServer<K, V>, Unreferenced
75  {
76      public static final String DFEAULT_REMOTE_CONFIGURATION_FILE = "/remote.cache.ccf";
77  
78      /** For serialization. Don't change. */
79      private static final long serialVersionUID = -8072345435941473116L;
80  
81      /** log instance */
82      private static final Log log = LogFactory.getLog( RemoteCacheServer.class );
83  
84      /** timing -- if we should record operation times. */
85      private static final boolean timing = true;
86  
87      /** Number of puts into the cache. */
88      private int puts = 0;
89  
90      /** Maps cache name to CacheListeners object. association of listeners (regions). */
91      private final transient ConcurrentMap<String, CacheListeners<K, V>> cacheListenersMap =
92          new ConcurrentHashMap<String, CacheListeners<K, V>>();
93  
94      /** maps cluster listeners to regions. */
95      private final transient ConcurrentMap<String, CacheListeners<K, V>> clusterListenersMap =
96          new ConcurrentHashMap<String, CacheListeners<K, V>>();
97  
98      /** The central hub */
99      private transient CompositeCacheManager cacheManager;
100 
101     /** relates listener id with a type */
102     private final ConcurrentMap<Long, RemoteType> idTypeMap = new ConcurrentHashMap<Long, RemoteType>();
103 
104     /** relates listener id with an ip address */
105     private final ConcurrentMap<Long, String> idIPMap = new ConcurrentHashMap<Long, String>();
106 
107     /** Used to get the next listener id. */
108     private final int[] listenerId = new int[1];
109 
110     /** Configuration settings. */
111     // package protected for access by unit test code
112     final IRemoteCacheServerAttributes remoteCacheServerAttributes;
113 
114     /** The interval at which we will log updates. */
115     private final int logInterval = 100;
116 
117     /** An optional event logger */
118     private transient ICacheEventLogger cacheEventLogger;
119 
120     /** Lock for Cache listener initialization */
121     private ReentrantLock cacheListenersLock = new ReentrantLock();
122 
123     /** Lock for Cluster listener initialization */
124     private ReentrantLock clusterListenersLock = new ReentrantLock();
125 
126     /**
127      * Constructor for the RemoteCacheServer object. This initializes the server with the values
128      * from the properties object.
129      * <p>
130      * @param rcsa 
131      * @param config cache hub configuration
132      * @throws RemoteException
133      */
134     protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa, Properties config )
135         throws RemoteException
136     {
137         super( rcsa.getServicePort() );
138         this.remoteCacheServerAttributes = rcsa;
139         init( config );
140     }
141 
142     /**
143      * Constructor for the RemoteCacheServer object. This initializes the server with the values
144      * from the properties object.
145      * <p>
146      * @param rcsa
147      * @param config cache hub configuration
148      * @param customRMISocketFactory
149      * @throws RemoteException
150      */
151     protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa, Properties config, RMISocketFactory customRMISocketFactory )
152         throws RemoteException
153     {
154         super( rcsa.getServicePort(), customRMISocketFactory, customRMISocketFactory );
155         this.remoteCacheServerAttributes = rcsa;
156         init( config );
157     }
158 
159     /**
160      * Constructor for the RemoteCacheServer object. This initializes the server with the values
161      * from the config file.
162      * <p>
163      * @param rcsa
164      * @throws RemoteException
165      * 
166      * @deprecated Use version with Properties object instead
167      */
168     @Deprecated
169     protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa )
170         throws RemoteException
171     {
172         super( rcsa.getServicePort() );
173         this.remoteCacheServerAttributes = rcsa;
174         init( rcsa.getConfigFileName() );
175     }
176 
177     /**
178      * Constructor for the RemoteCacheServer object. This initializes the server with the values
179      * from the config file.
180      * <p>
181      * @param rcsa
182      * @param customRMISocketFactory
183      * @throws RemoteException
184      * 
185      * @deprecated Use version with Properties object instead
186      */
187     @Deprecated
188     protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa, RMISocketFactory customRMISocketFactory )
189         throws RemoteException
190     {
191         super( rcsa.getServicePort(), customRMISocketFactory, customRMISocketFactory );
192         this.remoteCacheServerAttributes = rcsa;
193         init( rcsa.getConfigFileName() );
194     }
195 
196     /**
197      * Initialize the RMI Cache Server from a properties file.
198      * <p>
199      * @param prop
200      * @throws RemoteException if the configuration of the cache manager instance fails
201      * 
202      * @deprecated Use version with Properties parameter instead
203      */
204     @Deprecated
205     private void init( String propFile ) throws RemoteException
206     {
207         String propFileName = propFile == null ? DFEAULT_REMOTE_CONFIGURATION_FILE : propFile;
208         
209         Properties prop = null;
210         try
211         {
212             prop = RemoteUtils.loadProps(propFileName);
213         }
214         catch (IOException e)
215         {
216             throw new RemoteException(e.getMessage(), e);
217         }
218         
219         init(prop);
220     }
221     
222     /**
223      * Initialize the RMI Cache Server from a properties object.
224      * <p>
225      * @param prop the configuration properties
226      * @throws RemoteException if the configuration of the cache manager instance fails
227      */
228     private void init( Properties prop ) throws RemoteException
229     {
230         try
231         {
232             cacheManager = createCacheManager( prop );
233         }
234         catch (CacheException e)
235         {
236             throw new RemoteException(e.getMessage(), e);
237         }
238 
239         // cacheManager would have created a number of ICache objects.
240         // Use these objects to set up the cacheListenersMap.
241         String[] list = cacheManager.getCacheNames();
242         for ( int i = 0; i < list.length; i++ )
243         {
244             String name = list[i];
245             CompositeCache<K, V> cache = cacheManager.getCache( name );
246             cacheListenersMap.put( name, new CacheListeners<K, V>( cache ) );
247         }
248     }
249 
250     /**
251      * Subclass can override this method to create the specific cache manager.
252      * <p>
253      * @param prop the configuration object.
254      * @return The cache hub configured with this configuration.
255      *
256      * @throws CacheException if the configuration cannot be loaded
257      */
258     private CompositeCacheManager createCacheManager( Properties prop ) throws CacheException
259     {
260         CompositeCacheManager hub = CompositeCacheManager.getUnconfiguredInstance();
261         hub.configure( prop );
262         return hub;
263     }
264 
265     /**
266      * Puts a cache bean to the remote cache and notifies all listeners which <br>
267      * <ol>
268      * <li>have a different listener id than the originating host;</li>
269      * <li>are currently subscribed to the related cache.</li>
270      * </ol>
271      * <p>
272      * @param item
273      * @throws IOException
274      */
275     public void put( ICacheElement<K, V> item )
276         throws IOException
277     {
278         update( item );
279     }
280 
281     /**
282      * @param item
283      * @throws IOException
284      */
285     @Override
286     public void update( ICacheElement<K, V> item )
287         throws IOException
288     {
289         update( item, 0 );
290     }
291 
292     /**
293      * The internal processing is wrapped in event logging calls.
294      * <p>
295      * @param item
296      * @param requesterId
297      * @throws IOException
298      */
299     @Override
300     public void update( ICacheElement<K, V> item, long requesterId )
301         throws IOException
302     {
303         ICacheEvent<ICacheElement<K, V>> cacheEvent = createICacheEvent( item, requesterId, ICacheEventLogger.UPDATE_EVENT );
304         try
305         {
306             processUpdate( item, requesterId );
307         }
308         finally
309         {
310             logICacheEvent( cacheEvent );
311         }
312     }
313 
314     /**
315      * An update can come from either a local cache's remote auxiliary, or it can come from a remote
316      * server. A remote server is considered a a source of type cluster.
317      * <p>
318      * If the update came from a cluster, then we should tell the cache manager that this was a
319      * remote put. This way, any lateral and remote auxiliaries configured for the region will not
320      * be updated. This is basically how a remote listener works when plugged into a local cache.
321      * <p>
322      * If the cluster is configured to keep local cluster consistency, then all listeners will be
323      * updated. This allows cluster server A to update cluster server B and then B to update its
324      * clients if it is told to keep local cluster consistency. Otherwise, server A will update
325      * server B and B will not tell its clients. If you cluster using lateral caches for instance,
326      * this is how it will work. Updates to a cluster node, will never get to the leaves. The remote
327      * cluster, with local cluster consistency, allows you to update leaves. This basically allows
328      * you to have a failover remote server.
329      * <p>
330      * Since currently a cluster will not try to get from other cluster servers, you can scale a bit
331      * with a cluster configuration. Puts and removes will be broadcasted to all clients, but the
332      * get load on a remote server can be reduced.
333      * <p>
334      * @param item
335      * @param requesterId
336      */
337     private void processUpdate( ICacheElement<K, V> item, long requesterId )
338     {
339         long start = 0;
340         if ( timing )
341         {
342             start = System.currentTimeMillis();
343         }
344 
345         logUpdateInfo( item );
346 
347         try
348         {
349             CacheListeners<K, V> cacheDesc = getCacheListeners( item.getCacheName() );
350             /* Object val = */item.getVal();
351 
352             boolean fromCluster = isRequestFromCluster( requesterId );
353 
354             if ( log.isDebugEnabled() )
355             {
356                 log.debug( "In update, requesterId = [" + requesterId + "] fromCluster = " + fromCluster );
357             }
358 
359             // ordered cache item update and notification.
360             synchronized ( cacheDesc )
361             {
362                 try
363                 {
364                     CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
365 
366                     // If the source of this request was not from a cluster,
367                     // then consider it a local update. The cache manager will
368                     // try to
369                     // update all auxiliaries.
370                     //
371                     // This requires that two local caches not be connected to
372                     // two clustered remote caches. The failover runner will
373                     // have to make sure of this. ALos, the local cache needs
374                     // avoid updating this source. Will need to pass the source
375                     // id somehow. The remote cache should update all local
376                     // caches
377                     // but not update the cluster source. Cluster remote caches
378                     // should only be updated by the server and not the
379                     // RemoteCache.
380                     if ( fromCluster )
381                     {
382                         if ( log.isDebugEnabled() )
383                         {
384                             log.debug( "Put FROM cluster, NOT updating other auxiliaries for region. "
385                                 + " requesterId [" + requesterId + "]" );
386                         }
387                         c.localUpdate( item );
388                     }
389                     else
390                     {
391                         if ( log.isDebugEnabled() )
392                         {
393                             log.debug( "Put NOT from cluster, updating other auxiliaries for region. "
394                                 + " requesterId [" + requesterId + "]" );
395                         }
396                         c.update( item );
397                     }
398                 }
399                 catch ( Exception ce )
400                 {
401                     // swallow
402                     if ( log.isInfoEnabled() )
403                     {
404                         log.info( "Exception caught updating item. requesterId [" + requesterId + "] "
405                             + ce.getMessage() );
406                     }
407                 }
408 
409                 // UPDATE LOCALS IF A REQUEST COMES FROM A CLUSTER
410                 // IF LOCAL CLUSTER CONSISTENCY IS CONFIGURED
411                 if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency() ) )
412                 {
413                     ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
414                     if ( log.isDebugEnabled() )
415                     {
416                         log.debug( "qlist.length = " + qlist.length );
417                     }
418                     for ( int i = 0; i < qlist.length; i++ )
419                     {
420                         qlist[i].addPutEvent( item );
421                     }
422                 }
423             }
424         }
425         catch ( IOException e )
426         {
427             if ( cacheEventLogger != null )
428             {
429                 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.UPDATE_EVENT, e.getMessage()
430                     + " REGION: " + item.getCacheName() + " ITEM: " + item );
431             }
432 
433             log.error( "Trouble in Update. requesterId [" + requesterId + "]", e );
434         }
435 
436         // TODO use JAMON for timing
437         if ( timing )
438         {
439             long end = System.currentTimeMillis();
440             if ( log.isDebugEnabled() )
441             {
442                 log.debug( "put took " + String.valueOf( end - start ) + " ms." );
443             }
444         }
445     }
446 
447     /**
448      * Log some details.
449      * <p>
450      * @param item
451      */
452     private void logUpdateInfo( ICacheElement<K, V> item )
453     {
454         // not thread safe, but it doesn't have to be 100% accurate
455         puts++;
456 
457         if ( log.isInfoEnabled() )
458         {
459             if ( puts % logInterval == 0 )
460             {
461                 log.info( "puts = " + puts );
462             }
463         }
464 
465         if ( log.isDebugEnabled() )
466         {
467             log.debug( "In update, put [" + item.getKey() + "] in [" + item.getCacheName() + "]" );
468         }
469     }
470 
471     /**
472      * Returns a cache value from the specified remote cache; or null if the cache or key does not
473      * exist.
474      * <p>
475      * @param cacheName
476      * @param key
477      * @return ICacheElement
478      * @throws IOException
479      */
480     @Override
481     public ICacheElement<K, V> get( String cacheName, K key )
482         throws IOException
483     {
484         return this.get( cacheName, key, 0 );
485     }
486 
487     /**
488      * Returns a cache bean from the specified cache; or null if the key does not exist.
489      * <p>
490      * Adding the requestor id, allows the cache to determine the source of the get.
491      * <p>
492      * The internal processing is wrapped in event logging calls.
493      * <p>
494      * @param cacheName
495      * @param key
496      * @param requesterId
497      * @return ICacheElement
498      * @throws IOException
499      */
500     @Override
501     public ICacheElement<K, V> get( String cacheName, K key, long requesterId )
502         throws IOException
503     {
504         ICacheElement<K, V> element = null;
505         ICacheEvent<K> cacheEvent = createICacheEvent( cacheName, key, requesterId, ICacheEventLogger.GET_EVENT );
506         try
507         {
508             element = processGet( cacheName, key, requesterId );
509         }
510         finally
511         {
512             logICacheEvent( cacheEvent );
513         }
514         return element;
515     }
516 
517     /**
518      * Returns a cache bean from the specified cache; or null if the key does not exist.
519      * <p>
520      * Adding the requester id, allows the cache to determine the source of the get.
521      * <p>
522      * @param cacheName
523      * @param key
524      * @param requesterId
525      * @return ICacheElement
526      */
527     private ICacheElement<K, V> processGet( String cacheName, K key, long requesterId )
528     {
529         boolean fromCluster = isRequestFromCluster( requesterId );
530 
531         if ( log.isDebugEnabled() )
532         {
533             log.debug( "get [" + key + "] from cache [" + cacheName + "] requesterId = [" + requesterId
534                 + "] fromCluster = " + fromCluster );
535         }
536 
537         CacheListeners<K, V> cacheDesc = null;
538         try
539         {
540             cacheDesc = getCacheListeners( cacheName );
541         }
542         catch ( Exception e )
543         {
544             log.error( "Problem getting listeners.", e );
545 
546             if ( cacheEventLogger != null )
547             {
548                 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.GET_EVENT, e.getMessage() + cacheName
549                     + " KEY: " + key );
550             }
551         }
552 
553         ICacheElement<K, V> element = getFromCacheListeners( key, fromCluster, cacheDesc, null );
554         return element;
555     }
556 
557     /**
558      * Gets the item from the associated cache listeners.
559      * <p>
560      * @param key
561      * @param fromCluster
562      * @param cacheDesc
563      * @param element
564      * @return ICacheElement
565      */
566     private ICacheElement<K, V> getFromCacheListeners( K key, boolean fromCluster, CacheListeners<K, V> cacheDesc,
567                                                  ICacheElement<K, V> element )
568     {
569         ICacheElement<K, V> returnElement = element;
570 
571         if ( cacheDesc != null )
572         {
573             CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
574 
575             // If we have a get come in from a client and we don't have the item
576             // locally, we will allow the cache to look in other non local sources,
577             // such as a remote cache or a lateral.
578             //
579             // Since remote servers never get from clients and clients never go
580             // remote from a remote call, this
581             // will not result in any loops.
582             //
583             // This is the only instance I can think of where we allow a remote get
584             // from a remote call. The purpose is to allow remote cache servers to
585             // talk to each other. If one goes down, you want it to be able to get
586             // data from those that were up when the failed server comes back o
587             // line.
588 
589             if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() )
590             {
591                 if ( log.isDebugEnabled() )
592                 {
593                     log.debug( "NonLocalGet. fromCluster [" + fromCluster + "] AllowClusterGet ["
594                         + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
595                 }
596                 returnElement = c.get( key );
597             }
598             else
599             {
600                 // Gets from cluster type remote will end up here.
601                 // Gets from all clients will end up here if allow cluster get is
602                 // false.
603 
604                 if ( log.isDebugEnabled() )
605                 {
606                     log.debug( "LocalGet.  fromCluster [" + fromCluster + "] AllowClusterGet ["
607                         + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
608                 }
609                 returnElement = c.localGet( key );
610             }
611         }
612 
613         return returnElement;
614     }
615 
616     /**
617      * Gets all matching items.
618      * <p>
619      * @param cacheName
620      * @param pattern
621      * @return Map of keys and wrapped objects
622      * @throws IOException
623      */
624     @Override
625     public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern )
626         throws IOException
627     {
628         return getMatching( cacheName, pattern, 0 );
629     }
630 
631     /**
632      * Retrieves all matching keys.
633      * <p>
634      * @param cacheName
635      * @param pattern
636      * @param requesterId
637      * @return Map of keys and wrapped objects
638      * @throws IOException
639      */
640     @Override
641     public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern, long requesterId )
642         throws IOException
643     {
644         ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, pattern, requesterId,
645                                                     ICacheEventLogger.GETMATCHING_EVENT );
646         try
647         {
648             return processGetMatching( cacheName, pattern, requesterId );
649         }
650         finally
651         {
652             logICacheEvent( cacheEvent );
653         }
654     }
655 
656     /**
657      * Retrieves all matching keys.
658      * <p>
659      * @param cacheName
660      * @param pattern
661      * @param requesterId
662      * @return Map of keys and wrapped objects
663      */
664     protected Map<K, ICacheElement<K, V>> processGetMatching( String cacheName, String pattern, long requesterId )
665     {
666         boolean fromCluster = isRequestFromCluster( requesterId );
667 
668         if ( log.isDebugEnabled() )
669         {
670             log.debug( "getMatching [" + pattern + "] from cache [" + cacheName + "] requesterId = [" + requesterId
671                 + "] fromCluster = " + fromCluster );
672         }
673 
674         CacheListeners<K, V> cacheDesc = null;
675         try
676         {
677             cacheDesc = getCacheListeners( cacheName );
678         }
679         catch ( Exception e )
680         {
681             log.error( "Problem getting listeners.", e );
682 
683             if ( cacheEventLogger != null )
684             {
685                 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.GETMATCHING_EVENT, e.getMessage()
686                     + cacheName + " pattern: " + pattern );
687             }
688         }
689 
690         return getMatchingFromCacheListeners( pattern, fromCluster, cacheDesc );
691     }
692 
693     /**
694      * Gets the item from the associated cache listeners.
695      * <p>
696      * @param pattern
697      * @param fromCluster
698      * @param cacheDesc
699      * @return Map of keys to results
700      */
701     private Map<K, ICacheElement<K, V>> getMatchingFromCacheListeners( String pattern, boolean fromCluster, CacheListeners<K, V> cacheDesc )
702     {
703         Map<K, ICacheElement<K, V>> elements = null;
704         if ( cacheDesc != null )
705         {
706             CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
707 
708             // We always want to go remote and then merge the items.  But this can lead to inconsistencies after
709             // failover recovery.  Removed items may show up.  There is no good way to prevent this.
710             // We should make it configurable.
711 
712             if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() )
713             {
714                 if ( log.isDebugEnabled() )
715                 {
716                     log.debug( "NonLocalGetMatching. fromCluster [" + fromCluster + "] AllowClusterGet ["
717                         + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
718                 }
719                 elements = c.getMatching( pattern );
720             }
721             else
722             {
723                 // Gets from cluster type remote will end up here.
724                 // Gets from all clients will end up here if allow cluster get is
725                 // false.
726 
727                 if ( log.isDebugEnabled() )
728                 {
729                     log.debug( "LocalGetMatching.  fromCluster [" + fromCluster + "] AllowClusterGet ["
730                         + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
731                 }
732                 elements = c.localGetMatching( pattern );
733             }
734         }
735         return elements;
736     }
737 
738     /**
739      * Gets multiple items from the cache based on the given set of keys.
740      * <p>
741      * @param cacheName
742      * @param keys
743      * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
744      *         data in cache for any of these keys
745      * @throws IOException
746      */
747     @Override
748     public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys )
749         throws IOException
750     {
751         return this.getMultiple( cacheName, keys, 0 );
752     }
753 
754     /**
755      * Gets multiple items from the cache based on the given set of keys.
756      * <p>
757      * The internal processing is wrapped in event logging calls.
758      * <p>
759      * @param cacheName
760      * @param keys
761      * @param requesterId
762      * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
763      *         data in cache for any of these keys
764      * @throws IOException
765      */
766     @Override
767     public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys, long requesterId )
768         throws IOException
769     {
770         ICacheEvent<Serializable> cacheEvent = createICacheEvent( cacheName, (Serializable) keys, requesterId,
771                                                     ICacheEventLogger.GETMULTIPLE_EVENT );
772         try
773         {
774             return processGetMultiple( cacheName, keys, requesterId );
775         }
776         finally
777         {
778             logICacheEvent( cacheEvent );
779         }
780     }
781 
782     /**
783      * Gets multiple items from the cache based on the given set of keys.
784      * <p>
785      * @param cacheName
786      * @param keys
787      * @param requesterId
788      * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
789      *         data in cache for any of these keys
790      */
791     private Map<K, ICacheElement<K, V>> processGetMultiple( String cacheName, Set<K> keys, long requesterId )
792     {
793         boolean fromCluster = isRequestFromCluster( requesterId );
794 
795         if ( log.isDebugEnabled() )
796         {
797             log.debug( "getMultiple [" + keys + "] from cache [" + cacheName + "] requesterId = [" + requesterId
798                 + "] fromCluster = " + fromCluster );
799         }
800 
801         CacheListeners<K, V> cacheDesc = getCacheListeners( cacheName );
802         Map<K, ICacheElement<K, V>> elements = getMultipleFromCacheListeners( keys, null, fromCluster, cacheDesc );
803         return elements;
804     }
805 
806     /**
807      * Since a non-receiving remote cache client will not register a listener, it will not have a
808      * listener id assigned from the server. As such the remote server cannot determine if it is a
809      * cluster or a normal client. It will assume that it is a normal client.
810      * <p>
811      * @param requesterId
812      * @return true is from a cluster.
813      */
814     private boolean isRequestFromCluster( long requesterId )
815     {
816         RemoteType remoteTypeL = idTypeMap.get( Long.valueOf( requesterId ) );
817         return remoteTypeL == RemoteType.CLUSTER;
818     }
819 
820     /**
821      * Gets the items from the associated cache listeners.
822      * <p>
823      * @param keys
824      * @param elements
825      * @param fromCluster
826      * @param cacheDesc
827      * @return Map
828      */
829     private Map<K, ICacheElement<K, V>> getMultipleFromCacheListeners( Set<K> keys, Map<K, ICacheElement<K, V>> elements, boolean fromCluster, CacheListeners<K, V> cacheDesc )
830     {
831         Map<K, ICacheElement<K, V>> returnElements = elements;
832 
833         if ( cacheDesc != null )
834         {
835             CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
836 
837             // If we have a getMultiple come in from a client and we don't have the item
838             // locally, we will allow the cache to look in other non local sources,
839             // such as a remote cache or a lateral.
840             //
841             // Since remote servers never get from clients and clients never go
842             // remote from a remote call, this
843             // will not result in any loops.
844             //
845             // This is the only instance I can think of where we allow a remote get
846             // from a remote call. The purpose is to allow remote cache servers to
847             // talk to each other. If one goes down, you want it to be able to get
848             // data from those that were up when the failed server comes back on
849             // line.
850 
851             if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() )
852             {
853                 if ( log.isDebugEnabled() )
854                 {
855                     log.debug( "NonLocalGetMultiple. fromCluster [" + fromCluster + "] AllowClusterGet ["
856                         + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
857                 }
858 
859                 returnElements = c.getMultiple( keys );
860             }
861             else
862             {
863                 // Gets from cluster type remote will end up here.
864                 // Gets from all clients will end up here if allow cluster get is
865                 // false.
866 
867                 if ( log.isDebugEnabled() )
868                 {
869                     log.debug( "LocalGetMultiple.  fromCluster [" + fromCluster + "] AllowClusterGet ["
870                         + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
871                 }
872 
873                 returnElements = c.localGetMultiple( keys );
874             }
875         }
876 
877         return returnElements;
878     }
879 
880     /**
881      * Return the keys in the cache.
882      * <p>
883      * @param cacheName the name of the cache region
884      * @see org.apache.commons.jcs.auxiliary.AuxiliaryCache#getKeySet()
885      */
886     @Override
887     public Set<K> getKeySet(String cacheName) throws IOException
888     {
889         return processGetKeySet( cacheName );
890     }
891 
892     /**
893      * Gets the set of keys of objects currently in the cache.
894      * <p>
895      * @param cacheName
896      * @return Set
897      */
898     protected Set<K> processGetKeySet( String cacheName )
899     {
900         CacheListeners<K, V> cacheDesc = null;
901         try
902         {
903             cacheDesc = getCacheListeners( cacheName );
904         }
905         catch ( Exception e )
906         {
907             log.error( "Problem getting listeners.", e );
908         }
909 
910         if ( cacheDesc == null )
911         {
912             return Collections.emptySet();
913         }
914 
915         CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
916         return c.getKeySet();
917     }
918 
919     /**
920      * Removes the given key from the specified remote cache. Defaults the listener id to 0.
921      * <p>
922      * @param cacheName
923      * @param key
924      * @throws IOException
925      */
926     @Override
927     public void remove( String cacheName, K key )
928         throws IOException
929     {
930         remove( cacheName, key, 0 );
931     }
932 
933     /**
934      * Remove the key from the cache region and don't tell the source listener about it.
935      * <p>
936      * The internal processing is wrapped in event logging calls.
937      * <p>
938      * @param cacheName
939      * @param key
940      * @param requesterId
941      * @throws IOException
942      */
943     @Override
944     public void remove( String cacheName, K key, long requesterId )
945         throws IOException
946     {
947         ICacheEvent<K> cacheEvent = createICacheEvent( cacheName, key, requesterId, ICacheEventLogger.REMOVE_EVENT );
948         try
949         {
950             processRemove( cacheName, key, requesterId );
951         }
952         finally
953         {
954             logICacheEvent( cacheEvent );
955         }
956     }
957 
958     /**
959      * Remove the key from the cache region and don't tell the source listener about it.
960      * <p>
961      * @param cacheName
962      * @param key
963      * @param requesterId
964      * @throws IOException
965      */
966     private void processRemove( String cacheName, K key, long requesterId )
967         throws IOException
968     {
969         if ( log.isDebugEnabled() )
970         {
971             log.debug( "remove [" + key + "] from cache [" + cacheName + "]" );
972         }
973 
974         CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName );
975 
976         boolean fromCluster = isRequestFromCluster( requesterId );
977 
978         if ( cacheDesc != null )
979         {
980             // best attempt to achieve ordered cache item removal and
981             // notification.
982             synchronized ( cacheDesc )
983             {
984                 boolean removeSuccess = false;
985 
986                 // No need to notify if it was not cached.
987                 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
988 
989                 if ( fromCluster )
990                 {
991                     if ( log.isDebugEnabled() )
992                     {
993                         log.debug( "Remove FROM cluster, NOT updating other auxiliaries for region" );
994                     }
995                     removeSuccess = c.localRemove( key );
996                 }
997                 else
998                 {
999                     if ( log.isDebugEnabled() )
1000                     {
1001                         log.debug( "Remove NOT from cluster, updating other auxiliaries for region" );
1002                     }
1003                     removeSuccess = c.remove( key );
1004                 }
1005 
1006                 if ( log.isDebugEnabled() )
1007                 {
1008                     log.debug( "remove [" + key + "] from cache [" + cacheName + "] success (was it found) = "
1009                         + removeSuccess );
1010                 }
1011 
1012                 // UPDATE LOCALS IF A REQUEST COMES FROM A CLUSTER
1013                 // IF LOCAL CLUSTER CONSISTENCY IS CONFIGURED
1014                 if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency() ) )
1015                 {
1016                     ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
1017 
1018                     for ( int i = 0; i < qlist.length; i++ )
1019                     {
1020                         qlist[i].addRemoveEvent( key );
1021                     }
1022                 }
1023             }
1024         }
1025     }
1026 
1027     /**
1028      * Remove all keys from the specified remote cache.
1029      * <p>
1030      * @param cacheName
1031      * @throws IOException
1032      */
1033     @Override
1034     public void removeAll( String cacheName )
1035         throws IOException
1036     {
1037         removeAll( cacheName, 0 );
1038     }
1039 
1040     /**
1041      * Remove all keys from the specified remote cache.
1042      * <p>
1043      * The internal processing is wrapped in event logging calls.
1044      * <p>
1045      * @param cacheName
1046      * @param requesterId
1047      * @throws IOException
1048      */
1049     @Override
1050     public void removeAll( String cacheName, long requesterId )
1051         throws IOException
1052     {
1053         ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, "all", requesterId, ICacheEventLogger.REMOVEALL_EVENT );
1054         try
1055         {
1056             processRemoveAll( cacheName, requesterId );
1057         }
1058         finally
1059         {
1060             logICacheEvent( cacheEvent );
1061         }
1062     }
1063 
1064     /**
1065      * Remove all keys from the specified remote cache.
1066      * <p>
1067      * @param cacheName
1068      * @param requesterId
1069      * @throws IOException
1070      */
1071     private void processRemoveAll( String cacheName, long requesterId )
1072         throws IOException
1073     {
1074         CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName );
1075 
1076         boolean fromCluster = isRequestFromCluster( requesterId );
1077 
1078         if ( cacheDesc != null )
1079         {
1080             // best attempt to achieve ordered cache item removal and
1081             // notification.
1082             synchronized ( cacheDesc )
1083             {
1084                 // No need to broadcast, or notify if it was not cached.
1085                 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
1086 
1087                 if ( fromCluster )
1088                 {
1089                     if ( log.isDebugEnabled() )
1090                     {
1091                         log.debug( "RemoveALL FROM cluster, NOT updating other auxiliaries for region" );
1092                     }
1093                     c.localRemoveAll();
1094                 }
1095                 else
1096                 {
1097                     if ( log.isDebugEnabled() )
1098                     {
1099                         log.debug( "RemoveALL NOT from cluster, updating other auxiliaries for region" );
1100                     }
1101                     c.removeAll();
1102                 }
1103 
1104                 // update registered listeners
1105                 if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency() ) )
1106                 {
1107                     ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
1108 
1109                     for ( int i = 0; i < qlist.length; i++ )
1110                     {
1111                         qlist[i].addRemoveAllEvent();
1112                     }
1113                 }
1114             }
1115         }
1116     }
1117 
1118     /**
1119      * How many put events have we received.
1120      * <p>
1121      * @return puts
1122      */
1123     // Currently only intended for use by unit tests
1124     int getPutCount()
1125     {
1126         return puts;
1127     }
1128 
1129     /**
1130      * Frees the specified remote cache.
1131      * <p>
1132      * @param cacheName
1133      * @throws IOException
1134      */
1135     @Override
1136     public void dispose( String cacheName )
1137         throws IOException
1138     {
1139         dispose( cacheName, 0 );
1140     }
1141 
1142     /**
1143      * Frees the specified remote cache.
1144      * <p>
1145      * @param cacheName
1146      * @param requesterId
1147      * @throws IOException
1148      */
1149     public void dispose( String cacheName, long requesterId )
1150         throws IOException
1151     {
1152         ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, "none", requesterId, ICacheEventLogger.DISPOSE_EVENT );
1153         try
1154         {
1155             processDispose( cacheName, requesterId );
1156         }
1157         finally
1158         {
1159             logICacheEvent( cacheEvent );
1160         }
1161     }
1162 
1163     /**
1164      * @param cacheName
1165      * @param requesterId
1166      * @throws IOException
1167      */
1168     private void processDispose( String cacheName, long requesterId )
1169         throws IOException
1170     {
1171         if ( log.isInfoEnabled() )
1172         {
1173             log.info( "Dispose request received from listener [" + requesterId + "]" );
1174         }
1175 
1176         CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName );
1177 
1178         // this is dangerous
1179         if ( cacheDesc != null )
1180         {
1181             // best attempt to achieve ordered free-cache-op and notification.
1182             synchronized ( cacheDesc )
1183             {
1184                 ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
1185 
1186                 for ( int i = 0; i < qlist.length; i++ )
1187                 {
1188                     qlist[i].addDisposeEvent();
1189                 }
1190                 cacheManager.freeCache( cacheName );
1191             }
1192         }
1193     }
1194 
1195     /**
1196      * Frees all remote caches.
1197      * <p>
1198      * @throws IOException
1199      */
1200     @Override
1201     public void release()
1202         throws IOException
1203     {
1204         for (CacheListeners<K, V> cacheDesc : cacheListenersMap.values())
1205         {
1206             ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, 0 );
1207 
1208             for ( int i = 0; i < qlist.length; i++ )
1209             {
1210                 qlist[i].addDisposeEvent();
1211             }
1212         }
1213         cacheManager.release();
1214     }
1215 
1216     /**
1217      * Returns the cache listener for the specified cache. Creates the cache and the cache
1218      * descriptor if they do not already exist.
1219      * <p>
1220      * @param cacheName
1221      * @return The cacheListeners value
1222      */
1223     protected CacheListeners<K, V> getCacheListeners( String cacheName )
1224     {
1225         CacheListeners<K, V> cacheListeners = cacheListenersMap.get( cacheName );
1226 
1227         if ( cacheListeners == null )
1228         {
1229             cacheListenersLock.lock();
1230 
1231             try
1232             {
1233                 // double check
1234                 cacheListeners = cacheListenersMap.get( cacheName );
1235                 if ( cacheListeners == null )
1236                 {
1237                     CompositeCache<K, V> cache = cacheManager.getCache( cacheName );
1238                     cacheListeners = new CacheListeners<K, V>( cache );
1239                     cacheListenersMap.put( cacheName, cacheListeners );
1240                 }
1241             }
1242             finally
1243             {
1244                 cacheListenersLock.unlock();
1245             }
1246         }
1247 
1248         return cacheListeners;
1249     }
1250 
1251     /**
1252      * Gets the clusterListeners attribute of the RemoteCacheServer object.
1253      * <p>
1254      * TODO may be able to remove this
1255      * @param cacheName
1256      * @return The clusterListeners value
1257      */
1258     protected CacheListeners<K, V> getClusterListeners( String cacheName )
1259     {
1260         CacheListeners<K, V> cacheListeners = clusterListenersMap.get( cacheName );
1261 
1262         if ( cacheListeners == null )
1263         {
1264             clusterListenersLock.lock();
1265 
1266             try
1267             {
1268                 cacheListeners = clusterListenersMap.get( cacheName );
1269                 if ( cacheListeners == null )
1270                 {
1271                     CompositeCache<K, V> cache = cacheManager.getCache( cacheName );
1272                     cacheListeners = new CacheListeners<K, V>( cache );
1273                     clusterListenersMap.put( cacheName, cacheListeners );
1274                 }
1275             }
1276             finally
1277             {
1278                 clusterListenersLock.unlock();
1279             }
1280         }
1281 
1282         return cacheListeners;
1283     }
1284 
1285     /**
1286      * Gets the eventQList attribute of the RemoteCacheServer object. This returns the event queues
1287      * stored in the cacheListeners object for a particular region, if the queue is not for this
1288      * requester.
1289      * <p>
1290      * Basically, this makes sure that a request from a particular local cache, identified by its
1291      * listener id, does not result in a call to that same listener.
1292      * <p>
1293      * @param cacheListeners
1294      * @param requesterId
1295      * @return The eventQList value
1296      */
1297     @SuppressWarnings("unchecked") // No generic arrays in java
1298     private ICacheEventQueue<K, V>[] getEventQList( CacheListeners<K, V> cacheListeners, long requesterId )
1299     {
1300         ICacheEventQueue<K, V>[] list = cacheListeners.eventQMap.values().toArray( new ICacheEventQueue[0] );
1301         int count = 0;
1302         // Set those not qualified to null; Count those qualified.
1303         for ( int i = 0; i < list.length; i++ )
1304         {
1305             ICacheEventQueue<K, V> q = list[i];
1306             if ( q.isWorking() && q.getListenerId() != requesterId )
1307             {
1308                 count++;
1309             }
1310             else
1311             {
1312                 list[i] = null;
1313             }
1314         }
1315         if ( count == list.length )
1316         {
1317             // All qualified.
1318             return list;
1319         }
1320 
1321         // Returns only the qualified.
1322         ICacheEventQueue<K, V>[] qq = new ICacheEventQueue[count];
1323         count = 0;
1324         for ( int i = 0; i < list.length; i++ )
1325         {
1326             if ( list[i] != null )
1327             {
1328                 qq[count++] = list[i];
1329             }
1330         }
1331         return qq;
1332     }
1333 
1334     /**
1335      * Removes dead event queues. Should clean out deregistered listeners.
1336      * <p>
1337      * @param eventQMap
1338      */
1339     private static <KK, VV> void cleanupEventQMap( Map<Long, ICacheEventQueue<KK, VV>> eventQMap )
1340     {
1341         synchronized ( eventQMap )
1342         {
1343             for (Iterator<Map.Entry<Long, ICacheEventQueue<KK, VV>>> itr = eventQMap.entrySet().iterator(); itr.hasNext(); )
1344             {
1345                 Map.Entry<Long, ICacheEventQueue<KK, VV>> e = itr.next();
1346                 ICacheEventQueue<KK, VV> q = e.getValue();
1347 
1348                 // this does not care if the q is alive (i.e. if
1349                 // there are active threads; it cares if the queue
1350                 // is working -- if it has not encountered errors
1351                 // above the failure threshold
1352                 if ( !q.isWorking() )
1353                 {
1354                     itr.remove();
1355                     log.warn( "Cache event queue " + q + " is not working and removed from cache server." );
1356                 }
1357             }
1358         }
1359     }
1360 
1361     /**
1362      * Subscribes to the specified remote cache.
1363      * <p>
1364      * If the client id is 0, then the remote cache server will increment it's local count and
1365      * assign an id to the client.
1366      * <p>
1367      * @param cacheName the specified remote cache.
1368      * @param listener object to notify for cache changes. must be synchronized since there are
1369      *            remote calls involved.
1370      * @throws IOException
1371      */
1372     @Override
1373     @SuppressWarnings("unchecked") // Need to cast to specific return type from getClusterListeners()
1374     public <KK, VV> void addCacheListener( String cacheName, ICacheListener<KK, VV> listener )
1375         throws IOException
1376     {
1377         if ( cacheName == null || listener == null )
1378         {
1379             throw new IllegalArgumentException( "cacheName and listener must not be null" );
1380         }
1381         CacheListeners<KK, VV> cacheListeners;
1382 
1383         IRemoteCacheListener<KK, VV> ircl = (IRemoteCacheListener<KK, VV>) listener;
1384 
1385         String listenerAddress = ircl.getLocalHostAddress();
1386 
1387         RemoteType remoteType = ircl.getRemoteType();
1388         if ( remoteType == RemoteType.CLUSTER )
1389         {
1390             log.debug( "adding cluster listener, listenerAddress [" + listenerAddress + "]" );
1391             cacheListeners = (CacheListeners<KK, VV>)getClusterListeners( cacheName );
1392         }
1393         else
1394         {
1395             log.debug( "adding normal listener, listenerAddress [" + listenerAddress + "]" );
1396             cacheListeners = (CacheListeners<KK, VV>)getCacheListeners( cacheName );
1397         }
1398         Map<Long, ICacheEventQueue<KK, VV>> eventQMap = cacheListeners.eventQMap;
1399         cleanupEventQMap( eventQMap );
1400 
1401         // synchronized ( listenerId )
1402         synchronized ( ICacheListener.class )
1403         {
1404             long id = 0;
1405             try
1406             {
1407                 id = listener.getListenerId();
1408                 // clients probably shouldn't do this.
1409                 if ( id == 0 )
1410                 {
1411                     // must start at one so the next gets recognized
1412                     long listenerIdB = nextListenerId();
1413                     if ( log.isDebugEnabled() )
1414                     {
1415                         log.debug( "listener id=" + ( listenerIdB & 0xff ) + " addded for cache [" + cacheName
1416                             + "], listenerAddress [" + listenerAddress + "]" );
1417                     }
1418                     listener.setListenerId( listenerIdB );
1419                     id = listenerIdB;
1420 
1421                     // in case it needs synchronization
1422                     String message = "Adding vm listener under new id = [" + listenerIdB + "], listenerAddress ["
1423                         + listenerAddress + "]";
1424                     logApplicationEvent( "RemoteCacheServer", "addCacheListener", message );
1425                     if ( log.isInfoEnabled() )
1426                     {
1427                         log.info( message );
1428                     }
1429                 }
1430                 else
1431                 {
1432                     String message = "Adding listener under existing id = [" + id + "], listenerAddress ["
1433                         + listenerAddress + "]";
1434                     logApplicationEvent( "RemoteCacheServer", "addCacheListener", message );
1435                     if ( log.isInfoEnabled() )
1436                     {
1437                         log.info( message );
1438                     }
1439                     // should confirm the the host is the same as we have on
1440                     // record, just in case a client has made a mistake.
1441                 }
1442 
1443                 // relate the type to an id
1444                 this.idTypeMap.put( Long.valueOf( id ), remoteType);
1445                 if ( listenerAddress != null )
1446                 {
1447                     this.idIPMap.put( Long.valueOf( id ), listenerAddress );
1448                 }
1449             }
1450             catch ( IOException ioe )
1451             {
1452                 String message = "Problem setting listener id, listenerAddress [" + listenerAddress + "]";
1453                 log.error( message, ioe );
1454 
1455                 if ( cacheEventLogger != null )
1456                 {
1457                     cacheEventLogger.logError( "RemoteCacheServer", "addCacheListener", message + " - "
1458                         + ioe.getMessage() );
1459                 }
1460             }
1461 
1462             CacheEventQueueFactory<KK, VV> fact = new CacheEventQueueFactory<KK, VV>();
1463             ICacheEventQueue<KK, VV> q = fact.createCacheEventQueue( listener, id, cacheName, remoteCacheServerAttributes
1464                 .getEventQueuePoolName(), remoteCacheServerAttributes.getEventQueueType() );
1465 
1466             eventQMap.put(Long.valueOf(listener.getListenerId()), q);
1467 
1468             if ( log.isInfoEnabled() )
1469             {
1470                 log.info( cacheListeners );
1471             }
1472         }
1473     }
1474 
1475     /**
1476      * Subscribes to all remote caches.
1477      * <p>
1478      * @param listener The feature to be added to the CacheListener attribute
1479      * @throws IOException
1480      */
1481     @Override
1482     public <KK, VV> void addCacheListener( ICacheListener<KK, VV> listener )
1483         throws IOException
1484     {
1485         for (String cacheName : cacheListenersMap.keySet())
1486         {
1487             addCacheListener( cacheName, listener );
1488 
1489             if ( log.isDebugEnabled() )
1490             {
1491                 log.debug( "Adding listener for cache [" + cacheName + "]" );
1492             }
1493         }
1494     }
1495 
1496     /**
1497      * Unsubscribe this listener from this region. If the listener is registered, it will be removed
1498      * from the event queue map list.
1499      * <p>
1500      * @param cacheName
1501      * @param listener
1502      * @throws IOException
1503      */
1504     @Override
1505     public <KK, VV> void removeCacheListener( String cacheName, ICacheListener<KK, VV> listener )
1506         throws IOException
1507     {
1508         removeCacheListener( cacheName, listener.getListenerId() );
1509     }
1510 
1511     /**
1512      * Unsubscribe this listener from this region. If the listener is registered, it will be removed
1513      * from the event queue map list.
1514      * <p>
1515      * @param cacheName
1516      * @param listenerId
1517      */
1518     public void removeCacheListener( String cacheName, long listenerId )
1519     {
1520         String message = "Removing listener for cache region = [" + cacheName + "] and listenerId [" + listenerId + "]";
1521         logApplicationEvent( "RemoteCacheServer", "removeCacheListener", message );
1522         if ( log.isInfoEnabled() )
1523         {
1524             log.info( message );
1525         }
1526 
1527         boolean isClusterListener = isRequestFromCluster( listenerId );
1528 
1529         CacheListeners<K, V> cacheDesc = null;
1530 
1531         if ( isClusterListener )
1532         {
1533             cacheDesc = getClusterListeners( cacheName );
1534         }
1535         else
1536         {
1537             cacheDesc = getCacheListeners( cacheName );
1538         }
1539         Map<Long, ICacheEventQueue<K, V>> eventQMap = cacheDesc.eventQMap;
1540         cleanupEventQMap( eventQMap );
1541         ICacheEventQueue<K, V> q = eventQMap.remove( Long.valueOf( listenerId ) );
1542 
1543         if ( q != null )
1544         {
1545             if ( log.isDebugEnabled() )
1546             {
1547                 log.debug( "Found queue for cache region = [" + cacheName + "] and listenerId  [" + listenerId + "]" );
1548             }
1549             q.destroy();
1550             cleanupEventQMap( eventQMap );
1551         }
1552         else
1553         {
1554             if ( log.isDebugEnabled() )
1555             {
1556                 log.debug( "Did not find queue for cache region = [" + cacheName + "] and listenerId [" + listenerId
1557                     + "]" );
1558             }
1559         }
1560 
1561         // cleanup
1562         idTypeMap.remove( Long.valueOf( listenerId ) );
1563         idIPMap.remove( Long.valueOf( listenerId ) );
1564 
1565         if ( log.isInfoEnabled() )
1566         {
1567             log.info( "After removing listener [" + listenerId + "] cache region " + cacheName + "'s listener size ["
1568                 + cacheDesc.eventQMap.size() + "]" );
1569         }
1570     }
1571 
1572     /**
1573      * Unsubscribes from all remote caches.
1574      * <p>
1575      * @param listener
1576      * @throws IOException
1577      */
1578     @Override
1579     public <KK, VV> void removeCacheListener( ICacheListener<KK, VV> listener )
1580         throws IOException
1581     {
1582         for (String cacheName : cacheListenersMap.keySet())
1583         {
1584             removeCacheListener( cacheName, listener );
1585 
1586             if ( log.isInfoEnabled() )
1587             {
1588                 log.info( "Removing listener for cache [" + cacheName + "]" );
1589             }
1590         }
1591     }
1592 
1593     /**
1594      * Shuts down the remote server.
1595      * <p>
1596      * @throws IOException
1597      */
1598     @Override
1599     public void shutdown()
1600         throws IOException
1601     {
1602         shutdown("", Registry.REGISTRY_PORT);
1603     }
1604 
1605     /**
1606      * Shuts down a server at a particular host and port. Then it calls shutdown on the cache
1607      * itself.
1608      * <p>
1609      * @param host
1610      * @param port
1611      * @throws IOException
1612      */
1613     @Override
1614     public void shutdown( String host, int port )
1615         throws IOException
1616     {
1617         if ( log.isInfoEnabled() )
1618         {
1619             log.info( "Received shutdown request. Shutting down server." );
1620         }
1621 
1622         synchronized (listenerId)
1623         {
1624             for (String cacheName : cacheListenersMap.keySet())
1625             {
1626                 for (int i = 0; i <= listenerId[0]; i++)
1627                 {
1628                     removeCacheListener( cacheName, i );
1629                 }
1630 
1631                 if ( log.isInfoEnabled() )
1632                 {
1633                     log.info( "Removing listener for cache [" + cacheName + "]" );
1634                 }
1635             }
1636 
1637             cacheListenersMap.clear();
1638             clusterListenersMap.clear();
1639         }
1640         RemoteCacheServerFactory.shutdownImpl( host, port );
1641         this.cacheManager.shutDown();
1642     }
1643 
1644     /**
1645      * Called by the RMI runtime sometime after the runtime determines that the reference list, the
1646      * list of clients referencing the remote object, becomes empty.
1647      */
1648     // TODO: test out the DGC.
1649     @Override
1650     public void unreferenced()
1651     {
1652         if ( log.isInfoEnabled() )
1653         {
1654             log.info( "*** Server now unreferenced and subject to GC. ***" );
1655         }
1656     }
1657 
1658     /**
1659      * Returns the next generated listener id [0,255].
1660      * <p>
1661      * @return the listener id of a client. This should be unique for this server.
1662      */
1663     private long nextListenerId()
1664     {
1665         long id = 0;
1666         if ( listenerId[0] == Integer.MAX_VALUE )
1667         {
1668             synchronized ( listenerId )
1669             {
1670                 id = listenerId[0];
1671                 listenerId[0] = 0;
1672                 // TODO: record & check if the generated id is currently being
1673                 // used by a valid listener. Currently if the id wraps after
1674                 // Long.MAX_VALUE,
1675                 // we just assume it won't collide with an existing listener who
1676                 // is live.
1677             }
1678         }
1679         else
1680         {
1681             synchronized ( listenerId )
1682             {
1683                 id = ++listenerId[0];
1684             }
1685         }
1686         return id;
1687     }
1688 
1689     /**
1690      * Gets the stats attribute of the RemoteCacheServer object.
1691      * <p>
1692      * @return The stats value
1693      * @throws IOException
1694      */
1695     @Override
1696     public String getStats()
1697         throws IOException
1698     {
1699         return cacheManager.getStats();
1700     }
1701 
1702     /**
1703      * Logs an event if an event logger is configured.
1704      * <p>
1705      * @param item
1706      * @param requesterId
1707      * @param eventName
1708      * @return ICacheEvent
1709      */
1710     private ICacheEvent<ICacheElement<K, V>> createICacheEvent( ICacheElement<K, V> item, long requesterId, String eventName )
1711     {
1712         if ( cacheEventLogger == null )
1713         {
1714             return new CacheEvent<ICacheElement<K, V>>();
1715         }
1716         String ipAddress = getExtraInfoForRequesterId( requesterId );
1717         return cacheEventLogger
1718             .createICacheEvent( "RemoteCacheServer", item.getCacheName(), eventName, ipAddress, item );
1719     }
1720 
1721     /**
1722      * Logs an event if an event logger is configured.
1723      * <p>
1724      * @param cacheName
1725      * @param key
1726      * @param requesterId
1727      * @param eventName
1728      * @return ICacheEvent
1729      */
1730     private <T> ICacheEvent<T> createICacheEvent( String cacheName, T key, long requesterId, String eventName )
1731     {
1732         if ( cacheEventLogger == null )
1733         {
1734             return new CacheEvent<T>();
1735         }
1736         String ipAddress = getExtraInfoForRequesterId( requesterId );
1737         return cacheEventLogger.createICacheEvent( "RemoteCacheServer", cacheName, eventName, ipAddress, key );
1738     }
1739 
1740     /**
1741      * Logs an event if an event logger is configured.
1742      * <p>
1743      * @param source
1744      * @param eventName
1745      * @param optionalDetails
1746      */
1747     protected void logApplicationEvent( String source, String eventName, String optionalDetails )
1748     {
1749         if ( cacheEventLogger != null )
1750         {
1751             cacheEventLogger.logApplicationEvent( source, eventName, optionalDetails );
1752         }
1753     }
1754 
1755     /**
1756      * Logs an event if an event logger is configured.
1757      * <p>
1758      * @param cacheEvent
1759      */
1760     protected <T> void logICacheEvent( ICacheEvent<T> cacheEvent )
1761     {
1762         if ( cacheEventLogger != null )
1763         {
1764             cacheEventLogger.logICacheEvent( cacheEvent );
1765         }
1766     }
1767 
1768     /**
1769      * Ip address for the client, if one is stored.
1770      * <p>
1771      * Protected for testing.
1772      * <p>
1773      * @param requesterId
1774      * @return String
1775      */
1776     protected String getExtraInfoForRequesterId( long requesterId )
1777     {
1778         String ipAddress = idIPMap.get( Long.valueOf( requesterId ) );
1779         return ipAddress;
1780     }
1781 
1782     /**
1783      * Allows it to be injected.
1784      * <p>
1785      * @param cacheEventLogger
1786      */
1787     public void setCacheEventLogger( ICacheEventLogger cacheEventLogger )
1788     {
1789         this.cacheEventLogger = cacheEventLogger;
1790     }
1791 }