View Javadoc
1   package org.apache.commons.jcs3.auxiliary.remote;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.HashMap;
25  import java.util.Map;
26  import java.util.Set;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.ExecutionException;
29  import java.util.concurrent.ExecutorService;
30  import java.util.concurrent.Future;
31  import java.util.concurrent.TimeUnit;
32  import java.util.concurrent.TimeoutException;
33  
34  import org.apache.commons.jcs3.auxiliary.AbstractAuxiliaryCacheEventLogging;
35  import org.apache.commons.jcs3.auxiliary.AuxiliaryCacheAttributes;
36  import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheAttributes;
37  import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheClient;
38  import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheListener;
39  import org.apache.commons.jcs3.auxiliary.remote.server.behavior.RemoteType;
40  import org.apache.commons.jcs3.engine.CacheStatus;
41  import org.apache.commons.jcs3.engine.ZombieCacheServiceNonLocal;
42  import org.apache.commons.jcs3.engine.behavior.ICacheElement;
43  import org.apache.commons.jcs3.engine.behavior.ICacheElementSerialized;
44  import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal;
45  import org.apache.commons.jcs3.engine.behavior.IZombie;
46  import org.apache.commons.jcs3.engine.logging.behavior.ICacheEventLogger;
47  import org.apache.commons.jcs3.engine.stats.StatElement;
48  import org.apache.commons.jcs3.engine.stats.Stats;
49  import org.apache.commons.jcs3.engine.stats.behavior.IStatElement;
50  import org.apache.commons.jcs3.engine.stats.behavior.IStats;
51  import org.apache.commons.jcs3.log.Log;
52  import org.apache.commons.jcs3.log.LogManager;
53  import org.apache.commons.jcs3.utils.serialization.SerializationConversionUtil;
54  import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager;
55  
56  /** Abstract base for remote caches. I'm trying to break out and reuse common functionality. */
57  public abstract class AbstractRemoteAuxiliaryCache<K, V>
58      extends AbstractAuxiliaryCacheEventLogging<K, V>
59      implements IRemoteCacheClient<K, V>
60  {
61      /** The logger. */
62      private static final Log log = LogManager.getLog( AbstractRemoteAuxiliaryCache.class );
63  
64      /**
65       * This does the work. In an RMI instance, it will be a remote reference. In an HTTP remote
66       * cache it will be an HTTP client. In zombie mode it is replaced with a balking facade.
67       */
68      private ICacheServiceNonLocal<K, V> remoteCacheService;
69  
70      /** The cacheName */
71      protected final String cacheName;
72  
73      /** The listener. This can be null. */
74      private IRemoteCacheListener<K, V> remoteCacheListener;
75  
76      /** The configuration values. TODO, we'll need a base here. */
77      private IRemoteCacheAttributes remoteCacheAttributes;
78  
79      /** A thread pool for gets if configured. */
80      private ExecutorService pool;
81  
82      /** Should we get asynchronously using a pool. */
83      private boolean usePoolForGet;
84  
85      /**
86       * Creates the base.
87       * <p>
88       * @param cattr
89       * @param remote
90       * @param listener
91       */
92      public AbstractRemoteAuxiliaryCache( final IRemoteCacheAttributes cattr, final ICacheServiceNonLocal<K, V> remote,
93                                           final IRemoteCacheListener<K, V> listener )
94      {
95          this.setRemoteCacheAttributes( cattr );
96          this.cacheName = cattr.getCacheName();
97          this.setRemoteCacheService( remote );
98          this.setRemoteCacheListener( listener );
99  
100         if ( log.isDebugEnabled() )
101         {
102             log.debug( "Construct> cacheName={0}", cattr::getCacheName);
103             log.debug( "irca = {0}", this::getRemoteCacheAttributes);
104             log.debug( "remote = {0}", remote );
105             log.debug( "listener = {0}", listener );
106         }
107 
108         // use a pool if it is greater than 0
109         log.debug( "GetTimeoutMillis() = {0}",
110                 () -> getRemoteCacheAttributes().getGetTimeoutMillis() );
111 
112         if ( getRemoteCacheAttributes().getGetTimeoutMillis() > 0 )
113         {
114             pool = ThreadPoolManager.getInstance().getExecutorService( getRemoteCacheAttributes().getThreadPoolName() );
115             log.debug( "Thread Pool = {0}", pool );
116             usePoolForGet = true;
117         }
118     }
119 
120     /**
121      * Synchronously dispose the remote cache; if failed, replace the remote handle with a zombie.
122      * <p>
123      * @throws IOException
124      */
125     @Override
126     protected void processDispose()
127         throws IOException
128     {
129         log.info( "Disposing of remote cache." );
130         try
131         {
132             if ( getRemoteCacheListener() != null )
133             {
134                 getRemoteCacheListener().dispose();
135             }
136         }
137         catch ( final IOException ex )
138         {
139             log.error( "Couldn't dispose", ex );
140             handleException( ex, "Failed to dispose [" + cacheName + "]", ICacheEventLogger.DISPOSE_EVENT );
141         }
142     }
143 
144     /**
145      * Synchronously get from the remote cache; if failed, replace the remote handle with a zombie.
146      * <p>
147      * Use threadpool to timeout if a value is set for GetTimeoutMillis
148      * <p>
149      * If we are a cluster client, we need to leave the Element in its serialized form. Cluster
150      * clients cannot deserialize objects. Cluster clients get ICacheElementSerialized objects from
151      * other remote servers.
152      * <p>
153      * @param key
154      * @return ICacheElement, a wrapper around the key, value, and attributes
155      * @throws IOException
156      */
157     @Override
158     protected ICacheElement<K, V> processGet( final K key )
159         throws IOException
160     {
161         ICacheElement<K, V> retVal = null;
162         try
163         {
164             if ( usePoolForGet )
165             {
166                 retVal = getUsingPool( key );
167             }
168             else
169             {
170                 retVal = getRemoteCacheService().get( cacheName, key, getListenerId() );
171             }
172 
173             // Eventually the instance of will not be necessary.
174             // Never try to deserialize if you are a cluster client. Cluster
175             // clients are merely intra-remote cache communicators. Remote caches are assumed
176             // to have no ability to deserialize the objects.
177             if (retVal instanceof ICacheElementSerialized && this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER)
178             {
179                 retVal = SerializationConversionUtil.getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) retVal,
180                         super.getElementSerializer() );
181             }
182         }
183         catch ( final IOException | ClassNotFoundException ex )
184         {
185             handleException( ex, "Failed to get [" + key + "] from [" + cacheName + "]", ICacheEventLogger.GET_EVENT );
186         }
187         return retVal;
188     }
189 
190     /**
191      * This allows gets to timeout in case of remote server machine shutdown.
192      * <p>
193      * @param key
194      * @return ICacheElement
195      * @throws IOException
196      */
197     public ICacheElement<K, V> getUsingPool( final K key )
198         throws IOException
199     {
200         final int timeout = getRemoteCacheAttributes().getGetTimeoutMillis();
201 
202         try
203         {
204             final Callable<ICacheElement<K, V>> command = () -> getRemoteCacheService().get( cacheName, key, getListenerId() );
205 
206             // execute using the pool
207             final Future<ICacheElement<K, V>> future = pool.submit(command);
208 
209             // used timed get in order to timeout
210             final ICacheElement<K, V> ice = future.get(timeout, TimeUnit.MILLISECONDS);
211 
212             if ( ice == null )
213             {
214                 log.debug( "nothing found in remote cache" );
215             }
216             else
217             {
218                 log.debug( "found item in remote cache" );
219             }
220             return ice;
221         }
222         catch ( final TimeoutException te )
223         {
224             log.warn( "TimeoutException, Get Request timed out after {0}", timeout );
225             throw new IOException( "Get Request timed out after " + timeout );
226         }
227         catch ( final InterruptedException ex )
228         {
229             log.warn( "InterruptedException, Get Request timed out after {0}", timeout );
230             throw new IOException( "Get Request timed out after " + timeout );
231         }
232         catch (final ExecutionException ex)
233         {
234             // assume that this is an IOException thrown by the callable.
235             log.error( "ExecutionException, Assuming an IO exception thrown in the background.", ex );
236             throw new IOException( "Get Request timed out after " + timeout );
237         }
238     }
239 
240     /**
241      * Calls get matching on the server. Each entry in the result is unwrapped.
242      * <p>
243      * @param pattern
244      * @return Map
245      * @throws IOException
246      */
247     @Override
248     public Map<K, ICacheElement<K, V>> processGetMatching( final String pattern )
249         throws IOException
250     {
251         final Map<K, ICacheElement<K, V>> results = new HashMap<>();
252         try
253         {
254             final Map<K, ICacheElement<K, V>> rawResults = getRemoteCacheService().getMatching( cacheName, pattern, getListenerId() );
255 
256             // Eventually the instance of will not be necessary.
257             if ( rawResults != null )
258             {
259                 for (final Map.Entry<K, ICacheElement<K, V>> entry : rawResults.entrySet())
260                 {
261                     ICacheElement<K, V> unwrappedResult = null;
262                     if ( entry.getValue() instanceof ICacheElementSerialized )
263                     {
264                         // Never try to deserialize if you are a cluster client. Cluster
265                         // clients are merely intra-remote cache communicators. Remote caches are assumed
266                         // to have no ability to deserialize the objects.
267                         if ( this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER )
268                         {
269                             unwrappedResult = SerializationConversionUtil
270                                 .getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) entry.getValue(),
271                                         super.getElementSerializer() );
272                         }
273                     }
274                     else
275                     {
276                         unwrappedResult = entry.getValue();
277                     }
278                     results.put( entry.getKey(), unwrappedResult );
279                 }
280             }
281         }
282         catch ( final IOException | ClassNotFoundException ex )
283         {
284             handleException( ex, "Failed to getMatching [" + pattern + "] from [" + cacheName + "]",
285                              ICacheEventLogger.GET_EVENT );
286         }
287         return results;
288     }
289 
290     /**
291      * Synchronously remove from the remote cache; if failed, replace the remote handle with a
292      * zombie.
293      * <p>
294      * @param key
295      * @return boolean, whether or not the item was removed
296      * @throws IOException
297      */
298     @Override
299     protected boolean processRemove( final K key )
300         throws IOException
301     {
302         if ( !this.getRemoteCacheAttributes().getGetOnly() )
303         {
304             log.debug( "remove> key={0}", key );
305             try
306             {
307                 getRemoteCacheService().remove( cacheName, key, getListenerId() );
308             }
309             catch ( final IOException ex )
310             {
311                 handleException( ex, "Failed to remove " + key + " from " + cacheName, ICacheEventLogger.REMOVE_EVENT );
312             }
313             return true;
314         }
315         return false;
316     }
317 
318     /**
319      * Synchronously removeAll from the remote cache; if failed, replace the remote handle with a
320      * zombie.
321      * <p>
322      * @throws IOException
323      */
324     @Override
325     protected void processRemoveAll()
326         throws IOException
327     {
328         if ( !this.getRemoteCacheAttributes().getGetOnly() )
329         {
330             try
331             {
332                 getRemoteCacheService().removeAll( cacheName, getListenerId() );
333             }
334             catch ( final IOException ex )
335             {
336                 handleException( ex, "Failed to remove all from " + cacheName, ICacheEventLogger.REMOVEALL_EVENT );
337             }
338         }
339     }
340 
341     /**
342      * Serializes the object and then calls update on the remote server with the byte array. The
343      * byte array is wrapped in a ICacheElementSerialized. This allows the remote server to operate
344      * without any knowledge of caches classes.
345      * <p>
346      * @param ce
347      * @throws IOException
348      */
349     @Override
350     protected void processUpdate( final ICacheElement<K, V> ce )
351         throws IOException
352     {
353         if ( !getRemoteCacheAttributes().getGetOnly() )
354         {
355             try
356             {
357                 log.debug( "sending item to remote server" );
358 
359                 // convert so we don't have to know about the object on the
360                 // other end.
361                 ICacheElementSerialized<K, V> serialized = SerializationConversionUtil.getSerializedCacheElement( ce, super.getElementSerializer() );
362 
363                 remoteCacheService.update( serialized, getListenerId() );
364             }
365             catch ( final IOException ex )
366             {
367                 // event queue will wait and retry
368                 handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName(),
369                                  ICacheEventLogger.UPDATE_EVENT );
370             }
371         }
372         else
373         {
374             log.debug( "get only mode, not sending to remote server" );
375         }
376     }
377 
378     /**
379      * Return the keys in this cache.
380      * <p>
381      * @see org.apache.commons.jcs3.auxiliary.AuxiliaryCache#getKeySet()
382      */
383     @Override
384     public Set<K> getKeySet()
385         throws IOException
386     {
387         return getRemoteCacheService().getKeySet(cacheName);
388     }
389 
390     /**
391      * Allows other member of this package to access the listener. This is mainly needed for
392      * deregistering a listener.
393      * <p>
394      * @return IRemoteCacheListener, the listener for this remote server
395      */
396     @Override
397     public IRemoteCacheListener<K, V> getListener()
398     {
399         return getRemoteCacheListener();
400     }
401 
402     /**
403      * let the remote cache set a listener_id. Since there is only one listener for all the regions
404      * and every region gets registered? the id shouldn't be set if it isn't zero. If it is we
405      * assume that it is a reconnect.
406      * <p>
407      * @param id The new listenerId value
408      */
409     public void setListenerId( final long id )
410     {
411         if ( getRemoteCacheListener() != null )
412         {
413             try
414             {
415                 getRemoteCacheListener().setListenerId( id );
416 
417                 log.debug( "set listenerId = {0}", id );
418             }
419             catch ( final IOException e )
420             {
421                 log.error( "Problem setting listenerId", e );
422             }
423         }
424     }
425 
426     /**
427      * Gets the listenerId attribute of the RemoteCacheListener object
428      * <p>
429      * @return The listenerId value
430      */
431     @Override
432     public long getListenerId()
433     {
434         if ( getRemoteCacheListener() != null )
435         {
436             try
437             {
438                 log.debug( "get listenerId = {0}", getRemoteCacheListener().getListenerId() );
439                 return getRemoteCacheListener().getListenerId();
440             }
441             catch ( final IOException e )
442             {
443                 log.error( "Problem getting listenerId", e );
444             }
445         }
446         return -1;
447     }
448 
449     /**
450      * Returns the current cache size.
451      * @return The size value
452      */
453     @Override
454     public int getSize()
455     {
456         return 0;
457     }
458 
459     /**
460      * Custom exception handling some children.  This should be used to initiate failover.
461      * <p>
462      * @param ex
463      * @param msg
464      * @param eventName
465      * @throws IOException
466      */
467     protected abstract void handleException( Exception ex, String msg, String eventName )
468         throws IOException;
469 
470     /**
471      * Gets the stats attribute of the RemoteCache object.
472      * <p>
473      * @return The stats value
474      */
475     @Override
476     public String getStats()
477     {
478         return getStatistics().toString();
479     }
480 
481     /**
482      * @return IStats object
483      */
484     @Override
485     public IStats getStatistics()
486     {
487         final IStats stats = new Stats();
488         stats.setTypeName( "AbstractRemoteAuxiliaryCache" );
489 
490         final ArrayList<IStatElement<?>> elems = new ArrayList<>();
491 
492         elems.add(new StatElement<>( "Remote Type", this.getRemoteCacheAttributes().getRemoteTypeName() ) );
493 
494 //      if ( this.getRemoteCacheAttributes().getRemoteType() == RemoteType.CLUSTER )
495 //      {
496 //          // something cluster specific
497 //      }
498 
499         elems.add(new StatElement<>( "UsePoolForGet", Boolean.valueOf(usePoolForGet) ) );
500 
501         if ( pool != null )
502         {
503             elems.add(new StatElement<>( "Pool", pool ) );
504         }
505 
506         if ( getRemoteCacheService() instanceof ZombieCacheServiceNonLocal )
507         {
508             elems.add(new StatElement<>( "Zombie Queue Size",
509                     Integer.valueOf(( (ZombieCacheServiceNonLocal<K, V>) getRemoteCacheService() ).getQueueSize()) ) );
510         }
511 
512         stats.setStatElements( elems );
513 
514         return stats;
515     }
516 
517     /**
518      * Returns the cache status. An error status indicates the remote connection is not available.
519      * <p>
520      * @return The status value
521      */
522     @Override
523     public CacheStatus getStatus()
524     {
525         return getRemoteCacheService() instanceof IZombie ? CacheStatus.ERROR : CacheStatus.ALIVE;
526     }
527 
528     /**
529      * Replaces the current remote cache service handle with the given handle. If the current remote
530      * is a Zombie, then it propagates any events that are queued to the restored service.
531      * <p>
532      * @param restoredRemote ICacheServiceNonLocal -- the remote server or proxy to the remote server
533      */
534     @Override
535     public void fixCache( final ICacheServiceNonLocal<?, ?> restoredRemote )
536     {
537         @SuppressWarnings("unchecked") // Don't know how to do this properly
538         final
539         ICacheServiceNonLocal<K, V> remote = (ICacheServiceNonLocal<K, V>)restoredRemote;
540         final ICacheServiceNonLocal<K, V> prevRemote = getRemoteCacheService();
541         if ( prevRemote instanceof ZombieCacheServiceNonLocal )
542         {
543             final ZombieCacheServiceNonLocal<K, V> zombie = (ZombieCacheServiceNonLocal<K, V>) prevRemote;
544             setRemoteCacheService( remote );
545             try
546             {
547                 zombie.propagateEvents( remote );
548             }
549             catch ( final Exception e )
550             {
551                 try
552                 {
553                     handleException( e, "Problem propagating events from Zombie Queue to new Remote Service.",
554                                      "fixCache" );
555                 }
556                 catch ( final IOException e1 )
557                 {
558                     // swallow, since this is just expected kick back.  Handle always throws
559                 }
560             }
561         }
562         else
563         {
564             setRemoteCacheService( remote );
565         }
566     }
567 
568 
569     /**
570      * Gets the cacheType attribute of the RemoteCache object
571      * @return The cacheType value
572      */
573     @Override
574     public CacheType getCacheType()
575     {
576         return CacheType.REMOTE_CACHE;
577     }
578 
579     /**
580      * Gets the cacheName attribute of the RemoteCache object.
581      * <p>
582      * @return The cacheName value
583      */
584     @Override
585     public String getCacheName()
586     {
587         return cacheName;
588     }
589 
590     /**
591      * @param remote the remote to set
592      */
593     protected void setRemoteCacheService( final ICacheServiceNonLocal<K, V> remote )
594     {
595         this.remoteCacheService = remote;
596     }
597 
598     /**
599      * @return the remote
600      */
601     protected ICacheServiceNonLocal<K, V> getRemoteCacheService()
602     {
603         return remoteCacheService;
604     }
605 
606     /**
607      * @return Returns the AuxiliaryCacheAttributes.
608      */
609     @Override
610     public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
611     {
612         return getRemoteCacheAttributes();
613     }
614 
615     /**
616      * @param remoteCacheAttributes the remoteCacheAttributes to set
617      */
618     protected void setRemoteCacheAttributes( final IRemoteCacheAttributes remoteCacheAttributes )
619     {
620         this.remoteCacheAttributes = remoteCacheAttributes;
621     }
622 
623     /**
624      * @return the remoteCacheAttributes
625      */
626     protected IRemoteCacheAttributes getRemoteCacheAttributes()
627     {
628         return remoteCacheAttributes;
629     }
630 
631     /**
632      * @param remoteCacheListener the remoteCacheListener to set
633      */
634     protected void setRemoteCacheListener( final IRemoteCacheListener<K, V> remoteCacheListener )
635     {
636         this.remoteCacheListener = remoteCacheListener;
637     }
638 
639     /**
640      * @return the remoteCacheListener
641      */
642     protected IRemoteCacheListener<K, V> getRemoteCacheListener()
643     {
644         return remoteCacheListener;
645     }
646 }