View Javadoc
1   package org.apache.commons.jcs3.auxiliary.remote.server;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.io.Serializable;
24  import java.rmi.RemoteException;
25  import java.rmi.registry.Registry;
26  import java.rmi.server.RMISocketFactory;
27  import java.rmi.server.UnicastRemoteObject;
28  import java.rmi.server.Unreferenced;
29  import java.util.Collections;
30  import java.util.Map;
31  import java.util.Properties;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentMap;
35  
36  import org.apache.commons.jcs3.access.exception.CacheException;
37  import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheListener;
38  import org.apache.commons.jcs3.auxiliary.remote.server.behavior.IRemoteCacheServer;
39  import org.apache.commons.jcs3.auxiliary.remote.server.behavior.IRemoteCacheServerAttributes;
40  import org.apache.commons.jcs3.auxiliary.remote.server.behavior.RemoteType;
41  import org.apache.commons.jcs3.engine.CacheEventQueueFactory;
42  import org.apache.commons.jcs3.engine.CacheListeners;
43  import org.apache.commons.jcs3.engine.behavior.ICacheElement;
44  import org.apache.commons.jcs3.engine.behavior.ICacheEventQueue;
45  import org.apache.commons.jcs3.engine.behavior.ICacheListener;
46  import org.apache.commons.jcs3.engine.control.CompositeCache;
47  import org.apache.commons.jcs3.engine.control.CompositeCacheManager;
48  import org.apache.commons.jcs3.engine.logging.CacheEvent;
49  import org.apache.commons.jcs3.engine.logging.behavior.ICacheEvent;
50  import org.apache.commons.jcs3.engine.logging.behavior.ICacheEventLogger;
51  import org.apache.commons.jcs3.log.Log;
52  import org.apache.commons.jcs3.log.LogManager;
53  import org.apache.commons.jcs3.utils.timing.ElapsedTimer;
54  
55  /**
56   * This class provides remote cache services. The remote cache server propagates events from local
57   * caches to other local caches. It can also store cached data, making it available to new clients.
58   * <p>
59   * Remote cache servers can be clustered. If the cache used by this remote cache is configured to
60   * use a remote cache of type cluster, the two remote caches will communicate with each other.
61   * Remote and put requests can be sent from one remote to another. If they are configured to
62   * broadcast such event to their client, then remove an puts can be sent to all locals in the
63   * cluster.
64   * <p>
65   * Get requests are made between clustered servers if AllowClusterGet is true. You can setup several
66   * clients to use one remote server and several to use another. The get local will be distributed
67   * between the two servers. Since caches are usually high get and low put, this should allow you to
68   * scale.
69   */
70  public class RemoteCacheServer<K, V>
71      extends UnicastRemoteObject
72      implements IRemoteCacheServer<K, V>, Unreferenced
73  {
74      public static final String DFEAULT_REMOTE_CONFIGURATION_FILE = "/remote.cache.ccf";
75  
76      /** For serialization. Don't change. */
77      private static final long serialVersionUID = -8072345435941473116L;
78  
79      /** log instance */
80      private static final Log log = LogManager.getLog( RemoteCacheServer.class );
81  
82      /** Number of puts into the cache. */
83      private int puts;
84  
85      /** Maps cache name to CacheListeners object. association of listeners (regions). */
86      private final transient ConcurrentMap<String, CacheListeners<K, V>> cacheListenersMap =
87          new ConcurrentHashMap<>();
88  
89      /** maps cluster listeners to regions. */
90      private final transient ConcurrentMap<String, CacheListeners<K, V>> clusterListenersMap =
91          new ConcurrentHashMap<>();
92  
93      /** The central hub */
94      private transient CompositeCacheManager cacheManager;
95  
96      /** relates listener id with a type */
97      private final ConcurrentMap<Long, RemoteType> idTypeMap = new ConcurrentHashMap<>();
98  
99      /** relates listener id with an ip address */
100     private final ConcurrentMap<Long, String> idIPMap = new ConcurrentHashMap<>();
101 
102     /** Used to get the next listener id. */
103     private final int[] listenerId = new int[1];
104 
105     /** Configuration settings. */
106     // package protected for access by unit test code
107     final IRemoteCacheServerAttributes remoteCacheServerAttributes;
108 
109     /** The interval at which we will log updates. */
110     private static final int logInterval = 100;
111 
112     /** An optional event logger */
113     private transient ICacheEventLogger cacheEventLogger;
114 
115     /**
116      * Constructor for the RemoteCacheServer object. This initializes the server with the values
117      * from the properties object.
118      * <p>
119      * @param rcsa
120      * @param config cache hub configuration
121      * @throws RemoteException
122      */
123     protected RemoteCacheServer( final IRemoteCacheServerAttributes rcsa, final Properties config )
124         throws RemoteException
125     {
126         super( rcsa.getServicePort() );
127         this.remoteCacheServerAttributes = rcsa;
128         init( config );
129     }
130 
131     /**
132      * Constructor for the RemoteCacheServer object. This initializes the server with the values
133      * from the properties object.
134      * <p>
135      * @param rcsa
136      * @param config cache hub configuration
137      * @param customRMISocketFactory
138      * @throws RemoteException
139      */
140     protected RemoteCacheServer( final IRemoteCacheServerAttributes rcsa, final Properties config, final RMISocketFactory customRMISocketFactory )
141         throws RemoteException
142     {
143         super( rcsa.getServicePort(), customRMISocketFactory, customRMISocketFactory );
144         this.remoteCacheServerAttributes = rcsa;
145         init( config );
146     }
147 
148     /**
149      * Initialize the RMI Cache Server from a properties object.
150      * <p>
151      * @param prop the configuration properties
152      * @throws RemoteException if the configuration of the cache manager instance fails
153      */
154     private void init( final Properties prop ) throws RemoteException
155     {
156         try
157         {
158             cacheManager = createCacheManager( prop );
159         }
160         catch (final CacheException e)
161         {
162             throw new RemoteException(e.getMessage(), e);
163         }
164 
165         // cacheManager would have created a number of ICache objects.
166         // Use these objects to set up the cacheListenersMap.
167         cacheManager.getCacheNames().forEach(name -> {
168             final CompositeCache<K, V> cache = cacheManager.getCache( name );
169             cacheListenersMap.put( name, new CacheListeners<>( cache ) );
170         });
171     }
172 
173     /**
174      * Subclass can override this method to create the specific cache manager.
175      * <p>
176      * @param prop the configuration object.
177      * @return The cache hub configured with this configuration.
178      *
179      * @throws CacheException if the configuration cannot be loaded
180      */
181     private static CompositeCacheManager createCacheManager( final Properties prop ) throws CacheException
182     {
183         final CompositeCacheManager hub = CompositeCacheManager.getUnconfiguredInstance();
184         hub.configure( prop );
185         return hub;
186     }
187 
188     /**
189      * Puts a cache bean to the remote cache and notifies all listeners which <br>
190      * <ol>
191      * <li>have a different listener id than the originating host;</li>
192      * <li>are currently subscribed to the related cache.</li>
193      * </ol>
194      * <p>
195      * @param item
196      * @throws IOException
197      */
198     public void put( final ICacheElement<K, V> item )
199         throws IOException
200     {
201         update( item );
202     }
203 
204     /**
205      * @param item
206      * @throws IOException
207      */
208     @Override
209     public void update( final ICacheElement<K, V> item )
210         throws IOException
211     {
212         update( item, 0 );
213     }
214 
215     /**
216      * The internal processing is wrapped in event logging calls.
217      * <p>
218      * @param item
219      * @param requesterId
220      * @throws IOException
221      */
222     @Override
223     public void update( final ICacheElement<K, V> item, final long requesterId )
224         throws IOException
225     {
226         final ICacheEvent<ICacheElement<K, V>> cacheEvent = createICacheEvent( item, requesterId, ICacheEventLogger.UPDATE_EVENT );
227         try
228         {
229             processUpdate( item, requesterId );
230         }
231         finally
232         {
233             logICacheEvent( cacheEvent );
234         }
235     }
236 
237     /**
238      * An update can come from either a local cache's remote auxiliary, or it can come from a remote
239      * server. A remote server is considered a source of type cluster.
240      * <p>
241      * If the update came from a cluster, then we should tell the cache manager that this was a
242      * remote put. This way, any lateral and remote auxiliaries configured for the region will not
243      * be updated. This is basically how a remote listener works when plugged into a local cache.
244      * <p>
245      * If the cluster is configured to keep local cluster consistency, then all listeners will be
246      * updated. This allows cluster server A to update cluster server B and then B to update its
247      * clients if it is told to keep local cluster consistency. Otherwise, server A will update
248      * server B and B will not tell its clients. If you cluster using lateral caches for instance,
249      * this is how it will work. Updates to a cluster node, will never get to the leaves. The remote
250      * cluster, with local cluster consistency, allows you to update leaves. This basically allows
251      * you to have a failover remote server.
252      * <p>
253      * Since currently a cluster will not try to get from other cluster servers, you can scale a bit
254      * with a cluster configuration. Puts and removes will be broadcasted to all clients, but the
255      * get load on a remote server can be reduced.
256      * <p>
257      * @param item
258      * @param requesterId
259      */
260     private void processUpdate( final ICacheElement<K, V> item, final long requesterId )
261     {
262         final ElapsedTimer timer = new ElapsedTimer();
263         logUpdateInfo( item );
264 
265         try
266         {
267             final CacheListeners<K, V> cacheDesc = getCacheListeners( item.getCacheName() );
268             final boolean fromCluster = isRequestFromCluster( requesterId );
269 
270             log.debug( "In update, requesterId = [{0}] fromCluster = {1}", requesterId, fromCluster );
271 
272             // ordered cache item update and notification.
273             synchronized ( cacheDesc )
274             {
275                 try
276                 {
277                     final CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
278 
279                     // If the source of this request was not from a cluster,
280                     // then consider it a local update. The cache manager will
281                     // try to
282                     // update all auxiliaries.
283                     //
284                     // This requires that two local caches not be connected to
285                     // two clustered remote caches. The failover runner will
286                     // have to make sure of this. ALos, the local cache needs
287                     // avoid updating this source. Will need to pass the source
288                     // id somehow. The remote cache should update all local
289                     // caches
290                     // but not update the cluster source. Cluster remote caches
291                     // should only be updated by the server and not the
292                     // RemoteCache.
293                     if ( fromCluster )
294                     {
295                         log.debug( "Put FROM cluster, NOT updating other auxiliaries for region. "
296                                 + " requesterId [{0}]", requesterId );
297                         c.localUpdate( item );
298                     }
299                     else
300                     {
301                         log.debug( "Put NOT from cluster, updating other auxiliaries for region. "
302                                 + " requesterId [{0}]", requesterId );
303                         c.update( item );
304                     }
305                 }
306                 catch ( final IOException ce )
307                 {
308                     // swallow
309                     log.info( "Exception caught updating item. requesterId [{0}]: {1}",
310                             requesterId, ce.getMessage() );
311                 }
312 
313                 // UPDATE LOCALS IF A REQUEST COMES FROM A CLUSTER
314                 // IF LOCAL CLUSTER CONSISTENCY IS CONFIGURED
315                 if (!fromCluster || fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency())
316                 {
317                     final ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
318                     log.debug( "qlist.length = {0}", qlist.length );
319                     for (final ICacheEventQueue<K, V> element : qlist) {
320                         element.addPutEvent( item );
321                     }
322                 }
323             }
324         }
325         catch ( final IOException e )
326         {
327             if ( cacheEventLogger != null )
328             {
329                 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.UPDATE_EVENT, e.getMessage()
330                     + " REGION: " + item.getCacheName() + " ITEM: " + item );
331             }
332 
333             log.error( "Trouble in Update. requesterId [{0}]", requesterId, e );
334         }
335 
336         // TODO use JAMON for timing
337         log.debug( "put took {0} ms.", timer::getElapsedTime);
338     }
339 
340     /**
341      * Log some details.
342      * <p>
343      * @param item
344      */
345     private void logUpdateInfo( final ICacheElement<K, V> item )
346     {
347         // not thread safe, but it doesn't have to be 100% accurate
348         puts++;
349 
350         if ( log.isInfoEnabled() && (puts % logInterval == 0) )
351         {
352             log.info( "puts = {0}", puts );
353         }
354 
355         log.debug( "In update, put [{0}] in [{1}]",
356                 item::getKey, item::getCacheName);
357     }
358 
359     /**
360      * Returns a cache value from the specified remote cache; or null if the cache or key does not
361      * exist.
362      * <p>
363      * @param cacheName
364      * @param key
365      * @return ICacheElement
366      * @throws IOException
367      */
368     @Override
369     public ICacheElement<K, V> get( final String cacheName, final K key )
370         throws IOException
371     {
372         return this.get( cacheName, key, 0 );
373     }
374 
375     /**
376      * Returns a cache bean from the specified cache; or null if the key does not exist.
377      * <p>
378      * Adding the requestor id, allows the cache to determine the source of the get.
379      * <p>
380      * The internal processing is wrapped in event logging calls.
381      * <p>
382      * @param cacheName
383      * @param key
384      * @param requesterId
385      * @return ICacheElement
386      * @throws IOException
387      */
388     @Override
389     public ICacheElement<K, V> get( final String cacheName, final K key, final long requesterId )
390         throws IOException
391     {
392         ICacheElement<K, V> element = null;
393         final ICacheEvent<K> cacheEvent = createICacheEvent( cacheName, key, requesterId, ICacheEventLogger.GET_EVENT );
394         try
395         {
396             element = processGet( cacheName, key, requesterId );
397         }
398         finally
399         {
400             logICacheEvent( cacheEvent );
401         }
402         return element;
403     }
404 
405     /**
406      * Returns a cache bean from the specified cache; or null if the key does not exist.
407      * <p>
408      * Adding the requester id, allows the cache to determine the source of the get.
409      * <p>
410      * @param cacheName
411      * @param key
412      * @param requesterId
413      * @return ICacheElement
414      */
415     private ICacheElement<K, V> processGet( final String cacheName, final K key, final long requesterId )
416     {
417         final boolean fromCluster = isRequestFromCluster( requesterId );
418 
419         log.debug( "get [{0}] from cache [{1}] requesterId = [{2}] fromCluster = {3}",
420                 key, cacheName, requesterId, fromCluster );
421 
422         final CacheListeners<K, V> cacheDesc = getCacheListeners( cacheName );
423 
424         return getFromCacheListeners( key, fromCluster, cacheDesc, null );
425     }
426 
427     /**
428      * Gets the item from the associated cache listeners.
429      * <p>
430      * @param key
431      * @param fromCluster
432      * @param cacheDesc
433      * @param element
434      * @return ICacheElement
435      */
436     private ICacheElement<K, V> getFromCacheListeners( final K key, final boolean fromCluster, final CacheListeners<K, V> cacheDesc,
437                                                  final ICacheElement<K, V> element )
438     {
439         ICacheElement<K, V> returnElement = element;
440 
441         if ( cacheDesc != null )
442         {
443             final CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
444 
445             // If we have a get come in from a client and we don't have the item
446             // locally, we will allow the cache to look in other non local sources,
447             // such as a remote cache or a lateral.
448             //
449             // Since remote servers never get from clients and clients never go
450             // remote from a remote call, this
451             // will not result in any loops.
452             //
453             // This is the only instance I can think of where we allow a remote get
454             // from a remote call. The purpose is to allow remote cache servers to
455             // talk to each other. If one goes down, you want it to be able to get
456             // data from those that were up when the failed server comes back o
457             // line.
458 
459             if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() )
460             {
461                 log.debug( "NonLocalGet. fromCluster [{0}] AllowClusterGet [{1}]",
462                         fromCluster, this.remoteCacheServerAttributes.isAllowClusterGet() );
463                 returnElement = c.get( key );
464             }
465             else
466             {
467                 // Gets from cluster type remote will end up here.
468                 // Gets from all clients will end up here if allow cluster get is
469                 // false.
470                 log.debug( "LocalGet. fromCluster [{0}] AllowClusterGet [{1}]",
471                         fromCluster, this.remoteCacheServerAttributes.isAllowClusterGet() );
472                 returnElement = c.localGet( key );
473             }
474         }
475 
476         return returnElement;
477     }
478 
479     /**
480      * Gets all matching items.
481      * <p>
482      * @param cacheName
483      * @param pattern
484      * @return Map of keys and wrapped objects
485      * @throws IOException
486      */
487     @Override
488     public Map<K, ICacheElement<K, V>> getMatching( final String cacheName, final String pattern )
489         throws IOException
490     {
491         return getMatching( cacheName, pattern, 0 );
492     }
493 
494     /**
495      * Retrieves all matching keys.
496      * <p>
497      * @param cacheName
498      * @param pattern
499      * @param requesterId
500      * @return Map of keys and wrapped objects
501      * @throws IOException
502      */
503     @Override
504     public Map<K, ICacheElement<K, V>> getMatching( final String cacheName, final String pattern, final long requesterId )
505         throws IOException
506     {
507         final ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, pattern, requesterId,
508                                                     ICacheEventLogger.GETMATCHING_EVENT );
509         try
510         {
511             return processGetMatching( cacheName, pattern, requesterId );
512         }
513         finally
514         {
515             logICacheEvent( cacheEvent );
516         }
517     }
518 
519     /**
520      * Retrieves all matching keys.
521      * <p>
522      * @param cacheName
523      * @param pattern
524      * @param requesterId
525      * @return Map of keys and wrapped objects
526      */
527     protected Map<K, ICacheElement<K, V>> processGetMatching( final String cacheName, final String pattern, final long requesterId )
528     {
529         final boolean fromCluster = isRequestFromCluster( requesterId );
530 
531         log.debug( "getMatching [{0}] from cache [{1}] requesterId = [{2}] fromCluster = {3}",
532                 pattern, cacheName, requesterId, fromCluster );
533 
534         CacheListeners<K, V> cacheDesc = null;
535         try
536         {
537             cacheDesc = getCacheListeners( cacheName );
538         }
539         catch ( final Exception e )
540         {
541             log.error( "Problem getting listeners.", e );
542 
543             if ( cacheEventLogger != null )
544             {
545                 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.GETMATCHING_EVENT, e.getMessage()
546                     + cacheName + " pattern: " + pattern );
547             }
548         }
549 
550         return getMatchingFromCacheListeners( pattern, fromCluster, cacheDesc );
551     }
552 
553     /**
554      * Gets the item from the associated cache listeners.
555      * <p>
556      * @param pattern
557      * @param fromCluster
558      * @param cacheDesc
559      * @return Map of keys to results
560      */
561     private Map<K, ICacheElement<K, V>> getMatchingFromCacheListeners( final String pattern, final boolean fromCluster, final CacheListeners<K, V> cacheDesc )
562     {
563         Map<K, ICacheElement<K, V>> elements = null;
564         if ( cacheDesc != null )
565         {
566             final CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
567 
568             // We always want to go remote and then merge the items.  But this can lead to inconsistencies after
569             // failover recovery.  Removed items may show up.  There is no good way to prevent this.
570             // We should make it configurable.
571 
572             if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() )
573             {
574                 log.debug( "NonLocalGetMatching. fromCluster [{0}] AllowClusterGet [{1}]",
575                         fromCluster, this.remoteCacheServerAttributes.isAllowClusterGet() );
576                 elements = c.getMatching( pattern );
577             }
578             else
579             {
580                 // Gets from cluster type remote will end up here.
581                 // Gets from all clients will end up here if allow cluster get is
582                 // false.
583 
584                 log.debug( "LocalGetMatching. fromCluster [{0}] AllowClusterGet [{1}]",
585                         fromCluster, this.remoteCacheServerAttributes.isAllowClusterGet() );
586                 elements = c.localGetMatching( pattern );
587             }
588         }
589         return elements;
590     }
591 
592     /**
593      * Gets multiple items from the cache based on the given set of keys.
594      * <p>
595      * @param cacheName
596      * @param keys
597      * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
598      *         data in cache for any of these keys
599      * @throws IOException
600      */
601     @Override
602     public Map<K, ICacheElement<K, V>> getMultiple( final String cacheName, final Set<K> keys )
603         throws IOException
604     {
605         return this.getMultiple( cacheName, keys, 0 );
606     }
607 
608     /**
609      * Gets multiple items from the cache based on the given set of keys.
610      * <p>
611      * The internal processing is wrapped in event logging calls.
612      * <p>
613      * @param cacheName
614      * @param keys
615      * @param requesterId
616      * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
617      *         data in cache for any of these keys
618      * @throws IOException
619      */
620     @Override
621     public Map<K, ICacheElement<K, V>> getMultiple( final String cacheName, final Set<K> keys, final long requesterId )
622         throws IOException
623     {
624         final ICacheEvent<Serializable> cacheEvent = createICacheEvent( cacheName, (Serializable) keys, requesterId,
625                                                     ICacheEventLogger.GETMULTIPLE_EVENT );
626         try
627         {
628             return processGetMultiple( cacheName, keys, requesterId );
629         }
630         finally
631         {
632             logICacheEvent( cacheEvent );
633         }
634     }
635 
636     /**
637      * Gets multiple items from the cache based on the given set of keys.
638      * <p>
639      * @param cacheName
640      * @param keys
641      * @param requesterId
642      * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
643      *         data in cache for any of these keys
644      */
645     private Map<K, ICacheElement<K, V>> processGetMultiple( final String cacheName, final Set<K> keys, final long requesterId )
646     {
647         final boolean fromCluster = isRequestFromCluster( requesterId );
648 
649         log.debug( "getMultiple [{0}] from cache [{1}] requesterId = [{2}] fromCluster = {3}",
650                 keys, cacheName, requesterId, fromCluster );
651 
652         final CacheListeners<K, V> cacheDesc = getCacheListeners( cacheName );
653         return getMultipleFromCacheListeners( keys, null, fromCluster, cacheDesc );
654     }
655 
656     /**
657      * Since a non-receiving remote cache client will not register a listener, it will not have a
658      * listener id assigned from the server. As such the remote server cannot determine if it is a
659      * cluster or a normal client. It will assume that it is a normal client.
660      * <p>
661      * @param requesterId
662      * @return true is from a cluster.
663      */
664     private boolean isRequestFromCluster( final long requesterId )
665     {
666         final RemoteType remoteTypeL = idTypeMap.get( Long.valueOf( requesterId ) );
667         return remoteTypeL == RemoteType.CLUSTER;
668     }
669 
670     /**
671      * Gets the items from the associated cache listeners.
672      * <p>
673      * @param keys
674      * @param elements
675      * @param fromCluster
676      * @param cacheDesc
677      * @return Map
678      */
679     private Map<K, ICacheElement<K, V>> getMultipleFromCacheListeners( final Set<K> keys, final Map<K, ICacheElement<K, V>> elements, final boolean fromCluster, final CacheListeners<K, V> cacheDesc )
680     {
681         Map<K, ICacheElement<K, V>> returnElements = elements;
682 
683         if ( cacheDesc != null )
684         {
685             final CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
686 
687             // If we have a getMultiple come in from a client and we don't have the item
688             // locally, we will allow the cache to look in other non local sources,
689             // such as a remote cache or a lateral.
690             //
691             // Since remote servers never get from clients and clients never go
692             // remote from a remote call, this
693             // will not result in any loops.
694             //
695             // This is the only instance I can think of where we allow a remote get
696             // from a remote call. The purpose is to allow remote cache servers to
697             // talk to each other. If one goes down, you want it to be able to get
698             // data from those that were up when the failed server comes back on
699             // line.
700 
701             if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() )
702             {
703                 log.debug( "NonLocalGetMultiple. fromCluster [{0}] AllowClusterGet [{1}]",
704                         fromCluster, this.remoteCacheServerAttributes.isAllowClusterGet() );
705 
706                 returnElements = c.getMultiple( keys );
707             }
708             else
709             {
710                 // Gets from cluster type remote will end up here.
711                 // Gets from all clients will end up here if allow cluster get is
712                 // false.
713 
714                 log.debug( "LocalGetMultiple. fromCluster [{0}] AllowClusterGet [{1}]",
715                         fromCluster, this.remoteCacheServerAttributes.isAllowClusterGet() );
716 
717                 returnElements = c.localGetMultiple( keys );
718             }
719         }
720 
721         return returnElements;
722     }
723 
724     /**
725      * Return the keys in the cache.
726      * <p>
727      * @param cacheName the name of the cache region
728      * @see org.apache.commons.jcs3.auxiliary.AuxiliaryCache#getKeySet()
729      */
730     @Override
731     public Set<K> getKeySet(final String cacheName) throws IOException
732     {
733         return processGetKeySet( cacheName );
734     }
735 
736     /**
737      * Gets the set of keys of objects currently in the cache.
738      * <p>
739      * @param cacheName
740      * @return Set
741      */
742     protected Set<K> processGetKeySet( final String cacheName )
743     {
744         final CacheListeners<K, V> cacheDesc = getCacheListeners( cacheName );
745 
746         if ( cacheDesc == null )
747         {
748             return Collections.emptySet();
749         }
750 
751         final CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
752         return c.getKeySet();
753     }
754 
755     /**
756      * Removes the given key from the specified remote cache. Defaults the listener id to 0.
757      * <p>
758      * @param cacheName
759      * @param key
760      * @throws IOException
761      */
762     @Override
763     public void remove( final String cacheName, final K key )
764         throws IOException
765     {
766         remove( cacheName, key, 0 );
767     }
768 
769     /**
770      * Remove the key from the cache region and don't tell the source listener about it.
771      * <p>
772      * The internal processing is wrapped in event logging calls.
773      * <p>
774      * @param cacheName
775      * @param key
776      * @param requesterId
777      * @throws IOException
778      */
779     @Override
780     public void remove( final String cacheName, final K key, final long requesterId )
781         throws IOException
782     {
783         final ICacheEvent<K> cacheEvent = createICacheEvent( cacheName, key, requesterId, ICacheEventLogger.REMOVE_EVENT );
784         try
785         {
786             processRemove( cacheName, key, requesterId );
787         }
788         finally
789         {
790             logICacheEvent( cacheEvent );
791         }
792     }
793 
794     /**
795      * Remove the key from the cache region and don't tell the source listener about it.
796      * <p>
797      * @param cacheName
798      * @param key
799      * @param requesterId
800      * @throws IOException
801      */
802     private void processRemove( final String cacheName, final K key, final long requesterId )
803         throws IOException
804     {
805         log.debug( "remove [{0}] from cache [{1}]", key, cacheName );
806 
807         final CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName );
808 
809         final boolean fromCluster = isRequestFromCluster( requesterId );
810 
811         if ( cacheDesc != null )
812         {
813             // best attempt to achieve ordered cache item removal and
814             // notification.
815             synchronized ( cacheDesc )
816             {
817                 boolean removeSuccess = false;
818 
819                 // No need to notify if it was not cached.
820                 final CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
821 
822                 if ( fromCluster )
823                 {
824                     log.debug( "Remove FROM cluster, NOT updating other auxiliaries for region" );
825                     removeSuccess = c.localRemove( key );
826                 }
827                 else
828                 {
829                     log.debug( "Remove NOT from cluster, updating other auxiliaries for region" );
830                     removeSuccess = c.remove( key );
831                 }
832 
833                 log.debug( "remove [{0}] from cache [{1}] success (was it found) = {2}",
834                         key, cacheName, removeSuccess );
835 
836                 // UPDATE LOCALS IF A REQUEST COMES FROM A CLUSTER
837                 // IF LOCAL CLUSTER CONSISTENCY IS CONFIGURED
838                 if (!fromCluster || fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency())
839                 {
840                     final ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
841 
842                     for (final ICacheEventQueue<K, V> element : qlist) {
843                         element.addRemoveEvent( key );
844                     }
845                 }
846             }
847         }
848     }
849 
850     /**
851      * Remove all keys from the specified remote cache.
852      * <p>
853      * @param cacheName
854      * @throws IOException
855      */
856     @Override
857     public void removeAll( final String cacheName )
858         throws IOException
859     {
860         removeAll( cacheName, 0 );
861     }
862 
863     /**
864      * Remove all keys from the specified remote cache.
865      * <p>
866      * The internal processing is wrapped in event logging calls.
867      * <p>
868      * @param cacheName
869      * @param requesterId
870      * @throws IOException
871      */
872     @Override
873     public void removeAll( final String cacheName, final long requesterId )
874         throws IOException
875     {
876         final ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, "all", requesterId, ICacheEventLogger.REMOVEALL_EVENT );
877         try
878         {
879             processRemoveAll( cacheName, requesterId );
880         }
881         finally
882         {
883             logICacheEvent( cacheEvent );
884         }
885     }
886 
887     /**
888      * Remove all keys from the specified remote cache.
889      * <p>
890      * @param cacheName
891      * @param requesterId
892      * @throws IOException
893      */
894     private void processRemoveAll( final String cacheName, final long requesterId )
895         throws IOException
896     {
897         final CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName );
898 
899         final boolean fromCluster = isRequestFromCluster( requesterId );
900 
901         if ( cacheDesc != null )
902         {
903             // best attempt to achieve ordered cache item removal and
904             // notification.
905             synchronized ( cacheDesc )
906             {
907                 // No need to broadcast, or notify if it was not cached.
908                 final CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
909 
910                 if ( fromCluster )
911                 {
912                     log.debug( "RemoveALL FROM cluster, NOT updating other auxiliaries for region" );
913                     c.localRemoveAll();
914                 }
915                 else
916                 {
917                     log.debug( "RemoveALL NOT from cluster, updating other auxiliaries for region" );
918                     c.removeAll();
919                 }
920 
921                 // update registered listeners
922                 if (!fromCluster || fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency())
923                 {
924                     final ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
925 
926                     for (final ICacheEventQueue<K, V> q : qlist)
927                     {
928                         q.addRemoveAllEvent();
929                     }
930                 }
931             }
932         }
933     }
934 
935     /**
936      * How many put events have we received.
937      * <p>
938      * @return puts
939      */
940     // Currently only intended for use by unit tests
941     int getPutCount()
942     {
943         return puts;
944     }
945 
946     /**
947      * Frees the specified remote cache.
948      * <p>
949      * @param cacheName
950      * @throws IOException
951      */
952     @Override
953     public void dispose( final String cacheName )
954         throws IOException
955     {
956         dispose( cacheName, 0 );
957     }
958 
959     /**
960      * Frees the specified remote cache.
961      * <p>
962      * @param cacheName
963      * @param requesterId
964      * @throws IOException
965      */
966     public void dispose( final String cacheName, final long requesterId )
967         throws IOException
968     {
969         final ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, "none", requesterId, ICacheEventLogger.DISPOSE_EVENT );
970         try
971         {
972             processDispose( cacheName, requesterId );
973         }
974         finally
975         {
976             logICacheEvent( cacheEvent );
977         }
978     }
979 
980     /**
981      * @param cacheName
982      * @param requesterId
983      * @throws IOException
984      */
985     private void processDispose( final String cacheName, final long requesterId )
986         throws IOException
987     {
988         log.info( "Dispose request received from listener [{0}]", requesterId );
989 
990         final CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName );
991 
992         // this is dangerous
993         if ( cacheDesc != null )
994         {
995             // best attempt to achieve ordered free-cache-op and notification.
996             synchronized ( cacheDesc )
997             {
998                 final ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
999 
1000                 for (final ICacheEventQueue<K, V> element : qlist) {
1001                     element.addDisposeEvent();
1002                 }
1003                 cacheManager.freeCache( cacheName );
1004             }
1005         }
1006     }
1007 
1008     /**
1009      * Frees all remote caches.
1010      * <p>
1011      * @throws IOException
1012      */
1013     @Override
1014     public void release()
1015         throws IOException
1016     {
1017         for (final CacheListeners<K, V> cacheDesc : cacheListenersMap.values())
1018         {
1019             final ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, 0 );
1020 
1021             for (final ICacheEventQueue<K, V> element : qlist) {
1022                 element.addDisposeEvent();
1023             }
1024         }
1025         cacheManager.release();
1026     }
1027 
1028     /**
1029      * Returns the cache listener for the specified cache. Creates the cache and the cache
1030      * descriptor if they do not already exist.
1031      * <p>
1032      * @param cacheName
1033      * @return The cacheListeners value
1034      */
1035     protected CacheListeners<K, V> getCacheListeners( final String cacheName )
1036     {
1037 
1038         return cacheListenersMap.computeIfAbsent(cacheName, key -> {
1039             final CompositeCache<K, V> cache = cacheManager.getCache(key);
1040             return new CacheListeners<>( cache );
1041         });
1042     }
1043 
1044     /**
1045      * Gets the clusterListeners attribute of the RemoteCacheServer object.
1046      * <p>
1047      * TODO may be able to remove this
1048      * @param cacheName
1049      * @return The clusterListeners value
1050      */
1051     protected CacheListeners<K, V> getClusterListeners( final String cacheName )
1052     {
1053 
1054         return clusterListenersMap.computeIfAbsent(cacheName, key -> {
1055             final CompositeCache<K, V> cache = cacheManager.getCache( cacheName );
1056             return new CacheListeners<>( cache );
1057         });
1058     }
1059 
1060     /**
1061      * Gets the eventQList attribute of the RemoteCacheServer object. This returns the event queues
1062      * stored in the cacheListeners object for a particular region, if the queue is not for this
1063      * requester.
1064      * <p>
1065      * Basically, this makes sure that a request from a particular local cache, identified by its
1066      * listener id, does not result in a call to that same listener.
1067      * <p>
1068      * @param cacheListeners
1069      * @param requesterId
1070      * @return The eventQList value
1071      */
1072     @SuppressWarnings("unchecked") // No generic arrays in java
1073     private ICacheEventQueue<K, V>[] getEventQList( final CacheListeners<K, V> cacheListeners, final long requesterId )
1074     {
1075         final ICacheEventQueue<K, V>[] list = cacheListeners.eventQMap.values().toArray( new ICacheEventQueue[0] );
1076         int count = 0;
1077         // Set those not qualified to null; Count those qualified.
1078         for ( int i = 0; i < list.length; i++ )
1079         {
1080             final ICacheEventQueue<K, V> q = list[i];
1081             if ( q.isWorking() && q.getListenerId() != requesterId )
1082             {
1083                 count++;
1084             }
1085             else
1086             {
1087                 list[i] = null;
1088             }
1089         }
1090         if ( count == list.length )
1091         {
1092             // All qualified.
1093             return list;
1094         }
1095 
1096         // Returns only the qualified.
1097         final ICacheEventQueue<K, V>[] qq = new ICacheEventQueue[count];
1098         count = 0;
1099         for (final ICacheEventQueue<K, V> element : list) {
1100             if ( element != null )
1101             {
1102                 qq[count++] = element;
1103             }
1104         }
1105         return qq;
1106     }
1107 
1108     /**
1109      * Removes dead event queues. Should clean out deregistered listeners.
1110      * <p>
1111      * @param eventQMap
1112      */
1113     private static <KK, VV> void cleanupEventQMap( final Map<Long, ICacheEventQueue<KK, VV>> eventQMap )
1114     {
1115         // this does not care if the q is alive (i.e. if
1116         // there are active threads; it cares if the queue
1117         // is working -- if it has not encountered errors
1118         // above the failure threshold
1119         eventQMap.entrySet().removeIf(e -> !e.getValue().isWorking());
1120     }
1121 
1122     /**
1123      * Subscribes to the specified remote cache.
1124      * <p>
1125      * If the client id is 0, then the remote cache server will increment it's local count and
1126      * assign an id to the client.
1127      * <p>
1128      * @param cacheName the specified remote cache.
1129      * @param listener object to notify for cache changes. must be synchronized since there are
1130      *            remote calls involved.
1131      * @throws IOException
1132      */
1133     @Override
1134     @SuppressWarnings("unchecked") // Need to cast to specific return type from getClusterListeners()
1135     public <KK, VV> void addCacheListener( final String cacheName, final ICacheListener<KK, VV> listener )
1136         throws IOException
1137     {
1138         if ( cacheName == null || listener == null )
1139         {
1140             throw new IllegalArgumentException( "cacheName and listener must not be null" );
1141         }
1142         final CacheListeners<KK, VV> cacheListeners;
1143 
1144         final IRemoteCacheListener<KK, VV> ircl = (IRemoteCacheListener<KK, VV>) listener;
1145 
1146         final String listenerAddress = ircl.getLocalHostAddress();
1147 
1148         final RemoteType remoteType = ircl.getRemoteType();
1149         if ( remoteType == RemoteType.CLUSTER )
1150         {
1151             log.debug( "adding cluster listener, listenerAddress [{0}]", listenerAddress );
1152             cacheListeners = (CacheListeners<KK, VV>)getClusterListeners( cacheName );
1153         }
1154         else
1155         {
1156             log.debug( "adding normal listener, listenerAddress [{0}]", listenerAddress );
1157             cacheListeners = (CacheListeners<KK, VV>)getCacheListeners( cacheName );
1158         }
1159         final Map<Long, ICacheEventQueue<KK, VV>> eventQMap = cacheListeners.eventQMap;
1160         cleanupEventQMap( eventQMap );
1161 
1162         // synchronized ( listenerId )
1163         synchronized ( ICacheListener.class )
1164         {
1165             long id = 0;
1166             try
1167             {
1168                 id = listener.getListenerId();
1169                 // clients probably shouldn't do this.
1170                 if ( id == 0 )
1171                 {
1172                     // must start at one so the next gets recognized
1173                     final long listenerIdB = nextListenerId();
1174                     log.debug( "listener id={0} addded for cache [{1}], listenerAddress [{2}]",
1175                             listenerIdB & 0xff, cacheName, listenerAddress );
1176                     listener.setListenerId( listenerIdB );
1177                     id = listenerIdB;
1178 
1179                     // in case it needs synchronization
1180                     final String message = "Adding vm listener under new id = [" + listenerIdB + "], listenerAddress ["
1181                         + listenerAddress + "]";
1182                     logApplicationEvent( "RemoteCacheServer", "addCacheListener", message );
1183                     log.info( message );
1184                 }
1185                 else
1186                 {
1187                     final String message = "Adding listener under existing id = [" + id + "], listenerAddress ["
1188                         + listenerAddress + "]";
1189                     logApplicationEvent( "RemoteCacheServer", "addCacheListener", message );
1190                     log.info( message );
1191                     // should confirm the host is the same as we have on
1192                     // record, just in case a client has made a mistake.
1193                 }
1194 
1195                 // relate the type to an id
1196                 this.idTypeMap.put( Long.valueOf( id ), remoteType);
1197                 if ( listenerAddress != null )
1198                 {
1199                     this.idIPMap.put( Long.valueOf( id ), listenerAddress );
1200                 }
1201             }
1202             catch ( final IOException ioe )
1203             {
1204                 final String message = "Problem setting listener id, listenerAddress [" + listenerAddress + "]";
1205                 log.error( message, ioe );
1206 
1207                 if ( cacheEventLogger != null )
1208                 {
1209                     cacheEventLogger.logError( "RemoteCacheServer", "addCacheListener", message + " - "
1210                         + ioe.getMessage() );
1211                 }
1212             }
1213 
1214             final CacheEventQueueFactory<KK, VV> fact = new CacheEventQueueFactory<>();
1215             final ICacheEventQueue<KK, VV> q = fact.createCacheEventQueue( listener, id, cacheName, remoteCacheServerAttributes
1216                 .getEventQueuePoolName(), remoteCacheServerAttributes.getEventQueueType() );
1217 
1218             eventQMap.put(Long.valueOf(listener.getListenerId()), q);
1219 
1220             log.info( cacheListeners );
1221         }
1222     }
1223 
1224     /**
1225      * Subscribes to all remote caches.
1226      * <p>
1227      * @param listener The feature to be added to the CacheListener attribute
1228      * @throws IOException
1229      */
1230     @Override
1231     public <KK, VV> void addCacheListener( final ICacheListener<KK, VV> listener )
1232         throws IOException
1233     {
1234         for (final String cacheName : cacheListenersMap.keySet())
1235         {
1236             addCacheListener( cacheName, listener );
1237 
1238             log.debug( "Adding listener for cache [{0}]", cacheName );
1239         }
1240     }
1241 
1242     /**
1243      * Unsubscribe this listener from this region. If the listener is registered, it will be removed
1244      * from the event queue map list.
1245      * <p>
1246      * @param cacheName
1247      * @param listener
1248      * @throws IOException
1249      */
1250     @Override
1251     public <KK, VV> void removeCacheListener( final String cacheName, final ICacheListener<KK, VV> listener )
1252         throws IOException
1253     {
1254         removeCacheListener( cacheName, listener.getListenerId() );
1255     }
1256 
1257     /**
1258      * Unsubscribe this listener from this region. If the listener is registered, it will be removed
1259      * from the event queue map list.
1260      * <p>
1261      * @param cacheName
1262      * @param listenerId
1263      */
1264     public void removeCacheListener( final String cacheName, final long listenerId )
1265     {
1266         final String message = "Removing listener for cache region = [" + cacheName + "] and listenerId [" + listenerId + "]";
1267         logApplicationEvent( "RemoteCacheServer", "removeCacheListener", message );
1268         log.info( message );
1269 
1270         final boolean isClusterListener = isRequestFromCluster( listenerId );
1271 
1272         CacheListeners<K, V> cacheDesc = null;
1273 
1274         if ( isClusterListener )
1275         {
1276             cacheDesc = getClusterListeners( cacheName );
1277         }
1278         else
1279         {
1280             cacheDesc = getCacheListeners( cacheName );
1281         }
1282         final Map<Long, ICacheEventQueue<K, V>> eventQMap = cacheDesc.eventQMap;
1283         cleanupEventQMap( eventQMap );
1284         final ICacheEventQueue<K, V> q = eventQMap.remove( Long.valueOf( listenerId ) );
1285 
1286         if ( q != null )
1287         {
1288             log.debug( "Found queue for cache region = [{0}] and listenerId [{1}]",
1289                     cacheName, listenerId );
1290             q.destroy();
1291             cleanupEventQMap( eventQMap );
1292         }
1293         else
1294         {
1295             log.debug( "Did not find queue for cache region = [{0}] and listenerId [{1}]",
1296                     cacheName, listenerId );
1297         }
1298 
1299         // cleanup
1300         idTypeMap.remove( Long.valueOf( listenerId ) );
1301         idIPMap.remove( Long.valueOf( listenerId ) );
1302 
1303         log.info( "After removing listener [{0}] cache region {1} listener size [{2}]",
1304                 listenerId, cacheName, eventQMap.size() );
1305     }
1306 
1307     /**
1308      * Unsubscribes from all remote caches.
1309      * <p>
1310      * @param listener
1311      * @throws IOException
1312      */
1313     @Override
1314     public <KK, VV> void removeCacheListener( final ICacheListener<KK, VV> listener )
1315         throws IOException
1316     {
1317         for (final String cacheName : cacheListenersMap.keySet())
1318         {
1319             removeCacheListener( cacheName, listener );
1320 
1321             log.info( "Removing listener for cache [{0}]", cacheName );
1322         }
1323     }
1324 
1325     /**
1326      * Shuts down the remote server.
1327      * <p>
1328      * @throws IOException
1329      */
1330     @Override
1331     public void shutdown()
1332         throws IOException
1333     {
1334         shutdown("", Registry.REGISTRY_PORT);
1335     }
1336 
1337     /**
1338      * Shuts down a server at a particular host and port. Then it calls shutdown on the cache
1339      * itself.
1340      * <p>
1341      * @param host
1342      * @param port
1343      * @throws IOException
1344      */
1345     @Override
1346     public void shutdown( final String host, final int port )
1347         throws IOException
1348     {
1349         log.info( "Received shutdown request. Shutting down server." );
1350 
1351         synchronized (listenerId)
1352         {
1353             for (final String cacheName : cacheListenersMap.keySet())
1354             {
1355                 for (int i = 0; i <= listenerId[0]; i++)
1356                 {
1357                     removeCacheListener( cacheName, i );
1358                 }
1359 
1360                 log.info( "Removing listener for cache [{0}]", cacheName );
1361             }
1362 
1363             cacheListenersMap.clear();
1364             clusterListenersMap.clear();
1365         }
1366         RemoteCacheServerFactory.shutdownImpl( host, port );
1367         this.cacheManager.shutDown();
1368     }
1369 
1370     /**
1371      * Called by the RMI runtime sometime after the runtime determines that the reference list, the
1372      * list of clients referencing the remote object, becomes empty.
1373      */
1374     // TODO: test out the DGC.
1375     @Override
1376     public void unreferenced()
1377     {
1378         log.info( "*** Server now unreferenced and subject to GC. ***" );
1379     }
1380 
1381     /**
1382      * Returns the next generated listener id [0,255].
1383      * <p>
1384      * @return the listener id of a client. This should be unique for this server.
1385      */
1386     private long nextListenerId()
1387     {
1388         long id = 0;
1389         if ( listenerId[0] == Integer.MAX_VALUE )
1390         {
1391             synchronized ( listenerId )
1392             {
1393                 id = listenerId[0];
1394                 listenerId[0] = 0;
1395                 // TODO: record & check if the generated id is currently being
1396                 // used by a valid listener. Currently if the id wraps after
1397                 // Long.MAX_VALUE,
1398                 // we just assume it won't collide with an existing listener who
1399                 // is live.
1400             }
1401         }
1402         else
1403         {
1404             synchronized ( listenerId )
1405             {
1406                 id = ++listenerId[0];
1407             }
1408         }
1409         return id;
1410     }
1411 
1412     /**
1413      * Gets the stats attribute of the RemoteCacheServer object.
1414      * <p>
1415      * @return The stats value
1416      * @throws IOException
1417      */
1418     @Override
1419     public String getStats()
1420         throws IOException
1421     {
1422         return cacheManager.getStats();
1423     }
1424 
1425     /**
1426      * Logs an event if an event logger is configured.
1427      * <p>
1428      * @param item
1429      * @param requesterId
1430      * @param eventName
1431      * @return ICacheEvent
1432      */
1433     private ICacheEvent<ICacheElement<K, V>> createICacheEvent( final ICacheElement<K, V> item, final long requesterId, final String eventName )
1434     {
1435         if ( cacheEventLogger == null )
1436         {
1437             return new CacheEvent<>();
1438         }
1439         final String ipAddress = getExtraInfoForRequesterId( requesterId );
1440         return cacheEventLogger
1441             .createICacheEvent( "RemoteCacheServer", item.getCacheName(), eventName, ipAddress, item );
1442     }
1443 
1444     /**
1445      * Logs an event if an event logger is configured.
1446      * <p>
1447      * @param cacheName
1448      * @param key
1449      * @param requesterId
1450      * @param eventName
1451      * @return ICacheEvent
1452      */
1453     private <T> ICacheEvent<T> createICacheEvent( final String cacheName, final T key, final long requesterId, final String eventName )
1454     {
1455         if ( cacheEventLogger == null )
1456         {
1457             return new CacheEvent<>();
1458         }
1459         final String ipAddress = getExtraInfoForRequesterId( requesterId );
1460         return cacheEventLogger.createICacheEvent( "RemoteCacheServer", cacheName, eventName, ipAddress, key );
1461     }
1462 
1463     /**
1464      * Logs an event if an event logger is configured.
1465      * <p>
1466      * @param source
1467      * @param eventName
1468      * @param optionalDetails
1469      */
1470     protected void logApplicationEvent( final String source, final String eventName, final String optionalDetails )
1471     {
1472         if ( cacheEventLogger != null )
1473         {
1474             cacheEventLogger.logApplicationEvent( source, eventName, optionalDetails );
1475         }
1476     }
1477 
1478     /**
1479      * Logs an event if an event logger is configured.
1480      * <p>
1481      * @param cacheEvent
1482      */
1483     protected <T> void logICacheEvent( final ICacheEvent<T> cacheEvent )
1484     {
1485         if ( cacheEventLogger != null )
1486         {
1487             cacheEventLogger.logICacheEvent( cacheEvent );
1488         }
1489     }
1490 
1491     /**
1492      * Ip address for the client, if one is stored.
1493      * <p>
1494      * Protected for testing.
1495      * <p>
1496      * @param requesterId
1497      * @return String
1498      */
1499     protected String getExtraInfoForRequesterId( final long requesterId )
1500     {
1501         return idIPMap.get( Long.valueOf( requesterId ) );
1502     }
1503 
1504     /**
1505      * Allows it to be injected.
1506      * <p>
1507      * @param cacheEventLogger
1508      */
1509     public void setCacheEventLogger( final ICacheEventLogger cacheEventLogger )
1510     {
1511         this.cacheEventLogger = cacheEventLogger;
1512     }
1513 }