View Javadoc

1   package org.apache.jcs.auxiliary.remote.server;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
5    * agreements. See the NOTICE file distributed with this work for additional information regarding
6    * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance with the License. You may obtain a
8    * copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software distributed under the License
13   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14   * or implied. See the License for the specific language governing permissions and limitations under
15   * the License.
16   */
17  
18  import java.io.IOException;
19  import java.io.Serializable;
20  import java.rmi.RemoteException;
21  import java.rmi.registry.Registry;
22  import java.rmi.server.RMISocketFactory;
23  import java.rmi.server.UnicastRemoteObject;
24  import java.rmi.server.Unreferenced;
25  import java.util.Collections;
26  import java.util.Hashtable;
27  import java.util.Iterator;
28  import java.util.Map;
29  import java.util.Set;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.jcs.access.exception.CacheException;
34  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
35  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheObserver;
36  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheServiceAdmin;
37  import org.apache.jcs.auxiliary.remote.server.behavior.IRemoteCacheServerAttributes;
38  import org.apache.jcs.auxiliary.remote.server.behavior.RemoteType;
39  import org.apache.jcs.engine.CacheEventQueueFactory;
40  import org.apache.jcs.engine.CacheListeners;
41  import org.apache.jcs.engine.behavior.ICacheElement;
42  import org.apache.jcs.engine.behavior.ICacheEventQueue;
43  import org.apache.jcs.engine.behavior.ICacheListener;
44  import org.apache.jcs.engine.behavior.ICacheServiceNonLocal;
45  import org.apache.jcs.engine.control.CompositeCache;
46  import org.apache.jcs.engine.control.CompositeCacheManager;
47  import org.apache.jcs.engine.logging.CacheEvent;
48  import org.apache.jcs.engine.logging.behavior.ICacheEvent;
49  import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
50  
51  /**
52   * This class provides remote cache services. The remote cache server propagates events from local
53   * caches to other local caches. It can also store cached data, making it available to new clients.
54   * <p>
55   * Remote cache servers can be clustered. If the cache used by this remote cache is configured to
56   * use a remote cache of type cluster, the two remote caches will communicate with each other.
57   * Remove and put requests can be sent from one remote to another. If they are configured to
58   * broadcast such events to their clients, then removes and puts can be sent to all locals in the
59   * cluster.
60   * <p>
61   * Get requests are made between clustered servers if AllowClusterGet is true. You can setup several
62   * clients to use one remote server and several to use another. The get local will be distributed
63   * between the two servers. Since caches are usually high get and low put, this should allow you to
64   * scale.
65   */
66  public class RemoteCacheServer<K extends Serializable, V extends Serializable>
67      extends UnicastRemoteObject
68      implements ICacheServiceNonLocal<K, V>, IRemoteCacheObserver, IRemoteCacheServiceAdmin, Unreferenced
69  {
70      /** For serialization. Don't change. */
71      private static final long serialVersionUID = -8072345435941473116L;
72  
73      /** Logger shared by all instances of this class. */
74      private final static Log log = LogFactory.getLog( RemoteCacheServer.class );
75  
76      /** Timing switch: when true, processUpdate measures each put and reports the elapsed time at debug level. */
77      protected final static boolean timing = true;
78  
79      /** Number of puts seen; only incremented while info logging is enabled and not synchronized, so it is approximate. */
80      private int puts = 0;
81  
82      /** Maps cache (region) name to its CacheListeners object; init adds one entry per cache known to the cache manager. */
83      private final Hashtable<String, CacheListeners<K, V>> cacheListenersMap =
84          new Hashtable<String, CacheListeners<K, V>>();
85  
86      /** Cluster listeners per region; presumably keyed by region name like cacheListenersMap (not populated in the visible code). */
87      private final Hashtable<String, CacheListeners<K, V>> clusterListenersMap =
88          new Hashtable<String, CacheListeners<K, V>>();
89  
90      /** The central hub: the composite cache manager created and configured by init. */
91      private CompositeCacheManager cacheManager;
92  
93      /** Relates a listener id to its remote type; isRequestFromCluster looks requester ids up here. */
94      private final Hashtable<Long, RemoteType> idTypeMap = new Hashtable<Long, RemoteType>();
95  
96      /** Relates a listener id to an IP address. */
97      private final Hashtable<Long, String> idIPMap = new Hashtable<Long, String>();
98  
99      /** Used to get the next listener id; a one-element array so the value can be mutated in place. */
100     private final int[] listenerId = new int[1];
101 
102     /** Configuration settings; assigned in the constructors before init runs. */
103     protected IRemoteCacheServerAttributes remoteCacheServerAttributes;
104 
105     /** The interval, in number of puts, at which logUpdateInfo writes the running put count at info level. */
106     private final int logInterval = 100;
107 
108     /** An optional event logger; may be null, so every use is null-checked. */
109     private transient ICacheEventLogger cacheEventLogger;
110 
111     /**
112      * Constructor for the RemoteCacheServer object. This initializes the server with the values
113      * from the config file.
114      * <p>
115      * @param rcsa
116      * @throws RemoteException
117      */
118     protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa )
119         throws RemoteException
120     {
121         super( rcsa.getServicePort() );
122         this.remoteCacheServerAttributes = rcsa;
123         init( rcsa.getConfigFileName() );
124     }
125 
126     /**
127      * Constructor for the RemoteCacheServer object. This initializes the server with the values
128      * from the config file.
129      * <p>
130      * @param rcsa
131      * @param customRMISocketFactory
132      * @throws RemoteException
133      */
134     protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa, RMISocketFactory customRMISocketFactory )
135         throws RemoteException
136     {
137         super( rcsa.getServicePort(), customRMISocketFactory, customRMISocketFactory );
138         this.remoteCacheServerAttributes = rcsa;
139         init( rcsa.getConfigFileName() );
140     }
141 
142     /**
143      * Initialize the RMI Cache Server from a properties file.
144      * <p>
145      * @param prop
146      * @throws RemoteException if the configuration of the cache manager instance fails
147      */
148     private void init( String prop ) throws RemoteException
149     {
150         try
151         {
152             cacheManager = createCacheManager( prop );
153         }
154         catch (CacheException e)
155         {
156             throw new RemoteException(e.getMessage(), e);
157         }
158 
159         // cacheManager would have created a number of ICache objects.
160         // Use these objects to set up the cacheListenersMap.
161         String[] list = cacheManager.getCacheNames();
162         for ( int i = 0; i < list.length; i++ )
163         {
164             String name = list[i];
165             CompositeCache<K, V> cache = cacheManager.getCache( name );
166             cacheListenersMap.put( name, new CacheListeners<K, V>( cache ) );
167         }
168     }
169 
170     /**
171      * Subclass can override this method to create the specific cache manager.
172      * <p>
173      * @param prop The name of the configuration file.
174      * @return The cache hub configured with this configuration file.
175      *
176      * @throws CacheException if the configuration cannot be loaded
177      */
178     private CompositeCacheManager createCacheManager( String prop ) throws CacheException
179     {
180         CompositeCacheManager hub = CompositeCacheManager.getUnconfiguredInstance();
181 
182         if ( prop == null )
183         {
184             hub.configure( "/remote.cache.ccf" );
185         }
186         else
187         {
188             hub.configure( prop );
189         }
190         return hub;
191     }
192 
193     /**
194      * Puts a cache bean to the remote cache and notifies all listeners which <br>
195      * <ol>
196      * <li>have a different listener id than the originating host; <li>are currently subscribed to
197      * the related cache.
198      * </ol>
199      * <p>
200      * @param item
201      * @throws IOException
202      */
203     public void put( ICacheElement<K, V> item )
204         throws IOException
205     {
206         update( item );
207     }
208 
209     /**
210      * @param item
211      * @throws IOException
212      */
213     public void update( ICacheElement<K, V> item )
214         throws IOException
215     {
216         update( item, 0 );
217     }
218 
219     /**
220      * The internal processing is wrapped in event logging calls.
221      * <p>
222      * @param item
223      * @param requesterId
224      * @throws IOException
225      */
226     public void update( ICacheElement<K, V> item, long requesterId )
227         throws IOException
228     {
229         ICacheEvent<ICacheElement<K, V>> cacheEvent = createICacheEvent( item, requesterId, ICacheEventLogger.UPDATE_EVENT );
230         try
231         {
232             processUpdate( item, requesterId );
233         }
234         finally
235         {
236             logICacheEvent( cacheEvent );
237         }
238     }
239 
240     /**
241      * An update can come from either a local cache's remote auxiliary, or it can come from a remote
242      * server. A remote server is considered a a source of type cluster.
243      * <p>
244      * If the update came from a cluster, then we should tell the cache manager that this was a
245      * remote put. This way, any lateral and remote auxiliaries configured for the region will not
246      * be updated. This is basically how a remote listener works when plugged into a local cache.
247      * <p>
248      * If the cluster is configured to keep local cluster consistency, then all listeners will be
249      * updated. This allows cluster server A to update cluster server B and then B to update its
250      * clients if it is told to keep local cluster consistency. Otherwise, server A will update
251      * server B and B will not tell its clients. If you cluster using lateral caches for instance,
252      * this is how it will work. Updates to a cluster node, will never get to the leaves. The remote
253      * cluster, with local cluster consistency, allows you to update leaves. This basically allows
254      * you to have a failover remote server.
255      * <p>
256      * Since currently a cluster will not try to get from other cluster servers, you can scale a bit
257      * with a cluster configuration. Puts and removes will be broadcasted to all clients, but the
258      * get load on a remote server can be reduced.
259      * <p>
260      * @param item
261      * @param requesterId
262      */
263     private void processUpdate( ICacheElement<K, V> item, long requesterId )
264     {
265         long start = 0;
266         if ( timing )
267         {
268             start = System.currentTimeMillis();
269         }
270 
271         logUpdateInfo( item );
272 
273         try
274         {
275             CacheListeners<K, V> cacheDesc = getCacheListeners( item.getCacheName() );
276             /* Object val = */item.getVal();
277 
278             boolean fromCluster = isRequestFromCluster( requesterId );
279 
280             if ( log.isDebugEnabled() )
281             {
282                 log.debug( "In update, requesterId = [" + requesterId + "] fromCluster = " + fromCluster );
283             }
284 
285             // ordered cache item update and notification.
286             synchronized ( cacheDesc )
287             {
288                 try
289                 {
290                     CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
291 
292                     // If the source of this request was not from a cluster,
293                     // then consider it a local update. The cache manager will
294                     // try to
295                     // update all auxiliaries.
296                     //
297                     // This requires that two local caches not be connected to
298                     // two clustered remote caches. The failover runner will
299                     // have to make sure of this. ALos, the local cache needs
300                     // avoid updating this source. Will need to pass the source
301                     // id somehow. The remote cache should update all local
302                     // caches
303                     // but not update the cluster source. Cluster remote caches
304                     // should only be updated by the server and not the
305                     // RemoteCache.
306                     if ( fromCluster )
307                     {
308                         if ( log.isDebugEnabled() )
309                         {
310                             log.debug( "Put FROM cluster, NOT updating other auxiliaries for region. "
311                                 + " requesterId [" + requesterId + "]" );
312                         }
313                         c.localUpdate( item );
314                     }
315                     else
316                     {
317                         if ( log.isDebugEnabled() )
318                         {
319                             log.debug( "Put NOT from cluster, updating other auxiliaries for region. "
320                                 + " requesterId [" + requesterId + "]" );
321                         }
322                         c.update( item );
323                     }
324                 }
325                 catch ( Exception ce )
326                 {
327                     // swallow
328                     if ( log.isInfoEnabled() )
329                     {
330                         log.info( "Exception caught updating item. requesterId [" + requesterId + "] "
331                             + ce.getMessage() );
332                     }
333                 }
334 
335                 // UPDATE LOCALS IF A REQUEST COMES FROM A CLUSTER
336                 // IF LOCAL CLUSTER CONSISTENCY IS CONFIGURED
337                 if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency() ) )
338                 {
339                     ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
340 
341                     if ( qlist != null )
342                     {
343                         if ( log.isDebugEnabled() )
344                         {
345                             log.debug( "qlist.length = " + qlist.length );
346                         }
347                         for ( int i = 0; i < qlist.length; i++ )
348                         {
349                             qlist[i].addPutEvent( item );
350                         }
351                     }
352                     else
353                     {
354                         if ( log.isDebugEnabled() )
355                         {
356                             log.debug( "q list is null" );
357                         }
358                     }
359                 }
360             }
361         }
362         catch ( IOException e )
363         {
364             if ( cacheEventLogger != null )
365             {
366                 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.UPDATE_EVENT, e.getMessage()
367                     + " REGION: " + item.getCacheName() + " ITEM: " + item );
368             }
369 
370             log.error( "Trouble in Update. requesterId [" + requesterId + "]", e );
371         }
372 
373         // TODO use JAMON for timing
374         if ( timing )
375         {
376             long end = System.currentTimeMillis();
377             if ( log.isDebugEnabled() )
378             {
379                 log.debug( "put took " + String.valueOf( end - start ) + " ms." );
380             }
381         }
382     }
383 
384     /**
385      * Log some details.
386      * <p>
387      * @param item
388      */
389     private void logUpdateInfo( ICacheElement<K, V> item )
390     {
391         if ( log.isInfoEnabled() )
392         {
393             // not thread safe, but it doesn't have to be accurate
394             puts++;
395             if ( puts % logInterval == 0 )
396             {
397                 log.info( "puts = " + puts );
398             }
399         }
400 
401         if ( log.isDebugEnabled() )
402         {
403             log.debug( "In update, put [" + item.getKey() + "] in [" + item.getCacheName() + "]" );
404         }
405     }
406 
407     /**
408      * Returns a cache value from the specified remote cache; or null if the cache or key does not
409      * exist.
410      * <p>
411      * @param cacheName
412      * @param key
413      * @return ICacheElement
414      * @throws IOException
415      */
416     public ICacheElement<K, V> get( String cacheName, K key )
417         throws IOException
418     {
419         return this.get( cacheName, key, 0 );
420     }
421 
422     /**
423      * Returns a cache bean from the specified cache; or null if the key does not exist.
424      * <p>
425      * Adding the requestor id, allows the cache to determine the source of the get.
426      * <p>
427      * The internal processing is wrapped in event logging calls.
428      * <p>
429      * @param cacheName
430      * @param key
431      * @param requesterId
432      * @return ICacheElement
433      * @throws IOException
434      */
435     public ICacheElement<K, V> get( String cacheName, K key, long requesterId )
436         throws IOException
437     {
438         ICacheElement<K, V> element = null;
439         ICacheEvent<K> cacheEvent = createICacheEvent( cacheName, key, requesterId, ICacheEventLogger.GET_EVENT );
440         try
441         {
442             element = processGet( cacheName, key, requesterId );
443         }
444         finally
445         {
446             logICacheEvent( cacheEvent );
447         }
448         return element;
449     }
450 
451     /**
452      * Returns a cache bean from the specified cache; or null if the key does not exist.
453      * <p>
454      * Adding the requester id, allows the cache to determine the source of the get.
455      * <p>
456      * @param cacheName
457      * @param key
458      * @param requesterId
459      * @return ICacheElement
460      */
461     private ICacheElement<K, V> processGet( String cacheName, K key, long requesterId )
462     {
463         boolean fromCluster = isRequestFromCluster( requesterId );
464 
465         if ( log.isDebugEnabled() )
466         {
467             log.debug( "get [" + key + "] from cache [" + cacheName + "] requesterId = [" + requesterId
468                 + "] fromCluster = " + fromCluster );
469         }
470 
471         CacheListeners<K, V> cacheDesc = null;
472         try
473         {
474             cacheDesc = getCacheListeners( cacheName );
475         }
476         catch ( Exception e )
477         {
478             log.error( "Problem getting listeners.", e );
479 
480             if ( cacheEventLogger != null )
481             {
482                 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.GET_EVENT, e.getMessage() + cacheName
483                     + " KEY: " + key );
484             }
485         }
486 
487         ICacheElement<K, V> element = null;
488 
489         element = getFromCacheListeners( key, fromCluster, cacheDesc, element );
490         return element;
491     }
492 
493     /**
494      * Gets the item from the associated cache listeners.
495      * <p>
496      * @param key
497      * @param fromCluster
498      * @param cacheDesc
499      * @param element
500      * @return ICacheElement
501      */
502     private ICacheElement<K, V> getFromCacheListeners( K key, boolean fromCluster, CacheListeners<K, V> cacheDesc,
503                                                  ICacheElement<K, V> element )
504     {
505         ICacheElement<K, V> returnElement = element;
506 
507         if ( cacheDesc != null )
508         {
509             CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
510 
511             // If we have a get come in from a client and we don't have the item
512             // locally, we will allow the cache to look in other non local sources,
513             // such as a remote cache or a lateral.
514             //
515             // Since remote servers never get from clients and clients never go
516             // remote from a remote call, this
517             // will not result in any loops.
518             //
519             // This is the only instance I can think of where we allow a remote get
520             // from a remote call. The purpose is to allow remote cache servers to
521             // talk to each other. If one goes down, you want it to be able to get
522             // data from those that were up when the failed server comes back o
523             // line.
524 
525             if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() )
526             {
527                 if ( log.isDebugEnabled() )
528                 {
529                     log.debug( "NonLocalGet. fromCluster [" + fromCluster + "] AllowClusterGet ["
530                         + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
531                 }
532                 returnElement = c.get( key );
533             }
534             else
535             {
536                 // Gets from cluster type remote will end up here.
537                 // Gets from all clients will end up here if allow cluster get is
538                 // false.
539 
540                 if ( log.isDebugEnabled() )
541                 {
542                     log.debug( "LocalGet.  fromCluster [" + fromCluster + "] AllowClusterGet ["
543                         + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
544                 }
545                 returnElement = c.localGet( key );
546             }
547         }
548 
549         return returnElement;
550     }
551 
552     /**
553      * Gets all matching items.
554      * <p>
555      * @param cacheName
556      * @param pattern
557      * @return Map of keys and wrapped objects
558      * @throws IOException
559      */
560     public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern )
561         throws IOException
562     {
563         return getMatching( cacheName, pattern, 0 );
564     }
565 
566     /**
567      * Retrieves all matching keys.
568      * <p>
569      * @param cacheName
570      * @param pattern
571      * @param requesterId
572      * @return Map of keys and wrapped objects
573      * @throws IOException
574      */
575     public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern, long requesterId )
576         throws IOException
577     {
578         ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, pattern, requesterId,
579                                                     ICacheEventLogger.GETMATCHING_EVENT );
580         try
581         {
582             return processGetMatching( cacheName, pattern, requesterId );
583         }
584         finally
585         {
586             logICacheEvent( cacheEvent );
587         }
588     }
589 
590     /**
591      * Retrieves all matching keys.
592      * <p>
593      * @param cacheName
594      * @param pattern
595      * @param requesterId
596      * @return Map of keys and wrapped objects
597      */
598     protected Map<K, ICacheElement<K, V>> processGetMatching( String cacheName, String pattern, long requesterId )
599     {
600         boolean fromCluster = isRequestFromCluster( requesterId );
601 
602         if ( log.isDebugEnabled() )
603         {
604             log.debug( "getMatching [" + pattern + "] from cache [" + cacheName + "] requesterId = [" + requesterId
605                 + "] fromCluster = " + fromCluster );
606         }
607 
608         CacheListeners<K, V> cacheDesc = null;
609         try
610         {
611             cacheDesc = getCacheListeners( cacheName );
612         }
613         catch ( Exception e )
614         {
615             log.error( "Problem getting listeners.", e );
616 
617             if ( cacheEventLogger != null )
618             {
619                 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.GETMATCHING_EVENT, e.getMessage()
620                     + cacheName + " pattern: " + pattern );
621             }
622         }
623 
624         return getMatchingFromCacheListeners( pattern, fromCluster, cacheDesc );
625     }
626 
627     /**
628      * Gets the item from the associated cache listeners.
629      * <p>
630      * @param pattern
631      * @param fromCluster
632      * @param cacheDesc
633      * @return Map of keys to results
634      */
635     private Map<K, ICacheElement<K, V>> getMatchingFromCacheListeners( String pattern, boolean fromCluster, CacheListeners<K, V> cacheDesc )
636     {
637         Map<K, ICacheElement<K, V>> elements = null;
638         if ( cacheDesc != null )
639         {
640             CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
641 
642             // We always want to go remote and then merge the items.  But this can lead to inconsistencies after
643             // failover recovery.  Removed items may show up.  There is no good way to prevent this.
644             // We should make it configurable.
645 
646             if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() )
647             {
648                 if ( log.isDebugEnabled() )
649                 {
650                     log.debug( "NonLocalGetMatching. fromCluster [" + fromCluster + "] AllowClusterGet ["
651                         + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
652                 }
653                 elements = c.getMatching( pattern );
654             }
655             else
656             {
657                 // Gets from cluster type remote will end up here.
658                 // Gets from all clients will end up here if allow cluster get is
659                 // false.
660 
661                 if ( log.isDebugEnabled() )
662                 {
663                     log.debug( "LocalGetMatching.  fromCluster [" + fromCluster + "] AllowClusterGet ["
664                         + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
665                 }
666                 elements = c.localGetMatching( pattern );
667             }
668         }
669         return elements;
670     }
671 
672     /**
673      * Gets multiple items from the cache based on the given set of keys.
674      * <p>
675      * @param cacheName
676      * @param keys
677      * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no
678      *         data in cache for any of these keys
679      * @throws IOException
680      */
681     public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys )
682         throws IOException
683     {
684         return this.getMultiple( cacheName, keys, 0 );
685     }
686 
687     /**
688      * Gets multiple items from the cache based on the given set of keys.
689      * <p>
690      * The internal processing is wrapped in event logging calls.
691      * <p>
692      * @param cacheName
693      * @param keys
694      * @param requesterId
695      * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no
696      *         data in cache for any of these keys
697      * @throws IOException
698      */
699     public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys, long requesterId )
700         throws IOException
701     {
702         ICacheEvent<Serializable> cacheEvent = createICacheEvent( cacheName, (Serializable) keys, requesterId,
703                                                     ICacheEventLogger.GETMULTIPLE_EVENT );
704         try
705         {
706             return processGetMultiple( cacheName, keys, requesterId );
707         }
708         finally
709         {
710             logICacheEvent( cacheEvent );
711         }
712     }
713 
714     /**
715      * Gets multiple items from the cache based on the given set of keys.
716      * <p>
717      * @param cacheName
718      * @param keys
719      * @param requesterId
720      * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no
721      *         data in cache for any of these keys
722      */
723     private Map<K, ICacheElement<K, V>> processGetMultiple( String cacheName, Set<K> keys, long requesterId )
724     {
725         Map<K, ICacheElement<K, V>> elements = null;
726 
727         boolean fromCluster = isRequestFromCluster( requesterId );
728 
729         if ( log.isDebugEnabled() )
730         {
731             log.debug( "getMultiple [" + keys + "] from cache [" + cacheName + "] requesterId = [" + requesterId
732                 + "] fromCluster = " + fromCluster );
733         }
734 
735         CacheListeners<K, V> cacheDesc = null;
736         try
737         {
738             cacheDesc = getCacheListeners( cacheName );
739         }
740         catch ( Exception e )
741         {
742             log.error( "Problem getting listeners.", e );
743         }
744 
745         elements = getMultipleFromCacheListeners( keys, elements, fromCluster, cacheDesc );
746         return elements;
747     }
748 
749     /**
750      * Since a non-receiving remote cache client will not register a listener, it will not have a
751      * listener id assigned from the server. As such the remote server cannot determine if it is a
752      * cluster or a normal client. It will assume that it is a normal client.
753      * <p>
754      * @param requesterId
755      * @return true is from a cluster.
756      */
757     private boolean isRequestFromCluster( long requesterId )
758     {
759         RemoteType remoteTypeL = idTypeMap.get( Long.valueOf( requesterId ) );
760         return remoteTypeL == RemoteType.CLUSTER;
761     }
762 
763     /**
764      * Gets the items from the associated cache listeners.
765      * <p>
766      * @param keys
767      * @param elements
768      * @param fromCluster
769      * @param cacheDesc
770      * @return Map
771      */
772     private Map<K, ICacheElement<K, V>> getMultipleFromCacheListeners( Set<K> keys, Map<K, ICacheElement<K, V>> elements, boolean fromCluster, CacheListeners<K, V> cacheDesc )
773     {
774         Map<K, ICacheElement<K, V>> returnElements = elements;
775 
776         if ( cacheDesc != null )
777         {
778             CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
779 
780             // If we have a getMultiple come in from a client and we don't have the item
781             // locally, we will allow the cache to look in other non local sources,
782             // such as a remote cache or a lateral.
783             //
784             // Since remote servers never get from clients and clients never go
785             // remote from a remote call, this
786             // will not result in any loops.
787             //
788             // This is the only instance I can think of where we allow a remote get
789             // from a remote call. The purpose is to allow remote cache servers to
790             // talk to each other. If one goes down, you want it to be able to get
791             // data from those that were up when the failed server comes back on
792             // line.
793 
794             if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() )
795             {
796                 if ( log.isDebugEnabled() )
797                 {
798                     log.debug( "NonLocalGetMultiple. fromCluster [" + fromCluster + "] AllowClusterGet ["
799                         + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
800                 }
801 
802                 returnElements = c.getMultiple( keys );
803             }
804             else
805             {
806                 // Gets from cluster type remote will end up here.
807                 // Gets from all clients will end up here if allow cluster get is
808                 // false.
809 
810                 if ( log.isDebugEnabled() )
811                 {
812                     log.debug( "LocalGetMultiple.  fromCluster [" + fromCluster + "] AllowClusterGet ["
813                         + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
814                 }
815 
816                 returnElements = c.localGetMultiple( keys );
817             }
818         }
819 
820         return returnElements;
821     }
822 
823     /**
824      * Gets the set of keys of objects currently in the group.
825      * <p>
826      * @param cacheName
827      * @param group
828      * @return A Set of group keys
829      */
830     public Set<K> getGroupKeys( String cacheName, String group )
831     {
832         return processGetGroupKeys( cacheName, group );
833     }
834 
835     /**
836      * Gets the set of keys of objects currently in the group.
837      * <p>
838      * @param cacheName
839      * @param group
840      * @return Set
841      */
842     protected Set<K> processGetGroupKeys( String cacheName, String group )
843     {
844         CacheListeners<K, V> cacheDesc = null;
845         try
846         {
847             cacheDesc = getCacheListeners( cacheName );
848         }
849         catch ( Exception e )
850         {
851             log.error( "Problem getting listeners.", e );
852         }
853 
854         if ( cacheDesc == null )
855         {
856             return Collections.emptySet();
857         }
858 
859         CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
860         return c.getGroupKeys( group );
861     }
862 
863     /**
864      * Gets the set of group names currently in the cache.
865      * <p>
866      * @param cacheName the name of the region
867      * @return A Set of group names
868      */
869     public Set<String> getGroupNames(String cacheName)
870     {
871         return processGetGroupNames(cacheName);
872     }
873 
874     /**
875      * Gets the set of group names currently in the cache.
876      * <p>
877      * @param cacheName the name of the region
878      * @return A Set of group names
879      */
880     protected Set<String> processGetGroupNames(String cacheName)
881     {
882         CacheListeners<K, V> cacheDesc = null;
883         try
884         {
885             cacheDesc = getCacheListeners(cacheName);
886         }
887         catch (Exception e)
888         {
889             log.error("Problem getting listeners.", e);
890         }
891 
892         if (cacheDesc == null)
893         {
894             return Collections.emptySet();
895         }
896 
897         CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
898         return c.getGroupNames();
899     }
900 
901     /**
902      * Removes the given key from the specified remote cache. Defaults the listener id to 0.
903      * <p>
904      * @param cacheName
905      * @param key
906      * @throws IOException
907      */
908     public void remove( String cacheName, K key )
909         throws IOException
910     {
911         remove( cacheName, key, 0 );
912     }
913 
914     /**
915      * Remove the key from the cache region and don't tell the source listener about it.
916      * <p>
917      * The internal processing is wrapped in event logging calls.
918      * <p>
919      * @param cacheName
920      * @param key
921      * @param requesterId
922      * @throws IOException
923      */
924     public void remove( String cacheName, K key, long requesterId )
925         throws IOException
926     {
927         ICacheEvent<K> cacheEvent = createICacheEvent( cacheName, key, requesterId, ICacheEventLogger.REMOVE_EVENT );
928         try
929         {
930             processRemove( cacheName, key, requesterId );
931         }
932         finally
933         {
934             logICacheEvent( cacheEvent );
935         }
936     }
937 
938     /**
939      * Remove the key from the cache region and don't tell the source listener about it.
940      * <p>
941      * @param cacheName
942      * @param key
943      * @param requesterId
944      * @throws IOException
945      */
946     private void processRemove( String cacheName, K key, long requesterId )
947         throws IOException
948     {
949         if ( log.isDebugEnabled() )
950         {
951             log.debug( "remove [" + key + "] from cache [" + cacheName + "]" );
952         }
953 
954         CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName );
955 
956         boolean fromCluster = isRequestFromCluster( requesterId );
957 
958         if ( cacheDesc != null )
959         {
960             // best attempt to achieve ordered cache item removal and
961             // notification.
962             synchronized ( cacheDesc )
963             {
964                 boolean removeSuccess = false;
965 
966                 // No need to notify if it was not cached.
967                 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
968 
969                 if ( fromCluster )
970                 {
971                     if ( log.isDebugEnabled() )
972                     {
973                         log.debug( "Remove FROM cluster, NOT updating other auxiliaries for region" );
974                     }
975                     removeSuccess = c.localRemove( key );
976                 }
977                 else
978                 {
979                     if ( log.isDebugEnabled() )
980                     {
981                         log.debug( "Remove NOT from cluster, updating other auxiliaries for region" );
982                     }
983                     removeSuccess = c.remove( key );
984                 }
985 
986                 if ( log.isDebugEnabled() )
987                 {
988                     log.debug( "remove [" + key + "] from cache [" + cacheName + "] success (was it found) = "
989                         + removeSuccess );
990                 }
991 
992                 // UPDATE LOCALS IF A REQUEST COMES FROM A CLUSTER
993                 // IF LOCAL CLUSTER CONSISTENCY IS CONFIGURED
994                 if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency() ) )
995                 {
996                     ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
997 
998                     for ( int i = 0; i < qlist.length; i++ )
999                     {
1000                         qlist[i].addRemoveEvent( key );
1001                     }
1002                 }
1003             }
1004         }
1005     }
1006 
1007     /**
1008      * Remove all keys from the specified remote cache.
1009      * <p>
1010      * @param cacheName
1011      * @throws IOException
1012      */
1013     public void removeAll( String cacheName )
1014         throws IOException
1015     {
1016         removeAll( cacheName, 0 );
1017     }
1018 
1019     /**
1020      * Remove all keys from the specified remote cache.
1021      * <p>
1022      * The internal processing is wrapped in event logging calls.
1023      * <p>
1024      * @param cacheName
1025      * @param requesterId
1026      * @throws IOException
1027      */
1028     public void removeAll( String cacheName, long requesterId )
1029         throws IOException
1030     {
1031         ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, "all", requesterId, ICacheEventLogger.REMOVEALL_EVENT );
1032         try
1033         {
1034             processRemoveAll( cacheName, requesterId );
1035         }
1036         finally
1037         {
1038             logICacheEvent( cacheEvent );
1039         }
1040     }
1041 
1042     /**
1043      * Remove all keys from the specified remote cache.
1044      * <p>
1045      * @param cacheName
1046      * @param requesterId
1047      * @throws IOException
1048      */
1049     private void processRemoveAll( String cacheName, long requesterId )
1050         throws IOException
1051     {
1052         CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName );
1053 
1054         boolean fromCluster = isRequestFromCluster( requesterId );
1055 
1056         if ( cacheDesc != null )
1057         {
1058             // best attempt to achieve ordered cache item removal and
1059             // notification.
1060             synchronized ( cacheDesc )
1061             {
1062                 // No need to broadcast, or notify if it was not cached.
1063                 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
1064 
1065                 if ( fromCluster )
1066                 {
1067                     if ( log.isDebugEnabled() )
1068                     {
1069                         log.debug( "RemoveALL FROM cluster, NOT updating other auxiliaries for region" );
1070                     }
1071                     c.localRemoveAll();
1072                 }
1073                 else
1074                 {
1075                     if ( log.isDebugEnabled() )
1076                     {
1077                         log.debug( "RemoveALL NOT from cluster, updating other auxiliaries for region" );
1078                     }
1079                     c.removeAll();
1080                 }
1081 
1082                 // update registered listeners
1083                 if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency() ) )
1084                 {
1085                     ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
1086 
1087                     for ( int i = 0; i < qlist.length; i++ )
1088                     {
1089                         qlist[i].addRemoveAllEvent();
1090                     }
1091                 }
1092             }
1093         }
1094     }
1095 
1096     /**
1097      * How many put events have we received.
1098      * <p>
1099      * @return puts
1100      */
1101     protected int getPutCount()
1102     {
1103         return puts;
1104     }
1105 
1106     /**
1107      * Frees the specified remote cache.
1108      * <p>
1109      * @param cacheName
1110      * @throws IOException
1111      */
1112     public void dispose( String cacheName )
1113         throws IOException
1114     {
1115         dispose( cacheName, 0 );
1116     }
1117 
1118     /**
1119      * Frees the specified remote cache.
1120      * <p>
1121      * @param cacheName
1122      * @param requesterId
1123      * @throws IOException
1124      */
1125     public void dispose( String cacheName, long requesterId )
1126         throws IOException
1127     {
1128         ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, "none", requesterId, ICacheEventLogger.DISPOSE_EVENT );
1129         try
1130         {
1131             processDispose( cacheName, requesterId );
1132         }
1133         finally
1134         {
1135             logICacheEvent( cacheEvent );
1136         }
1137     }
1138 
1139     /**
1140      * @param cacheName
1141      * @param requesterId
1142      * @throws IOException
1143      */
1144     private void processDispose( String cacheName, long requesterId )
1145         throws IOException
1146     {
1147         if ( log.isInfoEnabled() )
1148         {
1149             log.info( "Dispose request received from listener [" + requesterId + "]" );
1150         }
1151 
1152         CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName );
1153 
1154         // this is dangerous
1155         if ( cacheDesc != null )
1156         {
1157             // best attempt to achieve ordered free-cache-op and notification.
1158             synchronized ( cacheDesc )
1159             {
1160                 ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
1161 
1162                 for ( int i = 0; i < qlist.length; i++ )
1163                 {
1164                     qlist[i].addDisposeEvent();
1165                 }
1166                 cacheManager.freeCache( cacheName );
1167             }
1168         }
1169     }
1170 
1171     /**
1172      * Frees all remote caches.
1173      * <p>
1174      * @throws IOException
1175      */
1176     public void release()
1177         throws IOException
1178     {
1179         synchronized ( cacheListenersMap )
1180         {
1181             for (CacheListeners<K, V> cacheDesc : cacheListenersMap.values())
1182             {
1183                 ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, 0 );
1184 
1185                 for ( int i = 0; i < qlist.length; i++ )
1186                 {
1187                     qlist[i].addDisposeEvent();
1188                 }
1189             }
1190             cacheManager.release();
1191         }
1192     }
1193 
1194     /**
1195      * Returns the cache listener for the specified cache. Creates the cache and the cache
1196      * descriptor if they do not already exist.
1197      * <p>
1198      * @param cacheName
1199      * @return The cacheListeners value
1200      */
1201     protected CacheListeners<K, V> getCacheListeners( String cacheName )
1202     {
1203         CacheListeners<K, V> cacheListeners = cacheListenersMap.get( cacheName );
1204         synchronized ( cacheListenersMap )
1205         {
1206             if ( cacheListeners == null )
1207             {
1208                 cacheListeners = cacheListenersMap.get( cacheName );
1209                 if ( cacheListeners == null )
1210                 {
1211                     CompositeCache<K, V> cache = cacheManager.getCache( cacheName );
1212                     cacheListeners = new CacheListeners<K, V>( cache );
1213                     cacheListenersMap.put( cacheName, cacheListeners );
1214                 }
1215             }
1216         }
1217         return cacheListeners;
1218     }
1219 
1220     /**
1221      * Gets the clusterListeners attribute of the RemoteCacheServer object.
1222      * <p>
1223      * TODO may be able to remove this
1224      * @param cacheName
1225      * @return The clusterListeners value
1226      */
1227     protected CacheListeners<K, V> getClusterListeners( String cacheName )
1228     {
1229         CacheListeners<K, V> cacheListeners = clusterListenersMap.get( cacheName );
1230         synchronized ( clusterListenersMap )
1231         {
1232             if ( cacheListeners == null )
1233             {
1234                 cacheListeners = clusterListenersMap.get( cacheName );
1235                 if ( cacheListeners == null )
1236                 {
1237                     CompositeCache<K, V> cache = cacheManager.getCache( cacheName );
1238                     cacheListeners = new CacheListeners<K, V>( cache );
1239                     clusterListenersMap.put( cacheName, cacheListeners );
1240                 }
1241             }
1242         }
1243         return cacheListeners;
1244     }
1245 
1246     /**
1247      * Gets the eventQList attribute of the RemoteCacheServer object. This returns the event queues
1248      * stored in the cacheListeners object for a particular region, if the queue is not for this
1249      * requester.
1250      * <p>
1251      * Basically, this makes sure that a request from a particular local cache, identified by its
1252      * listener id, does not result in a call to that same listener.
1253      * <p>
1254      * @param cacheListeners
1255      * @param requesterId
1256      * @return The eventQList value
1257      */
1258     @SuppressWarnings("unchecked") // No generic arrays in java
1259     private ICacheEventQueue<K, V>[] getEventQList( CacheListeners<K, V> cacheListeners, long requesterId )
1260     {
1261         ICacheEventQueue<K, V>[] list = null;
1262         synchronized ( cacheListeners.eventQMap )
1263         {
1264             list = cacheListeners.eventQMap.values().toArray( new ICacheEventQueue[0] );
1265         }
1266         int count = 0;
1267         // Set those not qualified to null; Count those qualified.
1268         for ( int i = 0; i < list.length; i++ )
1269         {
1270             ICacheEventQueue<K, V> q = list[i];
1271             if ( q.isWorking() && q.getListenerId() != requesterId )
1272             {
1273                 count++;
1274             }
1275             else
1276             {
1277                 list[i] = null;
1278             }
1279         }
1280         if ( count == list.length )
1281         {
1282             // All qualified.
1283             return list;
1284         }
1285 
1286         // Returns only the qualified.
1287         ICacheEventQueue<K, V>[] qq = new ICacheEventQueue[count];
1288         count = 0;
1289         for ( int i = 0; i < list.length; i++ )
1290         {
1291             if ( list[i] != null )
1292             {
1293                 qq[count++] = list[i];
1294             }
1295         }
1296         return qq;
1297     }
1298 
1299     /**
1300      * Removes dead event queues. Should clean out deregistered listeners.
1301      * <p>
1302      * @param eventQMap
1303      */
1304     private static <KK extends Serializable, VV extends Serializable> void cleanupEventQMap( Map<Long, ICacheEventQueue<KK, VV>> eventQMap )
1305     {
1306         synchronized ( eventQMap )
1307         {
1308             for (Iterator<Map.Entry<Long, ICacheEventQueue<KK, VV>>> itr = eventQMap.entrySet().iterator(); itr.hasNext(); )
1309             {
1310                 Map.Entry<Long, ICacheEventQueue<KK, VV>> e = itr.next();
1311                 ICacheEventQueue<KK, VV> q = e.getValue();
1312 
1313                 // this does not care if the q is alive (i.e. if
1314                 // there are active threads; it cares if the queue
1315                 // is working -- if it has not encountered errors
1316                 // above the failure threshold
1317                 if ( !q.isWorking() )
1318                 {
1319                     itr.remove();
1320                     log.warn( "Cache event queue " + q + " is not working and removed from cache server." );
1321                 }
1322             }
1323         }
1324     }
1325 
1326     /**
1327      * Subscribes to the specified remote cache.
1328      * <p>
1329      * If the client id is 0, then the remote cache server will increment it's local count and
1330      * assign an id to the client.
1331      * <p>
1332      * @param cacheName the specified remote cache.
1333      * @param listener object to notify for cache changes. must be synchronized since there are
1334      *            remote calls involved.
1335      * @throws IOException
1336      */
1337     @SuppressWarnings("unchecked") // Need to cast to specific return type from getClusterListeners()
1338     public <KK extends Serializable, VV extends Serializable> void addCacheListener( String cacheName, ICacheListener<KK, VV> listener )
1339         throws IOException
1340     {
1341         if ( cacheName == null || listener == null )
1342         {
1343             throw new IllegalArgumentException( "cacheName and listener must not be null" );
1344         }
1345         CacheListeners<KK, VV> cacheListeners;
1346 
1347         IRemoteCacheListener<KK, VV> ircl = (IRemoteCacheListener<KK, VV>) listener;
1348 
1349         String listenerAddress = ircl.getLocalHostAddress();
1350 
1351         RemoteType remoteType = ircl.getRemoteType();
1352         if ( remoteType == RemoteType.CLUSTER )
1353         {
1354             log.debug( "adding cluster listener, listenerAddress [" + listenerAddress + "]" );
1355             cacheListeners = (CacheListeners<KK, VV>)getClusterListeners( cacheName );
1356         }
1357         else
1358         {
1359             log.debug( "adding normal listener, listenerAddress [" + listenerAddress + "]" );
1360             cacheListeners = (CacheListeners<KK, VV>)getCacheListeners( cacheName );
1361         }
1362         Map<Long, ICacheEventQueue<KK, VV>> eventQMap = cacheListeners.eventQMap;
1363         cleanupEventQMap( eventQMap );
1364 
1365         // synchronized ( listenerId )
1366         synchronized ( ICacheListener.class )
1367         {
1368             long id = 0;
1369             try
1370             {
1371                 id = listener.getListenerId();
1372                 // clients probably shouldn't do this.
1373                 if ( id == 0 )
1374                 {
1375                     // must start at one so the next gets recognized
1376                     long listenerIdB = nextListenerId();
1377                     if ( log.isDebugEnabled() )
1378                     {
1379                         log.debug( "listener id=" + ( listenerIdB & 0xff ) + " addded for cache [" + cacheName
1380                             + "], listenerAddress [" + listenerAddress + "]" );
1381                     }
1382                     listener.setListenerId( listenerIdB );
1383                     id = listenerIdB;
1384 
1385                     // in case it needs synchronization
1386                     String message = "Adding vm listener under new id = [" + listenerIdB + "], listenerAddress ["
1387                         + listenerAddress + "]";
1388                     logApplicationEvent( "RemoteCacheServer", "addCacheListener", message );
1389                     if ( log.isInfoEnabled() )
1390                     {
1391                         log.info( message );
1392                     }
1393                 }
1394                 else
1395                 {
1396                     String message = "Adding listener under existing id = [" + id + "], listenerAddress ["
1397                         + listenerAddress + "]";
1398                     logApplicationEvent( "RemoteCacheServer", "addCacheListener", message );
1399                     if ( log.isInfoEnabled() )
1400                     {
1401                         log.info( message );
1402                     }
1403                     // should confirm the the host is the same as we have on
1404                     // record, just in case a client has made a mistake.
1405                 }
1406 
1407                 // relate the type to an id
1408                 this.idTypeMap.put( Long.valueOf( id ), remoteType);
1409                 if ( listenerAddress != null )
1410                 {
1411                     this.idIPMap.put( Long.valueOf( id ), listenerAddress );
1412                 }
1413             }
1414             catch ( IOException ioe )
1415             {
1416                 String message = "Problem setting listener id, listenerAddress [" + listenerAddress + "]";
1417                 log.error( message, ioe );
1418 
1419                 if ( cacheEventLogger != null )
1420                 {
1421                     cacheEventLogger.logError( "RemoteCacheServer", "addCacheListener", message + " - "
1422                         + ioe.getMessage() );
1423                 }
1424             }
1425 
1426             CacheEventQueueFactory<KK, VV> fact = new CacheEventQueueFactory<KK, VV>();
1427             ICacheEventQueue<KK, VV> q = fact.createCacheEventQueue( listener, id, cacheName, remoteCacheServerAttributes
1428                 .getEventQueuePoolName(), remoteCacheServerAttributes.getEventQueueType() );
1429 
1430             eventQMap.put(Long.valueOf(listener.getListenerId()), q);
1431 
1432             if ( log.isInfoEnabled() )
1433             {
1434                 log.info( cacheListeners );
1435             }
1436         }
1437     }
1438 
1439     /**
1440      * Subscribes to all remote caches.
1441      * <p>
1442      * @param listener The feature to be added to the CacheListener attribute
1443      * @throws IOException
1444      */
1445     public <KK extends Serializable, VV extends Serializable> void addCacheListener( ICacheListener<KK, VV> listener )
1446         throws IOException
1447     {
1448         for (String cacheName : cacheListenersMap.keySet())
1449         {
1450             addCacheListener( cacheName, listener );
1451 
1452             if ( log.isDebugEnabled() )
1453             {
1454                 log.debug( "Adding listener for cache [" + cacheName + "]" );
1455             }
1456         }
1457     }
1458 
1459     /**
1460      * Unsubscribe this listener from this region. If the listener is registered, it will be removed
1461      * from the event queue map list.
1462      * <p>
1463      * @param cacheName
1464      * @param listener
1465      * @throws IOException
1466      */
1467     public <KK extends Serializable, VV extends Serializable> void removeCacheListener( String cacheName, ICacheListener<KK, VV> listener )
1468         throws IOException
1469     {
1470         removeCacheListener( cacheName, listener.getListenerId() );
1471     }
1472 
1473     /**
1474      * Unsubscribe this listener from this region. If the listener is registered, it will be removed
1475      * from the event queue map list.
1476      * <p>
1477      * @param cacheName
1478      * @param listenerId
1479      */
1480     public void removeCacheListener( String cacheName, long listenerId )
1481     {
1482         String message = "Removing listener for cache region = [" + cacheName + "] and listenerId [" + listenerId + "]";
1483         logApplicationEvent( "RemoteCacheServer", "removeCacheListener", message );
1484         if ( log.isInfoEnabled() )
1485         {
1486             log.info( message );
1487         }
1488 
1489         boolean isClusterListener = isRequestFromCluster( listenerId );
1490 
1491         CacheListeners<K, V> cacheDesc = null;
1492 
1493         if ( isClusterListener )
1494         {
1495             cacheDesc = getClusterListeners( cacheName );
1496         }
1497         else
1498         {
1499             cacheDesc = getCacheListeners( cacheName );
1500         }
1501         Map<Long, ICacheEventQueue<K, V>> eventQMap = cacheDesc.eventQMap;
1502         cleanupEventQMap( eventQMap );
1503         ICacheEventQueue<K, V> q = eventQMap.remove( Long.valueOf( listenerId ) );
1504 
1505         if ( q != null )
1506         {
1507             if ( log.isDebugEnabled() )
1508             {
1509                 log.debug( "Found queue for cache region = [" + cacheName + "] and listenerId  [" + listenerId + "]" );
1510             }
1511             q.destroy();
1512             cleanupEventQMap( eventQMap );
1513         }
1514         else
1515         {
1516             if ( log.isDebugEnabled() )
1517             {
1518                 log.debug( "Did not find queue for cache region = [" + cacheName + "] and listenerId [" + listenerId
1519                     + "]" );
1520             }
1521         }
1522 
1523         // cleanup
1524         idTypeMap.remove( Long.valueOf( listenerId ) );
1525         idIPMap.remove( Long.valueOf( listenerId ) );
1526 
1527         if ( log.isInfoEnabled() )
1528         {
1529             log.info( "After removing listener [" + listenerId + "] cache region " + cacheName + "'s listener size ["
1530                 + cacheDesc.eventQMap.size() + "]" );
1531         }
1532     }
1533 
1534     /**
1535      * Unsubscribes from all remote caches.
1536      * <p>
1537      * @param listener
1538      * @throws IOException
1539      */
1540     public <KK extends Serializable, VV extends Serializable> void removeCacheListener( ICacheListener<KK, VV> listener )
1541         throws IOException
1542     {
1543         for (String cacheName : cacheListenersMap.keySet())
1544         {
1545             removeCacheListener( cacheName, listener );
1546 
1547             if ( log.isInfoEnabled() )
1548             {
1549                 log.info( "Removing listener for cache [" + cacheName + "]" );
1550             }
1551         }
1552         return;
1553     }
1554 
1555     /**
1556      * Shuts down the remote server.
1557      * <p>
1558      * @throws IOException
1559      */
1560     public void shutdown()
1561         throws IOException
1562     {
1563         RemoteCacheServerFactory.shutdownImpl( "", Registry.REGISTRY_PORT );
1564     }
1565 
1566     /**
1567      * Shuts down a server at a particular host and port. Then it calls shutdown on the cache
1568      * itself.
1569      * <p>
1570      * @param host
1571      * @param port
1572      * @throws IOException
1573      */
1574     public void shutdown( String host, int port )
1575         throws IOException
1576     {
1577         if ( log.isInfoEnabled() )
1578         {
1579             log.info( "Received shutdown request.  Shutting down server." );
1580         }
1581         RemoteCacheServerFactory.shutdownImpl( host, port );
1582         this.cacheManager.shutDown();
1583     }
1584 
1585     /**
1586      * Called by the RMI runtime sometime after the runtime determines that the reference list, the
1587      * list of clients referencing the remote object, becomes empty.
1588      */
1589     // TODO: test out the DGC.
1590     public void unreferenced()
1591     {
1592         if ( log.isInfoEnabled() )
1593         {
1594             log.info( "*** Server now unreferenced and subject to GC. ***" );
1595         }
1596     }
1597 
1598     /**
1599      * Returns the next generated listener id [0,255].
1600      * <p>
1601      * @return the listener id of a client. This should be unique for this server.
1602      */
1603     private long nextListenerId()
1604     {
1605         long id = 0;
1606         if ( listenerId[0] == Long.MAX_VALUE )
1607         {
1608             synchronized ( listenerId )
1609             {
1610                 id = listenerId[0];
1611                 listenerId[0] = 0;
1612                 // TODO: record & check if the generated id is currently being
1613                 // used by a valid listener. Currently if the id wraps after
1614                 // Long.MAX_VALUE,
1615                 // we just assume it won't collide with an existing listener who
1616                 // is live.
1617             }
1618         }
1619         else
1620         {
1621             synchronized ( listenerId )
1622             {
1623                 id = ++listenerId[0];
1624             }
1625         }
1626         return id;
1627     }
1628 
1629     /**
1630      * Gets the stats attribute of the RemoteCacheServer object.
1631      * <p>
1632      * @return The stats value
1633      * @throws IOException
1634      */
1635     public String getStats()
1636         throws IOException
1637     {
1638         return cacheManager.getStats();
1639     }
1640 
1641     /**
1642      * Logs an event if an event logger is configured.
1643      * <p>
1644      * @param item
1645      * @param requesterId
1646      * @param eventName
1647      * @return ICacheEvent
1648      */
1649     private ICacheEvent<ICacheElement<K, V>> createICacheEvent( ICacheElement<K, V> item, long requesterId, String eventName )
1650     {
1651         if ( cacheEventLogger == null )
1652         {
1653             return new CacheEvent<ICacheElement<K, V>>();
1654         }
1655         String ipAddress = getExtraInfoForRequesterId( requesterId );
1656         return cacheEventLogger
1657             .createICacheEvent( "RemoteCacheServer", item.getCacheName(), eventName, ipAddress, item );
1658     }
1659 
1660     /**
1661      * Logs an event if an event logger is configured.
1662      * <p>
1663      * @param cacheName
1664      * @param key
1665      * @param requesterId
1666      * @param eventName
1667      * @return ICacheEvent
1668      */
1669     private <T extends Serializable> ICacheEvent<T> createICacheEvent( String cacheName, T key, long requesterId, String eventName )
1670     {
1671         if ( cacheEventLogger == null )
1672         {
1673             return new CacheEvent<T>();
1674         }
1675         String ipAddress = getExtraInfoForRequesterId( requesterId );
1676         return cacheEventLogger.createICacheEvent( "RemoteCacheServer", cacheName, eventName, ipAddress, key );
1677     }
1678 
1679     /**
1680      * Logs an event if an event logger is configured.
1681      * <p>
1682      * @param source
1683      * @param eventName
1684      * @param optionalDetails
1685      */
1686     protected void logApplicationEvent( String source, String eventName, String optionalDetails )
1687     {
1688         if ( cacheEventLogger != null )
1689         {
1690             cacheEventLogger.logApplicationEvent( source, eventName, optionalDetails );
1691         }
1692     }
1693 
1694     /**
1695      * Logs an event if an event logger is configured.
1696      * <p>
1697      * @param cacheEvent
1698      */
1699     protected <T extends Serializable> void logICacheEvent( ICacheEvent<T> cacheEvent )
1700     {
1701         if ( cacheEventLogger != null )
1702         {
1703             cacheEventLogger.logICacheEvent( cacheEvent );
1704         }
1705     }
1706 
1707     /**
1708      * Ip address for the client, if one is stored.
1709      * <p>
1710      * Protected for testing.
1711      * <p>
1712      * @param requesterId
1713      * @return String
1714      */
1715     protected String getExtraInfoForRequesterId( long requesterId )
1716     {
1717         String ipAddress = idIPMap.get( Long.valueOf( requesterId ) );
1718         return ipAddress;
1719     }
1720 
1721     /**
1722      * Allows it to be injected.
1723      * <p>
1724      * @param cacheEventLogger
1725      */
1726     public void setCacheEventLogger( ICacheEventLogger cacheEventLogger )
1727     {
1728         this.cacheEventLogger = cacheEventLogger;
1729     }
1730 }