View Javadoc

1   package org.apache.jcs.auxiliary.remote.server;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
5    * agreements. See the NOTICE file distributed with this work for additional information regarding
6    * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance with the License. You may obtain a
8    * copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software distributed under the License
13   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14   * or implied. See the License for the specific language governing permissions and limitations under
15   * the License.
16   */
17  
18  import java.io.IOException;
19  import java.io.Serializable;
20  import java.rmi.RemoteException;
21  import java.rmi.registry.Registry;
22  import java.rmi.server.RMISocketFactory;
23  import java.rmi.server.UnicastRemoteObject;
24  import java.rmi.server.Unreferenced;
25  import java.util.Collections;
26  import java.util.Hashtable;
27  import java.util.Iterator;
28  import java.util.Map;
29  import java.util.Set;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.jcs.access.exception.CacheException;
34  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
35  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
36  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheObserver;
37  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheService;
38  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheServiceAdmin;
39  import org.apache.jcs.auxiliary.remote.server.behavior.IRemoteCacheServerAttributes;
40  import org.apache.jcs.engine.CacheEventQueueFactory;
41  import org.apache.jcs.engine.CacheListeners;
42  import org.apache.jcs.engine.behavior.ICacheElement;
43  import org.apache.jcs.engine.behavior.ICacheEventQueue;
44  import org.apache.jcs.engine.behavior.ICacheListener;
45  import org.apache.jcs.engine.control.CompositeCache;
46  import org.apache.jcs.engine.control.CompositeCacheManager;
47  import org.apache.jcs.engine.logging.CacheEvent;
48  import org.apache.jcs.engine.logging.behavior.ICacheEvent;
49  import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
50  
/**
 * This class provides remote cache services. The remote cache server propagates events from local
 * caches to other local caches. It can also store cached data, making it available to new clients.
 * <p>
 * Remote cache servers can be clustered. If the cache used by this remote cache is configured to
 * use a remote cache of type cluster, the two remote caches will communicate with each other.
 * Remove and put requests can be sent from one remote to another. If they are configured to
 * broadcast such events to their clients, then removes and puts can be sent to all locals in the
 * cluster.
 * <p>
 * Get requests are made between clustered servers if AllowClusterGet is true. You can set up several
 * clients to use one remote server and several to use another. The get load will be distributed
 * between the two servers. Since caches are usually high get and low put, this should allow you to
 * scale.
 */
66  public class RemoteCacheServer
67      extends UnicastRemoteObject
68      implements IRemoteCacheService, IRemoteCacheObserver, IRemoteCacheServiceAdmin, Unreferenced
69  {
70      /** For serialization. Don't change. */
71      private static final long serialVersionUID = -8072345435941473116L;
72  
73      /** log instance */
74      private final static Log log = LogFactory.getLog( RemoteCacheServer.class );
75  
76      /** timing -- if we should record operation times. */
77      protected final static boolean timing = true;
78  
79      /** Number of puts into the cache. */
80      private int puts = 0;
81  
82      /** Maps cache name to CacheListeners object. association of listeners (regions). */
83      private final Hashtable<String, CacheListeners> cacheListenersMap =
84          new Hashtable<String, CacheListeners>();
85  
86      /** maps cluster listeners to regions. */
87      private final Hashtable<String, CacheListeners> clusterListenersMap =
88          new Hashtable<String, CacheListeners>();
89  
90      /** The central hub */
91      private CompositeCacheManager cacheManager;
92  
93      /** relates listener id with a type */
94      private final Hashtable<Long, Integer> idTypeMap = new Hashtable<Long, Integer>();
95  
96      /** relates listener id with an ip address */
97      private final Hashtable<Long, String> idIPMap = new Hashtable<Long, String>();
98  
99      /** Used to get the next listener id. */
100     private final int[] listenerId = new int[1];
101 
102     /** Configuration settings. */
103     protected IRemoteCacheServerAttributes remoteCacheServerAttributes;
104 
105     /** The interval at which we will log updates. */
106     private final int logInterval = 100;
107 
108     /** An optional event logger */
109     private transient ICacheEventLogger cacheEventLogger;
110 
111     /** If there is no event logger, we will return this event for all create calls. */
112     private static final ICacheEvent EMPTY_ICACHE_EVENT = new CacheEvent();
113 
114     /**
115      * Constructor for the RemoteCacheServer object. This initializes the server with the values
116      * from the config file.
117      * <p>
118      * @param rcsa
119      * @throws RemoteException
120      */
121     protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa )
122         throws RemoteException
123     {
124         super( rcsa.getServicePort() );
125         this.remoteCacheServerAttributes = rcsa;
126         init( rcsa.getConfigFileName() );
127     }
128 
129     /**
130      * Constructor for the RemoteCacheServer object. This initializes the server with the values
131      * from the config file.
132      * <p>
133      * @param rcsa
134      * @param customRMISocketFactory
135      * @throws RemoteException
136      */
137     protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa, RMISocketFactory customRMISocketFactory )
138         throws RemoteException
139     {
140         super( rcsa.getServicePort(), customRMISocketFactory, customRMISocketFactory );
141         this.remoteCacheServerAttributes = rcsa;
142         init( rcsa.getConfigFileName() );
143     }
144 
145     /**
146      * Initialize the RMI Cache Server from a properties file.
147      * <p>
148      * @param prop
149      * @throws RemoteException if the configuration of the cache manager instance fails
150      */
151     private void init( String prop ) throws RemoteException
152     {
153         try
154         {
155             cacheManager = createCacheManager( prop );
156         }
157         catch (CacheException e)
158         {
159             throw new RemoteException(e.getMessage(), e);
160         }
161 
162         // cacheManager would have created a number of ICache objects.
163         // Use these objects to set up the cacheListenersMap.
164         String[] list = cacheManager.getCacheNames();
165         for ( int i = 0; i < list.length; i++ )
166         {
167             String name = list[i];
168             cacheListenersMap.put( name, new CacheListeners( cacheManager.getCache( name ) ) );
169         }
170     }
171 
172     /**
173      * Subclass can override this method to create the specific cache manager.
174      * <p>
175      * @param prop The name of the configuration file.
176      * @return The cache hub configured with this configuration file.
177      *
178      * @throws CacheException if the configuration cannot be loaded
179      */
180     private CompositeCacheManager createCacheManager( String prop ) throws CacheException
181     {
182         CompositeCacheManager hub = CompositeCacheManager.getUnconfiguredInstance();
183 
184         if ( prop == null )
185         {
186             hub.configure( "/remote.cache.ccf" );
187         }
188         else
189         {
190             hub.configure( prop );
191         }
192         return hub;
193     }
194 
195     /**
196      * Puts a cache bean to the remote cache and notifies all listeners which <br>
197      * <ol>
198      * <li>have a different listener id than the originating host; <li>are currently subscribed to
199      * the related cache.
200      * </ol>
201      * <p>
202      * @param item
203      * @throws IOException
204      */
205     public void put( ICacheElement item )
206         throws IOException
207     {
208         update( item );
209     }
210 
211     /**
212      * @param item
213      * @throws IOException
214      */
215     public void update( ICacheElement item )
216         throws IOException
217     {
218         update( item, 0 );
219     }
220 
221     /**
222      * The internal processing is wrapped in event logging calls.
223      * <p>
224      * @param item
225      * @param requesterId
226      * @throws IOException
227      */
228     public void update( ICacheElement item, long requesterId )
229         throws IOException
230     {
231         ICacheEvent cacheEvent = createICacheEvent( item, requesterId, ICacheEventLogger.UPDATE_EVENT );
232         try
233         {
234             processUpdate( item, requesterId );
235         }
236         finally
237         {
238             logICacheEvent( cacheEvent );
239         }
240     }
241 
242     /**
243      * An update can come from either a local cache's remote auxiliary, or it can come from a remote
244      * server. A remote server is considered a a source of type cluster.
245      * <p>
246      * If the update came from a cluster, then we should tell the cache manager that this was a
247      * remote put. This way, any lateral and remote auxiliaries configured for the region will not
248      * be updated. This is basically how a remote listener works when plugged into a local cache.
249      * <p>
250      * If the cluster is configured to keep local cluster consistency, then all listeners will be
251      * updated. This allows cluster server A to update cluster server B and then B to update its
252      * clients if it is told to keep local cluster consistency. Otherwise, server A will update
253      * server B and B will not tell its clients. If you cluster using lateral caches for instance,
254      * this is how it will work. Updates to a cluster node, will never get to the leaves. The remote
255      * cluster, with local cluster consistency, allows you to update leaves. This basically allows
256      * you to have a failover remote server.
257      * <p>
258      * Since currently a cluster will not try to get from other cluster servers, you can scale a bit
259      * with a cluster configuration. Puts and removes will be broadcasted to all clients, but the
260      * get load on a remote server can be reduced.
261      * <p>
262      * @param item
263      * @param requesterId
264      */
265     private void processUpdate( ICacheElement item, long requesterId )
266     {
267         long start = 0;
268         if ( timing )
269         {
270             start = System.currentTimeMillis();
271         }
272 
273         logUpdateInfo( item );
274 
275         try
276         {
277             CacheListeners cacheDesc = getCacheListeners( item.getCacheName() );
278             /* Object val = */item.getVal();
279 
280             boolean fromCluster = isRequestFromCluster( requesterId );
281 
282             if ( log.isDebugEnabled() )
283             {
284                 log.debug( "In update, requesterId = [" + requesterId + "] fromCluster = " + fromCluster );
285             }
286 
287             // ordered cache item update and notification.
288             synchronized ( cacheDesc )
289             {
290                 try
291                 {
292                     CompositeCache c = (CompositeCache) cacheDesc.cache;
293 
294                     // If the source of this request was not from a cluster,
295                     // then consider it a local update. The cache manager will
296                     // try to
297                     // update all auxiliaries.
298                     //
299                     // This requires that two local caches not be connected to
300                     // two clustered remote caches. The failover runner will
301                     // have to make sure of this. ALos, the local cache needs
302                     // avoid updating this source. Will need to pass the source
303                     // id somehow. The remote cache should update all local
304                     // caches
305                     // but not update the cluster source. Cluster remote caches
306                     // should only be updated by the server and not the
307                     // RemoteCache.
308                     if ( fromCluster )
309                     {
310                         if ( log.isDebugEnabled() )
311                         {
312                             log.debug( "Put FROM cluster, NOT updating other auxiliaries for region. "
313                                 + " requesterId [" + requesterId + "]" );
314                         }
315                         c.localUpdate( item );
316                     }
317                     else
318                     {
319                         if ( log.isDebugEnabled() )
320                         {
321                             log.debug( "Put NOT from cluster, updating other auxiliaries for region. "
322                                 + " requesterId [" + requesterId + "]" );
323                         }
324                         c.update( item );
325                     }
326                 }
327                 catch ( Exception ce )
328                 {
329                     // swallow
330                     if ( log.isInfoEnabled() )
331                     {
332                         log.info( "Exception caught updating item. requesterId [" + requesterId + "] "
333                             + ce.getMessage() );
334                     }
335                 }
336 
337                 // UPDATE LOCALS IF A REQUEST COMES FROM A CLUSTER
338                 // IF LOCAL CLUSTER CONSISTENCY IS CONFIGURED
339                 if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.getLocalClusterConsistency() ) )
340                 {
341                     ICacheEventQueue[] qlist = getEventQList( cacheDesc, requesterId );
342 
343                     if ( qlist != null )
344                     {
345                         if ( log.isDebugEnabled() )
346                         {
347                             log.debug( "qlist.length = " + qlist.length );
348                         }
349                         for ( int i = 0; i < qlist.length; i++ )
350                         {
351                             qlist[i].addPutEvent( item );
352                         }
353                     }
354                     else
355                     {
356                         if ( log.isDebugEnabled() )
357                         {
358                             log.debug( "q list is null" );
359                         }
360                     }
361                 }
362             }
363         }
364         catch ( IOException e )
365         {
366             if ( cacheEventLogger != null )
367             {
368                 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.UPDATE_EVENT, e.getMessage()
369                     + " REGION: " + item.getCacheName() + " ITEM: " + item );
370             }
371 
372             log.error( "Trouble in Update. requesterId [" + requesterId + "]", e );
373         }
374 
375         // TODO use JAMON for timing
376         if ( timing )
377         {
378             long end = System.currentTimeMillis();
379             if ( log.isDebugEnabled() )
380             {
381                 log.debug( "put took " + String.valueOf( end - start ) + " ms." );
382             }
383         }
384     }
385 
386     /**
387      * Log some details.
388      * <p>
389      * @param item
390      */
391     private void logUpdateInfo( ICacheElement item )
392     {
393         if ( log.isInfoEnabled() )
394         {
395             // not thread safe, but it doesn't have to be accurate
396             puts++;
397             if ( puts % logInterval == 0 )
398             {
399                 log.info( "puts = " + puts );
400             }
401         }
402 
403         if ( log.isDebugEnabled() )
404         {
405             log.debug( "In update, put [" + item.getKey() + "] in [" + item.getCacheName() + "]" );
406         }
407     }
408 
409     /**
410      * Returns a cache value from the specified remote cache; or null if the cache or key does not
411      * exist.
412      * <p>
413      * @param cacheName
414      * @param key
415      * @return ICacheElement
416      * @throws IOException
417      */
418     public ICacheElement get( String cacheName, Serializable key )
419         throws IOException
420     {
421         return this.get( cacheName, key, 0 );
422     }
423 
424     /**
425      * Returns a cache bean from the specified cache; or null if the key does not exist.
426      * <p>
427      * Adding the requestor id, allows the cache to determine the source of the get.
428      * <p>
429      * The internal processing is wrapped in event logging calls.
430      * <p>
431      * @param cacheName
432      * @param key
433      * @param requesterId
434      * @return ICacheElement
435      * @throws IOException
436      */
437     public ICacheElement get( String cacheName, Serializable key, long requesterId )
438         throws IOException
439     {
440         ICacheElement element = null;
441         ICacheEvent cacheEvent = createICacheEvent( cacheName, key, requesterId, ICacheEventLogger.GET_EVENT );
442         try
443         {
444             element = processGet( cacheName, key, requesterId );
445         }
446         finally
447         {
448             logICacheEvent( cacheEvent );
449         }
450         return element;
451     }
452 
453     /**
454      * Returns a cache bean from the specified cache; or null if the key does not exist.
455      * <p>
456      * Adding the requestor id, allows the cache to determine the source of the get.
457      * <p>
458      * @param cacheName
459      * @param key
460      * @param requesterId
461      * @return ICacheElement
462      */
463     private ICacheElement processGet( String cacheName, Serializable key, long requesterId )
464     {
465         boolean fromCluster = isRequestFromCluster( requesterId );
466 
467         if ( log.isDebugEnabled() )
468         {
469             log.debug( "get [" + key + "] from cache [" + cacheName + "] requesterId = [" + requesterId
470                 + "] fromCluster = " + fromCluster );
471         }
472 
473         CacheListeners cacheDesc = null;
474         try
475         {
476             cacheDesc = getCacheListeners( cacheName );
477         }
478         catch ( Exception e )
479         {
480             log.error( "Problem getting listeners.", e );
481 
482             if ( cacheEventLogger != null )
483             {
484                 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.GET_EVENT, e.getMessage() + cacheName
485                     + " KEY: " + key );
486             }
487         }
488 
489         ICacheElement element = null;
490 
491         element = getFromCacheListeners( key, fromCluster, cacheDesc, element );
492         return element;
493     }
494 
495     /**
496      * Gets the item from the associated cache listeners.
497      * <p>
498      * @param key
499      * @param fromCluster
500      * @param cacheDesc
501      * @param element
502      * @return ICacheElement
503      */
504     private ICacheElement getFromCacheListeners( Serializable key, boolean fromCluster, CacheListeners cacheDesc,
505                                                  ICacheElement element )
506     {
507         if ( cacheDesc != null )
508         {
509             CompositeCache c = (CompositeCache) cacheDesc.cache;
510 
511             // If we have a get come in from a client and we don't have the item
512             // locally, we will allow the cache to look in other non local sources,
513             // such as a remote cache or a lateral.
514             //
515             // Since remote servers never get from clients and clients never go
516             // remote from a remote call, this
517             // will not result in any loops.
518             //
519             // This is the only instance I can think of where we allow a remote get
520             // from a remote call. The purpose is to allow remote cache servers to
521             // talk to each other. If one goes down, you want it to be able to get
522             // data from those that were up when the failed server comes back o
523             // line.
524 
525             if ( !fromCluster && this.remoteCacheServerAttributes.getAllowClusterGet() )
526             {
527                 if ( log.isDebugEnabled() )
528                 {
529                     log.debug( "NonLocalGet. fromCluster [" + fromCluster + "] AllowClusterGet ["
530                         + this.remoteCacheServerAttributes.getAllowClusterGet() + "]" );
531                 }
532                 element = c.get( key );
533             }
534             else
535             {
536                 // Gets from cluster type remote will end up here.
537                 // Gets from all clients will end up here if allow cluster get is
538                 // false.
539 
540                 if ( log.isDebugEnabled() )
541                 {
542                     log.debug( "LocalGet.  fromCluster [" + fromCluster + "] AllowClusterGet ["
543                         + this.remoteCacheServerAttributes.getAllowClusterGet() + "]" );
544                 }
545                 element = c.localGet( key );
546             }
547         }
548         return element;
549     }
550 
551     /**
552      * Gets all matching items.
553      * <p>
554      * @param cacheName
555      * @param pattern
556      * @return Map of keys and wrapped objects
557      * @throws IOException
558      */
559     public Map<Serializable, ICacheElement> getMatching( String cacheName, String pattern )
560         throws IOException
561     {
562         return getMatching( cacheName, pattern, 0 );
563     }
564 
565     /**
566      * Retrieves all matching keys.
567      * <p>
568      * @param cacheName
569      * @param pattern
570      * @param requesterId
571      * @return Map of keys and wrapped objects
572      * @throws IOException
573      */
574     public Map<Serializable, ICacheElement> getMatching( String cacheName, String pattern, long requesterId )
575         throws IOException
576     {
577         ICacheEvent cacheEvent = createICacheEvent( cacheName, pattern, requesterId,
578                                                     ICacheEventLogger.GETMATCHING_EVENT );
579         try
580         {
581             return processGetMatching( cacheName, pattern, requesterId );
582         }
583         finally
584         {
585             logICacheEvent( cacheEvent );
586         }
587     }
588 
589     /**
590      * Retrieves all matching keys.
591      * <p>
592      * @param cacheName
593      * @param pattern
594      * @param requesterId
595      * @return Map of keys and wrapped objects
596      */
597     protected Map<Serializable, ICacheElement> processGetMatching( String cacheName, String pattern, long requesterId )
598     {
599         boolean fromCluster = isRequestFromCluster( requesterId );
600 
601         if ( log.isDebugEnabled() )
602         {
603             log.debug( "getMatching [" + pattern + "] from cache [" + cacheName + "] requesterId = [" + requesterId
604                 + "] fromCluster = " + fromCluster );
605         }
606 
607         CacheListeners cacheDesc = null;
608         try
609         {
610             cacheDesc = getCacheListeners( cacheName );
611         }
612         catch ( Exception e )
613         {
614             log.error( "Problem getting listeners.", e );
615 
616             if ( cacheEventLogger != null )
617             {
618                 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.GETMATCHING_EVENT, e.getMessage()
619                     + cacheName + " pattern: " + pattern );
620             }
621         }
622 
623         return getMatchingFromCacheListeners( pattern, fromCluster, cacheDesc );
624     }
625 
626     /**
627      * Gets the item from the associated cache listeners.
628      * <p>
629      * @param pattern
630      * @param fromCluster
631      * @param cacheDesc
632      * @return Map of keys to results
633      */
634     private Map<Serializable, ICacheElement> getMatchingFromCacheListeners( String pattern, boolean fromCluster, CacheListeners cacheDesc )
635     {
636         Map<Serializable, ICacheElement> elements = null;
637         if ( cacheDesc != null )
638         {
639             CompositeCache c = (CompositeCache) cacheDesc.cache;
640 
641             // We always want to go remote and then merge the items.  But this can lead to inconsistencies after
642             // failover recovery.  Removed items may show up.  There is no good way to prevent this.
643             // We should make it configurable.
644 
645             if ( !fromCluster && this.remoteCacheServerAttributes.getAllowClusterGet() )
646             {
647                 if ( log.isDebugEnabled() )
648                 {
649                     log.debug( "NonLocalGetMatching. fromCluster [" + fromCluster + "] AllowClusterGet ["
650                         + this.remoteCacheServerAttributes.getAllowClusterGet() + "]" );
651                 }
652                 elements = c.getMatching( pattern );
653             }
654             else
655             {
656                 // Gets from cluster type remote will end up here.
657                 // Gets from all clients will end up here if allow cluster get is
658                 // false.
659 
660                 if ( log.isDebugEnabled() )
661                 {
662                     log.debug( "LocalGetMatching.  fromCluster [" + fromCluster + "] AllowClusterGet ["
663                         + this.remoteCacheServerAttributes.getAllowClusterGet() + "]" );
664                 }
665                 elements = c.localGetMatching( pattern );
666             }
667         }
668         return elements;
669     }
670 
671     /**
672      * Gets multiple items from the cache based on the given set of keys.
673      * <p>
674      * @param cacheName
675      * @param keys
676      * @return a map of Serializable key to ICacheElement element, or an empty map if there is no
677      *         data in cache for any of these keys
678      * @throws IOException
679      */
680     public Map<Serializable, ICacheElement> getMultiple( String cacheName, Set<Serializable> keys )
681         throws IOException
682     {
683         return this.getMultiple( cacheName, keys, 0 );
684     }
685 
686     /**
687      * Gets multiple items from the cache based on the given set of keys.
688      * <p>
689      * The internal processing is wrapped in event logging calls.
690      * <p>
691      * @param cacheName
692      * @param keys
693      * @param requesterId
694      * @return a map of Serializable key to ICacheElement element, or an empty map if there is no
695      *         data in cache for any of these keys
696      * @throws IOException
697      */
698     public Map<Serializable, ICacheElement> getMultiple( String cacheName, Set<Serializable> keys, long requesterId )
699         throws IOException
700     {
701         ICacheEvent cacheEvent = createICacheEvent( cacheName, (Serializable) keys, requesterId,
702                                                     ICacheEventLogger.GETMULTIPLE_EVENT );
703         try
704         {
705             return processGetMultiple( cacheName, keys, requesterId );
706         }
707         finally
708         {
709             logICacheEvent( cacheEvent );
710         }
711     }
712 
713     /**
714      * Gets multiple items from the cache based on the given set of keys.
715      * <p>
716      * @param cacheName
717      * @param keys
718      * @param requesterId
719      * @return a map of Serializable key to ICacheElement element, or an empty map if there is no
720      *         data in cache for any of these keys
721      */
722     private Map<Serializable, ICacheElement> processGetMultiple( String cacheName, Set<Serializable> keys, long requesterId )
723     {
724         Map<Serializable, ICacheElement> elements = null;
725 
726         boolean fromCluster = isRequestFromCluster( requesterId );
727 
728         if ( log.isDebugEnabled() )
729         {
730             log.debug( "getMultiple [" + keys + "] from cache [" + cacheName + "] requesterId = [" + requesterId
731                 + "] fromCluster = " + fromCluster );
732         }
733 
734         CacheListeners cacheDesc = null;
735         try
736         {
737             cacheDesc = getCacheListeners( cacheName );
738         }
739         catch ( Exception e )
740         {
741             log.error( "Problem getting listeners.", e );
742         }
743 
744         elements = getMultipleFromCacheListeners( keys, elements, fromCluster, cacheDesc );
745         return elements;
746     }
747 
748     /**
749      * Since a non-receiving remote cache client will not register a listener, it will not have a
750      * listener id assigned from the server. As such the remote server cannot determine if it is a
751      * cluster or a normal client. It will assume that it is a normal client.
752      * <p>
753      * @param requesterId
754      * @return true is from a cluster.
755      */
756     private boolean isRequestFromCluster( long requesterId )
757     {
758         Integer remoteTypeL = idTypeMap.get( Long.valueOf( requesterId ) );
759 
760         boolean fromCluster = false;
761         if ( remoteTypeL != null && remoteTypeL.intValue() == IRemoteCacheAttributes.CLUSTER )
762         {
763             fromCluster = true;
764         }
765         return fromCluster;
766     }
767 
768     /**
769      * Gets the items from the associated cache listeners.
770      * <p>
771      * @param keys
772      * @param elements
773      * @param fromCluster
774      * @param cacheDesc
775      * @return Map
776      */
777     private Map<Serializable, ICacheElement> getMultipleFromCacheListeners( Set<Serializable> keys, Map<Serializable, ICacheElement> elements, boolean fromCluster, CacheListeners cacheDesc )
778     {
779         if ( cacheDesc != null )
780         {
781             CompositeCache c = (CompositeCache) cacheDesc.cache;
782 
783             // If we have a getMultiple come in from a client and we don't have the item
784             // locally, we will allow the cache to look in other non local sources,
785             // such as a remote cache or a lateral.
786             //
787             // Since remote servers never get from clients and clients never go
788             // remote from a remote call, this
789             // will not result in any loops.
790             //
791             // This is the only instance I can think of where we allow a remote get
792             // from a remote call. The purpose is to allow remote cache servers to
793             // talk to each other. If one goes down, you want it to be able to get
794             // data from those that were up when the failed server comes back on
795             // line.
796 
797             if ( !fromCluster && this.remoteCacheServerAttributes.getAllowClusterGet() )
798             {
799                 if ( log.isDebugEnabled() )
800                 {
801                     log.debug( "NonLocalGetMultiple. fromCluster [" + fromCluster + "] AllowClusterGet ["
802                         + this.remoteCacheServerAttributes.getAllowClusterGet() + "]" );
803                 }
804 
805                 elements = c.getMultiple( keys );
806             }
807             else
808             {
809                 // Gets from cluster type remote will end up here.
810                 // Gets from all clients will end up here if allow cluster get is
811                 // false.
812 
813                 if ( log.isDebugEnabled() )
814                 {
815                     log.debug( "LocalGetMultiple.  fromCluster [" + fromCluster + "] AllowClusterGet ["
816                         + this.remoteCacheServerAttributes.getAllowClusterGet() + "]" );
817                 }
818 
819                 elements = c.localGetMultiple( keys );
820             }
821         }
822         return elements;
823     }
824 
825     /**
826      * Gets the set of keys of objects currently in the group.
827      * <p>
828      * @param cacheName
829      * @param group
830      * @return A Set of group keys
831      */
832     public Set<Serializable> getGroupKeys( String cacheName, String group )
833     {
834         return processGetGroupKeys( cacheName, group );
835     }
836 
837     /**
838      * Gets the set of keys of objects currently in the group.
839      * <p>
840      * @param cacheName
841      * @param group
842      * @return Set
843      */
844     protected Set<Serializable> processGetGroupKeys( String cacheName, String group )
845     {
846         CacheListeners cacheDesc = null;
847         try
848         {
849             cacheDesc = getCacheListeners( cacheName );
850         }
851         catch ( Exception e )
852         {
853             log.error( "Problem getting listeners.", e );
854         }
855 
856         if ( cacheDesc == null )
857         {
858             return Collections.emptySet();
859         }
860 
861         CompositeCache c = (CompositeCache) cacheDesc.cache;
862         return c.getGroupKeys( group );
863     }
864 
865     /**
866      * Removes the given key from the specified remote cache. Defaults the listener id to 0.
867      * <p>
868      * @param cacheName
869      * @param key
870      * @throws IOException
871      */
872     public void remove( String cacheName, Serializable key )
873         throws IOException
874     {
875         remove( cacheName, key, 0 );
876     }
877 
878     /**
879      * Remove the key from the cache region and don't tell the source listener about it.
880      * <p>
881      * The internal processing is wrapped in event logging calls.
882      * <p>
883      * @param cacheName
884      * @param key
885      * @param requesterId
886      * @throws IOException
887      */
888     public void remove( String cacheName, Serializable key, long requesterId )
889         throws IOException
890     {
891         ICacheEvent cacheEvent = createICacheEvent( cacheName, key, requesterId, ICacheEventLogger.REMOVE_EVENT );
892         try
893         {
894             processRemove( cacheName, key, requesterId );
895         }
896         finally
897         {
898             logICacheEvent( cacheEvent );
899         }
900     }
901 
902     /**
903      * Remove the key from the cache region and don't tell the source listener about it.
904      * <p>
905      * @param cacheName
906      * @param key
907      * @param requesterId
908      * @throws IOException
909      */
910     private void processRemove( String cacheName, Serializable key, long requesterId )
911         throws IOException
912     {
913         if ( log.isDebugEnabled() )
914         {
915             log.debug( "remove [" + key + "] from cache [" + cacheName + "]" );
916         }
917 
918         CacheListeners cacheDesc = cacheListenersMap.get( cacheName );
919 
920         boolean fromCluster = isRequestFromCluster( requesterId );
921 
922         if ( cacheDesc != null )
923         {
924             // best attempt to achieve ordered cache item removal and
925             // notification.
926             synchronized ( cacheDesc )
927             {
928                 boolean removeSuccess = false;
929 
930                 // No need to notify if it was not cached.
931                 CompositeCache c = (CompositeCache) cacheDesc.cache;
932 
933                 if ( fromCluster )
934                 {
935                     if ( log.isDebugEnabled() )
936                     {
937                         log.debug( "Remove FROM cluster, NOT updating other auxiliaries for region" );
938                     }
939                     removeSuccess = c.localRemove( key );
940                 }
941                 else
942                 {
943                     if ( log.isDebugEnabled() )
944                     {
945                         log.debug( "Remove NOT from cluster, updating other auxiliaries for region" );
946                     }
947                     removeSuccess = c.remove( key );
948                 }
949 
950                 if ( log.isDebugEnabled() )
951                 {
952                     log.debug( "remove [" + key + "] from cache [" + cacheName + "] success (was it found) = "
953                         + removeSuccess );
954                 }
955 
956                 // UPDATE LOCALS IF A REQUEST COMES FROM A CLUSTER
957                 // IF LOCAL CLUSTER CONSISTENCY IS CONFIGURED
958                 if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.getLocalClusterConsistency() ) )
959                 {
960                     ICacheEventQueue[] qlist = getEventQList( cacheDesc, requesterId );
961 
962                     for ( int i = 0; i < qlist.length; i++ )
963                     {
964                         qlist[i].addRemoveEvent( key );
965                     }
966                 }
967             }
968         }
969     }
970 
971     /**
972      * Remove all keys from the specified remote cache.
973      * <p>
974      * @param cacheName
975      * @throws IOException
976      */
977     public void removeAll( String cacheName )
978         throws IOException
979     {
980         removeAll( cacheName, 0 );
981     }
982 
983     /**
984      * Remove all keys from the specified remote cache.
985      * <p>
986      * The internal processing is wrapped in event logging calls.
987      * <p>
988      * @param cacheName
989      * @param requesterId
990      * @throws IOException
991      */
992     public void removeAll( String cacheName, long requesterId )
993         throws IOException
994     {
995         ICacheEvent cacheEvent = createICacheEvent( cacheName, "all", requesterId, ICacheEventLogger.REMOVEALL_EVENT );
996         try
997         {
998             processRemoveAll( cacheName, requesterId );
999         }
1000         finally
1001         {
1002             logICacheEvent( cacheEvent );
1003         }
1004     }
1005 
1006     /**
1007      * Remove all keys from the specified remote cache.
1008      * <p>
1009      * @param cacheName
1010      * @param requesterId
1011      * @throws IOException
1012      */
1013     private void processRemoveAll( String cacheName, long requesterId )
1014         throws IOException
1015     {
1016         CacheListeners cacheDesc = cacheListenersMap.get( cacheName );
1017 
1018         boolean fromCluster = isRequestFromCluster( requesterId );
1019 
1020         if ( cacheDesc != null )
1021         {
1022             // best attempt to achieve ordered cache item removal and
1023             // notification.
1024             synchronized ( cacheDesc )
1025             {
1026                 // No need to broadcast, or notify if it was not cached.
1027                 CompositeCache c = (CompositeCache) cacheDesc.cache;
1028 
1029                 if ( fromCluster )
1030                 {
1031                     if ( log.isDebugEnabled() )
1032                     {
1033                         log.debug( "RemoveALL FROM cluster, NOT updating other auxiliaries for region" );
1034                     }
1035                     c.localRemoveAll();
1036                 }
1037                 else
1038                 {
1039                     if ( log.isDebugEnabled() )
1040                     {
1041                         log.debug( "RemoveALL NOT from cluster, updating other auxiliaries for region" );
1042                     }
1043                     c.removeAll();
1044                 }
1045 
1046                 // update registered listeners
1047                 if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.getLocalClusterConsistency() ) )
1048                 {
1049                     ICacheEventQueue[] qlist = getEventQList( cacheDesc, requesterId );
1050 
1051                     for ( int i = 0; i < qlist.length; i++ )
1052                     {
1053                         qlist[i].addRemoveAllEvent();
1054                     }
1055                 }
1056             }
1057         }
1058     }
1059 
1060     /**
1061      * How many put events have we received.
1062      * <p>
1063      * @return puts
1064      */
1065     protected int getPutCount()
1066     {
1067         return puts;
1068     }
1069 
1070     /**
1071      * Frees the specified remote cache.
1072      * <p>
1073      * @param cacheName
1074      * @throws IOException
1075      */
1076     public void dispose( String cacheName )
1077         throws IOException
1078     {
1079         dispose( cacheName, 0 );
1080     }
1081 
1082     /**
1083      * Frees the specified remote cache.
1084      * <p>
1085      * @param cacheName
1086      * @param requesterId
1087      * @throws IOException
1088      */
1089     public void dispose( String cacheName, long requesterId )
1090         throws IOException
1091     {
1092         ICacheEvent cacheEvent = createICacheEvent( cacheName, "none", requesterId, ICacheEventLogger.DISPOSE_EVENT );
1093         try
1094         {
1095             processDispose( cacheName, requesterId );
1096         }
1097         finally
1098         {
1099             logICacheEvent( cacheEvent );
1100         }
1101     }
1102 
1103     /**
1104      * @param cacheName
1105      * @param requesterId
1106      * @throws IOException
1107      */
1108     private void processDispose( String cacheName, long requesterId )
1109         throws IOException
1110     {
1111         if ( log.isInfoEnabled() )
1112         {
1113             log.info( "Dispose request received from listener [" + requesterId + "]" );
1114         }
1115 
1116         CacheListeners cacheDesc = cacheListenersMap.get( cacheName );
1117 
1118         // this is dangerous
1119         if ( cacheDesc != null )
1120         {
1121             // best attempt to achieve ordered free-cache-op and notification.
1122             synchronized ( cacheDesc )
1123             {
1124                 ICacheEventQueue[] qlist = getEventQList( cacheDesc, requesterId );
1125 
1126                 for ( int i = 0; i < qlist.length; i++ )
1127                 {
1128                     qlist[i].addDisposeEvent();
1129                 }
1130                 cacheManager.freeCache( cacheName );
1131             }
1132         }
1133     }
1134 
1135     /**
1136      * Frees all remote caches.
1137      * <p>
1138      * @throws IOException
1139      */
1140     public void release()
1141         throws IOException
1142     {
1143         synchronized ( cacheListenersMap )
1144         {
1145             for (CacheListeners cacheDesc : cacheListenersMap.values())
1146             {
1147                 ICacheEventQueue[] qlist = getEventQList( cacheDesc, 0 );
1148 
1149                 for ( int i = 0; i < qlist.length; i++ )
1150                 {
1151                     qlist[i].addDisposeEvent();
1152                 }
1153             }
1154             cacheManager.release();
1155         }
1156     }
1157 
1158     /**
1159      * Returns the cache listener for the specified cache. Creates the cache and the cache
1160      * descriptor if they do not already exist.
1161      * <p>
1162      * @param cacheName
1163      * @return The cacheListeners value
1164      */
1165     protected CacheListeners getCacheListeners( String cacheName )
1166     {
1167         CacheListeners cacheListeners = cacheListenersMap.get( cacheName );
1168         synchronized ( cacheListenersMap )
1169         {
1170             if ( cacheListeners == null )
1171             {
1172                 cacheListeners = cacheListenersMap.get( cacheName );
1173                 if ( cacheListeners == null )
1174                 {
1175                     cacheListeners = new CacheListeners( cacheManager.getCache( cacheName ) );
1176                     cacheListenersMap.put( cacheName, cacheListeners );
1177                 }
1178             }
1179         }
1180         return cacheListeners;
1181     }
1182 
1183     /**
1184      * Gets the clusterListeners attribute of the RemoteCacheServer object.
1185      * <p>
1186      * TODO may be able to remove this
1187      * @param cacheName
1188      * @return The clusterListeners value
1189      */
1190     protected CacheListeners getClusterListeners( String cacheName )
1191     {
1192         CacheListeners cacheListeners = clusterListenersMap.get( cacheName );
1193         synchronized ( clusterListenersMap )
1194         {
1195             if ( cacheListeners == null )
1196             {
1197                 cacheListeners = clusterListenersMap.get( cacheName );
1198                 if ( cacheListeners == null )
1199                 {
1200                     cacheListeners = new CacheListeners( cacheManager.getCache( cacheName ) );
1201                     clusterListenersMap.put( cacheName, cacheListeners );
1202                 }
1203             }
1204         }
1205         return cacheListeners;
1206     }
1207 
1208     /**
1209      * Gets the eventQList attribute of the RemoteCacheServer object. This returns the event queues
1210      * stored in the cacheListeners object for a particular region, if the queue is not for this
1211      * requester.
1212      * <p>
1213      * Basically, this makes sure that a request from a particular local cache, identified by its
1214      * listener id, does not result in a call to that same listener.
1215      * <p>
1216      * @param cacheListeners
1217      * @param requesterId
1218      * @return The eventQList value
1219      */
1220     private ICacheEventQueue[] getEventQList( CacheListeners cacheListeners, long requesterId )
1221     {
1222         ICacheEventQueue[] list = null;
1223         synchronized ( cacheListeners.eventQMap )
1224         {
1225             list = cacheListeners.eventQMap.values().toArray( new ICacheEventQueue[0] );
1226         }
1227         int count = 0;
1228         // Set those not qualified to null; Count those qualified.
1229         for ( int i = 0; i < list.length; i++ )
1230         {
1231             ICacheEventQueue q = list[i];
1232             if ( q.isWorking() && q.getListenerId() != requesterId )
1233             {
1234                 count++;
1235             }
1236             else
1237             {
1238                 list[i] = null;
1239             }
1240         }
1241         if ( count == list.length )
1242         {
1243             // All qualified.
1244             return list;
1245         }
1246 
1247         // Returns only the qualified.
1248         ICacheEventQueue[] qq = new ICacheEventQueue[count];
1249         count = 0;
1250         for ( int i = 0; i < list.length; i++ )
1251         {
1252             if ( list[i] != null )
1253             {
1254                 qq[count++] = list[i];
1255             }
1256         }
1257         return qq;
1258     }
1259 
1260     /**
1261      * Removes dead event queues. Should clean out deregistered listeners.
1262      * <p>
1263      * @param eventQMap
1264      */
1265     private static void cleanupEventQMap( Map<Long, ICacheEventQueue> eventQMap )
1266     {
1267         synchronized ( eventQMap )
1268         {
1269             for (Iterator<Map.Entry<Long, ICacheEventQueue>> itr = eventQMap.entrySet().iterator(); itr.hasNext(); )
1270             {
1271                 Map.Entry<Long, ICacheEventQueue> e = itr.next();
1272                 ICacheEventQueue q = e.getValue();
1273 
1274                 // this does not care if the q is alive (i.e. if
1275                 // there are active threads; it cares if the queue
1276                 // is working -- if it has not encountered errors
1277                 // above the failure threshold
1278                 if ( !q.isWorking() )
1279                 {
1280                     itr.remove();
1281                     log.warn( "Cache event queue " + q + " is not working and removed from cache server." );
1282                 }
1283             }
1284         }
1285     }
1286 
1287     /**
1288      * Subscribes to the specified remote cache.
1289      * <p>
1290      * If the client id is 0, then the remote cache server will increment it's local count and
1291      * assign an id to the client.
1292      * <p>
1293      * @param cacheName the specified remote cache.
1294      * @param listener object to notify for cache changes. must be synchronized since there are
1295      *            remote calls involved.
1296      * @throws IOException
1297      */
1298     public void addCacheListener( String cacheName, ICacheListener listener )
1299         throws IOException
1300     {
1301         if ( cacheName == null || listener == null )
1302         {
1303             throw new IllegalArgumentException( "cacheName and listener must not be null" );
1304         }
1305         CacheListeners cacheListeners;
1306 
1307         IRemoteCacheListener ircl = (IRemoteCacheListener) listener;
1308 
1309         String listenerAddress = ircl.getLocalHostAddress();
1310 
1311         int remoteType = ircl.getRemoteType();
1312         if ( remoteType == IRemoteCacheAttributes.CLUSTER )
1313         {
1314             log.debug( "adding cluster listener, listenerAddress [" + listenerAddress + "]" );
1315             cacheListeners = getClusterListeners( cacheName );
1316         }
1317         else
1318         {
1319             log.debug( "adding normal listener, listenerAddress [" + listenerAddress + "]" );
1320             cacheListeners = getCacheListeners( cacheName );
1321         }
1322         Map<Long, ICacheEventQueue> eventQMap = cacheListeners.eventQMap;
1323         cleanupEventQMap( eventQMap );
1324 
1325         // synchronized ( listenerId )
1326         synchronized ( ICacheListener.class )
1327         {
1328             long id = 0;
1329             try
1330             {
1331                 id = listener.getListenerId();
1332                 // clients probably shouldn't do this.
1333                 if ( id == 0 )
1334                 {
1335                     // must start at one so the next gets recognized
1336                     long listenerIdB = nextListenerId();
1337                     if ( log.isDebugEnabled() )
1338                     {
1339                         log.debug( "listener id=" + ( listenerIdB & 0xff ) + " addded for cache [" + cacheName
1340                             + "], listenerAddress [" + listenerAddress + "]" );
1341                     }
1342                     listener.setListenerId( listenerIdB );
1343                     id = listenerIdB;
1344 
1345                     // in case it needs synchronization
1346                     String message = "Adding vm listener under new id = [" + listenerIdB + "], listenerAddress ["
1347                         + listenerAddress + "]";
1348                     logApplicationEvent( "RemoteCacheServer", "addCacheListener", message );
1349                     if ( log.isInfoEnabled() )
1350                     {
1351                         log.info( message );
1352                     }
1353                 }
1354                 else
1355                 {
1356                     String message = "Adding listener under existing id = [" + id + "], listenerAddress ["
1357                         + listenerAddress + "]";
1358                     logApplicationEvent( "RemoteCacheServer", "addCacheListener", message );
1359                     if ( log.isInfoEnabled() )
1360                     {
1361                         log.info( message );
1362                     }
1363                     // should confirm the the host is the same as we have on
1364                     // record, just in case a client has made a mistake.
1365                 }
1366 
1367                 // relate the type to an id
1368                 this.idTypeMap.put( Long.valueOf( id ), Integer.valueOf( remoteType ) );
1369                 if ( listenerAddress != null )
1370                 {
1371                     this.idIPMap.put( Long.valueOf( id ), listenerAddress );
1372                 }
1373             }
1374             catch ( IOException ioe )
1375             {
1376                 String message = "Problem setting listener id, listenerAddress [" + listenerAddress + "]";
1377                 log.error( message, ioe );
1378 
1379                 if ( cacheEventLogger != null )
1380                 {
1381                     cacheEventLogger.logError( "RemoteCacheServer", "addCacheListener", message + " - "
1382                         + ioe.getMessage() );
1383                 }
1384             }
1385 
1386             CacheEventQueueFactory fact = new CacheEventQueueFactory();
1387             ICacheEventQueue q = fact.createCacheEventQueue( listener, id, cacheName, remoteCacheServerAttributes
1388                 .getEventQueuePoolName(), remoteCacheServerAttributes.getEventQueueType() );
1389 
1390             eventQMap.put(Long.valueOf(listener.getListenerId()), q);
1391 
1392             if ( log.isInfoEnabled() )
1393             {
1394                 log.info( cacheListeners );
1395             }
1396         }
1397     }
1398 
1399     /**
1400      * Subscribes to all remote caches.
1401      * <p>
1402      * @param listener The feature to be added to the CacheListener attribute
1403      * @throws IOException
1404      */
1405     public void addCacheListener( ICacheListener listener )
1406         throws IOException
1407     {
1408         for (String cacheName : cacheListenersMap.keySet())
1409         {
1410             addCacheListener( cacheName, listener );
1411 
1412             if ( log.isDebugEnabled() )
1413             {
1414                 log.debug( "Adding listener for cache [" + cacheName + "]" );
1415             }
1416         }
1417     }
1418 
1419     /**
1420      * Unsubscribe this listener from this region. If the listener is registered, it will be removed
1421      * from the event queue map list.
1422      * <p>
1423      * @param cacheName
1424      * @param listener
1425      * @throws IOException
1426      */
1427     public void removeCacheListener( String cacheName, ICacheListener listener )
1428         throws IOException
1429     {
1430         removeCacheListener( cacheName, listener.getListenerId() );
1431     }
1432 
1433     /**
1434      * Unsubscribe this listener from this region. If the listener is registered, it will be removed
1435      * from the event queue map list.
1436      * <p>
1437      * @param cacheName
1438      * @param listenerId
1439      */
1440     public void removeCacheListener( String cacheName, long listenerId )
1441     {
1442         String message = "Removing listener for cache region = [" + cacheName + "] and listenerId [" + listenerId + "]";
1443         logApplicationEvent( "RemoteCacheServer", "removeCacheListener", message );
1444         if ( log.isInfoEnabled() )
1445         {
1446             log.info( message );
1447         }
1448 
1449         boolean isClusterListener = isRequestFromCluster( listenerId );
1450 
1451         CacheListeners cacheDesc = null;
1452 
1453         if ( isClusterListener )
1454         {
1455             cacheDesc = getClusterListeners( cacheName );
1456         }
1457         else
1458         {
1459             cacheDesc = getCacheListeners( cacheName );
1460         }
1461         Map<Long, ICacheEventQueue> eventQMap = cacheDesc.eventQMap;
1462         cleanupEventQMap( eventQMap );
1463         ICacheEventQueue q = eventQMap.remove( Long.valueOf( listenerId ) );
1464 
1465         if ( q != null )
1466         {
1467             if ( log.isDebugEnabled() )
1468             {
1469                 log.debug( "Found queue for cache region = [" + cacheName + "] and listenerId  [" + listenerId + "]" );
1470             }
1471             q.destroy();
1472             cleanupEventQMap( eventQMap );
1473         }
1474         else
1475         {
1476             if ( log.isDebugEnabled() )
1477             {
1478                 log.debug( "Did not find queue for cache region = [" + cacheName + "] and listenerId [" + listenerId
1479                     + "]" );
1480             }
1481         }
1482 
1483         // cleanup
1484         idTypeMap.remove( Long.valueOf( listenerId ) );
1485         idIPMap.remove( Long.valueOf( listenerId ) );
1486 
1487         if ( log.isInfoEnabled() )
1488         {
1489             log.info( "After removing listener [" + listenerId + "] cache region " + cacheName + "'s listener size ["
1490                 + cacheDesc.eventQMap.size() + "]" );
1491         }
1492     }
1493 
1494     /**
1495      * Unsubscribes from all remote caches.
1496      * <p>
1497      * @param listener
1498      * @throws IOException
1499      */
1500     public void removeCacheListener( ICacheListener listener )
1501         throws IOException
1502     {
1503         for (String cacheName : cacheListenersMap.keySet())
1504         {
1505             removeCacheListener( cacheName, listener );
1506 
1507             if ( log.isInfoEnabled() )
1508             {
1509                 log.info( "Removing listener for cache [" + cacheName + "]" );
1510             }
1511         }
1512         return;
1513     }
1514 
1515     /**
1516      * Shuts down the remote server.
1517      * <p>
1518      * @throws IOException
1519      */
1520     public void shutdown()
1521         throws IOException
1522     {
1523         RemoteCacheServerFactory.shutdownImpl( "", Registry.REGISTRY_PORT );
1524     }
1525 
1526     /**
1527      * Shuts down a server at a particular host and port. Then it calls shutdown on the cache
1528      * itself.
1529      * <p>
1530      * @param host
1531      * @param port
1532      * @throws IOException
1533      */
1534     public void shutdown( String host, int port )
1535         throws IOException
1536     {
1537         if ( log.isInfoEnabled() )
1538         {
1539             log.info( "Received shutdown request.  Shutting down server." );
1540         }
1541         RemoteCacheServerFactory.shutdownImpl( host, port );
1542         this.cacheManager.shutDown();
1543     }
1544 
1545     /**
1546      * Called by the RMI runtime sometime after the runtime determines that the reference list, the
1547      * list of clients referencing the remote object, becomes empty.
1548      */
1549     // TODO: test out the DGC.
1550     public void unreferenced()
1551     {
1552         if ( log.isInfoEnabled() )
1553         {
1554             log.info( "*** Server now unreferenced and subject to GC. ***" );
1555         }
1556     }
1557 
1558     /**
1559      * Returns the next generated listener id [0,255].
1560      * <p>
1561      * @return the listener id of a client. This should be unique for this server.
1562      */
1563     private long nextListenerId()
1564     {
1565         long id = 0;
1566         if ( listenerId[0] == Long.MAX_VALUE )
1567         {
1568             synchronized ( listenerId )
1569             {
1570                 id = listenerId[0];
1571                 listenerId[0] = 0;
1572                 // TODO: record & check if the generated id is currently being
1573                 // used by a valid listener. Currently if the id wraps after
1574                 // Long.MAX_VALUE,
1575                 // we just assume it won't collide with an existing listener who
1576                 // is live.
1577             }
1578         }
1579         else
1580         {
1581             synchronized ( listenerId )
1582             {
1583                 id = ++listenerId[0];
1584             }
1585         }
1586         return id;
1587     }
1588 
1589     /**
1590      * Gets the stats attribute of the RemoteCacheServer object.
1591      * <p>
1592      * @return The stats value
1593      * @throws IOException
1594      */
1595     public String getStats()
1596         throws IOException
1597     {
1598         return cacheManager.getStats();
1599     }
1600 
1601     /**
1602      * Logs an event if an event logger is configured.
1603      * <p>
1604      * @param item
1605      * @param requesterId
1606      * @param eventName
1607      * @return ICacheEvent
1608      */
1609     private ICacheEvent createICacheEvent( ICacheElement item, long requesterId, String eventName )
1610     {
1611         if ( cacheEventLogger == null )
1612         {
1613             return EMPTY_ICACHE_EVENT;
1614         }
1615         String ipAddress = getExtraInfoForRequesterId( requesterId );
1616         return cacheEventLogger
1617             .createICacheEvent( "RemoteCacheServer", item.getCacheName(), eventName, ipAddress, item );
1618     }
1619 
1620     /**
1621      * Logs an event if an event logger is configured.
1622      * <p>
1623      * @param cacheName
1624      * @param key
1625      * @param requesterId
1626      * @param eventName
1627      * @return ICacheEvent
1628      */
1629     private ICacheEvent createICacheEvent( String cacheName, Serializable key, long requesterId, String eventName )
1630     {
1631         if ( cacheEventLogger == null )
1632         {
1633             return EMPTY_ICACHE_EVENT;
1634         }
1635         String ipAddress = getExtraInfoForRequesterId( requesterId );
1636         return cacheEventLogger.createICacheEvent( "RemoteCacheServer", cacheName, eventName, ipAddress, key );
1637     }
1638 
1639     /**
1640      * Logs an event if an event logger is configured.
1641      * <p>
1642      * @param source
1643      * @param eventName
1644      * @param optionalDetails
1645      */
1646     protected void logApplicationEvent( String source, String eventName, String optionalDetails )
1647     {
1648         if ( cacheEventLogger != null )
1649         {
1650             cacheEventLogger.logApplicationEvent( source, eventName, optionalDetails );
1651         }
1652     }
1653 
1654     /**
1655      * Logs an event if an event logger is configured.
1656      * <p>
1657      * @param cacheEvent
1658      */
1659     protected void logICacheEvent( ICacheEvent cacheEvent )
1660     {
1661         if ( cacheEventLogger != null )
1662         {
1663             cacheEventLogger.logICacheEvent( cacheEvent );
1664         }
1665     }
1666 
1667     /**
1668      * Ip address for the client, if one is stored.
1669      * <p>
1670      * Protected for testing.
1671      * <p>
1672      * @param requesterId
1673      * @return String
1674      */
1675     protected String getExtraInfoForRequesterId( long requesterId )
1676     {
1677         String ipAddress = idIPMap.get( Long.valueOf( requesterId ) );
1678         return ipAddress;
1679     }
1680 
1681     /**
1682      * Allows it to be injected.
1683      * <p>
1684      * @param cacheEventLogger
1685      */
1686     public void setCacheEventLogger( ICacheEventLogger cacheEventLogger )
1687     {
1688         this.cacheEventLogger = cacheEventLogger;
1689     }
1690 }