View Javadoc
1   package org.apache.commons.jcs.auxiliary.remote;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.HashMap;
25  import java.util.Map;
26  import java.util.Set;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.ExecutionException;
29  import java.util.concurrent.Future;
30  import java.util.concurrent.ThreadPoolExecutor;
31  import java.util.concurrent.TimeUnit;
32  import java.util.concurrent.TimeoutException;
33  
34  import org.apache.commons.jcs.auxiliary.AbstractAuxiliaryCacheEventLogging;
35  import org.apache.commons.jcs.auxiliary.AuxiliaryCacheAttributes;
36  import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
37  import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheClient;
38  import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
39  import org.apache.commons.jcs.auxiliary.remote.server.behavior.RemoteType;
40  import org.apache.commons.jcs.engine.CacheStatus;
41  import org.apache.commons.jcs.engine.ZombieCacheServiceNonLocal;
42  import org.apache.commons.jcs.engine.behavior.ICacheElement;
43  import org.apache.commons.jcs.engine.behavior.ICacheElementSerialized;
44  import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal;
45  import org.apache.commons.jcs.engine.behavior.IZombie;
46  import org.apache.commons.jcs.engine.logging.behavior.ICacheEventLogger;
47  import org.apache.commons.jcs.engine.stats.StatElement;
48  import org.apache.commons.jcs.engine.stats.Stats;
49  import org.apache.commons.jcs.engine.stats.behavior.IStatElement;
50  import org.apache.commons.jcs.engine.stats.behavior.IStats;
51  import org.apache.commons.jcs.utils.serialization.SerializationConversionUtil;
52  import org.apache.commons.jcs.utils.threadpool.ThreadPoolManager;
53  import org.apache.commons.logging.Log;
54  import org.apache.commons.logging.LogFactory;
55  
56  /** Abstract base for remote caches. I'm trying to break out and reuse common functionality. */
57  public abstract class AbstractRemoteAuxiliaryCache<K, V>
58      extends AbstractAuxiliaryCacheEventLogging<K, V>
59      implements IRemoteCacheClient<K, V>
60  {
61      /** The logger. */
62      private static final Log log = LogFactory.getLog( AbstractRemoteAuxiliaryCache.class );
63  
64      /**
65       * This does the work. In an RMI instances, it will be a remote reference. In an http remote
66       * cache it will be an http client. In zombie mode it is replaced with a balking facade.
67       */
68      private ICacheServiceNonLocal<K, V> remoteCacheService;
69  
70      /** The cacheName */
71      protected final String cacheName;
72  
73      /** The listener. This can be null. */
74      private IRemoteCacheListener<K, V> remoteCacheListener;
75  
76      /** The configuration values. TODO, we'll need a base here. */
77      private IRemoteCacheAttributes remoteCacheAttributes;
78  
79      /** A thread pool for gets if configured. */
80      private ThreadPoolExecutor pool = null;
81  
82      /** Should we get asynchronously using a pool. */
83      private boolean usePoolForGet = false;
84  
85      /**
86       * Creates the base.
87       * <p>
88       * @param cattr
89       * @param remote
90       * @param listener
91       */
92      public AbstractRemoteAuxiliaryCache( IRemoteCacheAttributes cattr, ICacheServiceNonLocal<K, V> remote,
93                                           IRemoteCacheListener<K, V> listener )
94      {
95          this.setRemoteCacheAttributes( cattr );
96          this.cacheName = cattr.getCacheName();
97          this.setRemoteCacheService( remote );
98          this.setRemoteCacheListener( listener );
99  
100         if ( log.isDebugEnabled() )
101         {
102             log.debug( "Construct> cacheName=" + cattr.getCacheName() );
103             log.debug( "irca = " + getRemoteCacheAttributes() );
104             log.debug( "remote = " + remote );
105             log.debug( "listener = " + listener );
106         }
107 
108         // use a pool if it is greater than 0
109         if ( log.isDebugEnabled() )
110         {
111             log.debug( "GetTimeoutMillis() = " + getRemoteCacheAttributes().getGetTimeoutMillis() );
112         }
113 
114         if ( getRemoteCacheAttributes().getGetTimeoutMillis() > 0 )
115         {
116             pool = ThreadPoolManager.getInstance().getPool( getRemoteCacheAttributes().getThreadPoolName() );
117             if ( log.isDebugEnabled() )
118             {
119                 log.debug( "Thread Pool = " + pool );
120             }
121             if ( pool != null )
122             {
123                 usePoolForGet = true;
124             }
125         }
126     }
127 
128     /**
129      * Synchronously dispose the remote cache; if failed, replace the remote handle with a zombie.
130      * <p>
131      * @throws IOException
132      */
133     @Override
134     protected void processDispose()
135         throws IOException
136     {
137         if ( log.isInfoEnabled() )
138         {
139             log.info( "Disposing of remote cache." );
140         }
141         try
142         {
143             if ( getRemoteCacheListener() != null )
144             {
145                 getRemoteCacheListener().dispose();
146             }
147         }
148         catch ( Exception ex )
149         {
150             log.error( "Couldn't dispose", ex );
151             handleException( ex, "Failed to dispose [" + cacheName + "]", ICacheEventLogger.DISPOSE_EVENT );
152         }
153     }
154 
155     /**
156      * Synchronously get from the remote cache; if failed, replace the remote handle with a zombie.
157      * <p>
158      * Use threadpool to timeout if a value is set for GetTimeoutMillis
159      * <p>
160      * If we are a cluster client, we need to leave the Element in its serialized form. Cluster
161      * clients cannot deserialize objects. Cluster clients get ICacheElementSerialized objects from
162      * other remote servers.
163      * <p>
164      * @param key
165      * @return ICacheElement, a wrapper around the key, value, and attributes
166      * @throws IOException
167      */
168     @Override
169     protected ICacheElement<K, V> processGet( K key )
170         throws IOException
171     {
172         ICacheElement<K, V> retVal = null;
173         try
174         {
175             if ( usePoolForGet )
176             {
177                 retVal = getUsingPool( key );
178             }
179             else
180             {
181                 retVal = getRemoteCacheService().get( cacheName, key, getListenerId() );
182             }
183 
184             // Eventually the instance of will not be necessary.
185             if ( retVal instanceof ICacheElementSerialized )
186             {
187                 // Never try to deserialize if you are a cluster client. Cluster
188                 // clients are merely intra-remote cache communicators. Remote caches are assumed
189                 // to have no ability to deserialize the objects.
190                 if ( this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER )
191                 {
192                     retVal = SerializationConversionUtil.getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) retVal,
193                             super.getElementSerializer() );
194                 }
195             }
196         }
197         catch ( Exception ex )
198         {
199             handleException( ex, "Failed to get [" + key + "] from [" + cacheName + "]", ICacheEventLogger.GET_EVENT );
200         }
201         return retVal;
202     }
203 
204     /**
205      * This allows gets to timeout in case of remote server machine shutdown.
206      * <p>
207      * @param key
208      * @return ICacheElement
209      * @throws IOException
210      */
211     public ICacheElement<K, V> getUsingPool( final K key )
212         throws IOException
213     {
214         int timeout = getRemoteCacheAttributes().getGetTimeoutMillis();
215 
216         try
217         {
218             Callable<ICacheElement<K, V>> command = new Callable<ICacheElement<K, V>>()
219             {
220                 @Override
221                 public ICacheElement<K, V> call()
222                     throws IOException
223                 {
224                     return getRemoteCacheService().get( cacheName, key, getListenerId() );
225                 }
226             };
227 
228             // execute using the pool
229             Future<ICacheElement<K, V>> future = pool.submit(command);
230 
231             // used timed get in order to timeout
232             ICacheElement<K, V> ice = future.get(timeout, TimeUnit.MILLISECONDS);
233 
234             if ( log.isDebugEnabled() )
235             {
236                 if ( ice == null )
237                 {
238                     log.debug( "nothing found in remote cache" );
239                 }
240                 else
241                 {
242                     log.debug( "found item in remote cache" );
243                 }
244             }
245             return ice;
246         }
247         catch ( TimeoutException te )
248         {
249             log.warn( "TimeoutException, Get Request timed out after " + timeout );
250             throw new IOException( "Get Request timed out after " + timeout );
251         }
252         catch ( InterruptedException ex )
253         {
254             log.warn( "InterruptedException, Get Request timed out after " + timeout );
255             throw new IOException( "Get Request timed out after " + timeout );
256         }
257         catch (ExecutionException ex)
258         {
259             // assume that this is an IOException thrown by the callable.
260             log.error( "ExecutionException, Assuming an IO exception thrown in the background.", ex );
261             throw new IOException( "Get Request timed out after " + timeout );
262         }
263     }
264 
265     /**
266      * Calls get matching on the server. Each entry in the result is unwrapped.
267      * <p>
268      * @param pattern
269      * @return Map
270      * @throws IOException
271      */
272     @Override
273     public Map<K, ICacheElement<K, V>> processGetMatching( String pattern )
274         throws IOException
275     {
276         Map<K, ICacheElement<K, V>> results = new HashMap<K, ICacheElement<K, V>>();
277         try
278         {
279             Map<K, ICacheElement<K, V>> rawResults = getRemoteCacheService().getMatching( cacheName, pattern, getListenerId() );
280 
281             // Eventually the instance of will not be necessary.
282             if ( rawResults != null )
283             {
284                 for (Map.Entry<K, ICacheElement<K, V>> entry : rawResults.entrySet())
285                 {
286                     ICacheElement<K, V> unwrappedResult = null;
287                     if ( entry.getValue() instanceof ICacheElementSerialized )
288                     {
289                         // Never try to deserialize if you are a cluster client. Cluster
290                         // clients are merely intra-remote cache communicators. Remote caches are assumed
291                         // to have no ability to deserialize the objects.
292                         if ( this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER )
293                         {
294                             unwrappedResult = SerializationConversionUtil
295                                 .getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) entry.getValue(),
296                                         super.getElementSerializer() );
297                         }
298                     }
299                     else
300                     {
301                         unwrappedResult = entry.getValue();
302                     }
303                     results.put( entry.getKey(), unwrappedResult );
304                 }
305             }
306         }
307         catch ( Exception ex )
308         {
309             handleException( ex, "Failed to getMatching [" + pattern + "] from [" + cacheName + "]",
310                              ICacheEventLogger.GET_EVENT );
311         }
312         return results;
313     }
314 
315     /**
316      * Gets multiple items from the cache based on the given set of keys.
317      * <p>
318      * @param keys
319      * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
320      *         data in cache for any of these keys
321      * @throws IOException
322      */
323     @Override
324     protected Map<K, ICacheElement<K, V>> processGetMultiple( Set<K> keys )
325         throws IOException
326     {
327         Map<K, ICacheElement<K, V>> elements = new HashMap<K, ICacheElement<K, V>>();
328         if ( keys != null && !keys.isEmpty() )
329         {
330             for (K key : keys)
331             {
332                 ICacheElement<K, V> element = get( key );
333 
334                 if ( element != null )
335                 {
336                     elements.put( key, element );
337                 }
338             }
339         }
340         return elements;
341     }
342 
343     /**
344      * Synchronously remove from the remote cache; if failed, replace the remote handle with a
345      * zombie.
346      * <p>
347      * @param key
348      * @return boolean, whether or not the item was removed
349      * @throws IOException
350      */
351     @Override
352     protected boolean processRemove( K key )
353         throws IOException
354     {
355         if ( !this.getRemoteCacheAttributes().getGetOnly() )
356         {
357             if ( log.isDebugEnabled() )
358             {
359                 log.debug( "remove> key=" + key );
360             }
361             try
362             {
363                 getRemoteCacheService().remove( cacheName, key, getListenerId() );
364             }
365             catch ( Exception ex )
366             {
367                 handleException( ex, "Failed to remove " + key + " from " + cacheName, ICacheEventLogger.REMOVE_EVENT );
368             }
369             return true;
370         }
371         return false;
372     }
373 
374     /**
375      * Synchronously removeAll from the remote cache; if failed, replace the remote handle with a
376      * zombie.
377      * <p>
378      * @throws IOException
379      */
380     @Override
381     protected void processRemoveAll()
382         throws IOException
383     {
384         if ( !this.getRemoteCacheAttributes().getGetOnly() )
385         {
386             try
387             {
388                 getRemoteCacheService().removeAll( cacheName, getListenerId() );
389             }
390             catch ( Exception ex )
391             {
392                 handleException( ex, "Failed to remove all from " + cacheName, ICacheEventLogger.REMOVEALL_EVENT );
393             }
394         }
395     }
396 
397     /**
398      * Serializes the object and then calls update on the remote server with the byte array. The
399      * byte array is wrapped in a ICacheElementSerialized. This allows the remote server to operate
400      * without any knowledge of caches classes.
401      * <p>
402      * @param ce
403      * @throws IOException
404      */
405     @Override
406     protected void processUpdate( ICacheElement<K, V> ce )
407         throws IOException
408     {
409         if ( !getRemoteCacheAttributes().getGetOnly() )
410         {
411             ICacheElementSerialized<K, V> serialized = null;
412             try
413             {
414                 if ( log.isDebugEnabled() )
415                 {
416                     log.debug( "sending item to remote server" );
417                 }
418 
419                 // convert so we don't have to know about the object on the
420                 // other end.
421                 serialized = SerializationConversionUtil.getSerializedCacheElement( ce, super.getElementSerializer() );
422 
423                 remoteCacheService.update( serialized, getListenerId() );
424             }
425             catch ( NullPointerException npe )
426             {
427                 log.error( "npe for ce = " + ce + "ce.attr = " + ce.getElementAttributes(), npe );
428             }
429             catch ( Exception ex )
430             {
431                 // event queue will wait and retry
432                 handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName(),
433                                  ICacheEventLogger.UPDATE_EVENT );
434             }
435         }
436         else
437         {
438             if ( log.isDebugEnabled() )
439             {
440                 log.debug( "get only mode, not sending to remote server" );
441             }
442         }
443     }
444 
445     /**
446      * Return the keys in this cache.
447      * <p>
448      * @see org.apache.commons.jcs.auxiliary.AuxiliaryCache#getKeySet()
449      */
450     @Override
451     public Set<K> getKeySet()
452         throws IOException
453     {
454         return getRemoteCacheService().getKeySet(cacheName);
455     }
456 
457     /**
458      * Allows other member of this package to access the listener. This is mainly needed for
459      * deregistering a listener.
460      * <p>
461      * @return IRemoteCacheListener, the listener for this remote server
462      */
463     @Override
464     public IRemoteCacheListener<K, V> getListener()
465     {
466         return getRemoteCacheListener();
467     }
468 
469     /**
470      * let the remote cache set a listener_id. Since there is only one listener for all the regions
471      * and every region gets registered? the id shouldn't be set if it isn't zero. If it is we
472      * assume that it is a reconnect.
473      * <p>
474      * @param id The new listenerId value
475      */
476     public void setListenerId( long id )
477     {
478         if ( getRemoteCacheListener() != null )
479         {
480             try
481             {
482                 getRemoteCacheListener().setListenerId( id );
483 
484                 if ( log.isDebugEnabled() )
485                 {
486                     log.debug( "set listenerId = " + id );
487                 }
488             }
489             catch ( Exception e )
490             {
491                 log.error( "Problem setting listenerId", e );
492             }
493         }
494     }
495 
496     /**
497      * Gets the listenerId attribute of the RemoteCacheListener object
498      * <p>
499      * @return The listenerId value
500      */
501     @Override
502     public long getListenerId()
503     {
504         if ( getRemoteCacheListener() != null )
505         {
506             try
507             {
508                 if ( log.isDebugEnabled() )
509                 {
510                     log.debug( "get listenerId = " + getRemoteCacheListener().getListenerId() );
511                 }
512                 return getRemoteCacheListener().getListenerId();
513             }
514             catch ( Exception e )
515             {
516                 log.error( "Problem getting listenerId", e );
517             }
518         }
519         return -1;
520     }
521 
522     /**
523      * Returns the current cache size.
524      * @return The size value
525      */
526     @Override
527     public int getSize()
528     {
529         return 0;
530     }
531 
532     /**
533      * Custom exception handling some children.  This should be used to initiate failover.
534      * <p>
535      * @param ex
536      * @param msg
537      * @param eventName
538      * @throws IOException
539      */
540     protected abstract void handleException( Exception ex, String msg, String eventName )
541         throws IOException;
542 
543     /**
544      * Gets the stats attribute of the RemoteCache object.
545      * <p>
546      * @return The stats value
547      */
548     @Override
549     public String getStats()
550     {
551         return getStatistics().toString();
552     }
553 
554     /**
555      * @return IStats object
556      */
557     @Override
558     public IStats getStatistics()
559     {
560         IStats stats = new Stats();
561         stats.setTypeName( "AbstractRemoteAuxiliaryCache" );
562 
563         ArrayList<IStatElement<?>> elems = new ArrayList<IStatElement<?>>();
564 
565         elems.add(new StatElement<String>( "Remote Type", this.getRemoteCacheAttributes().getRemoteTypeName() ) );
566 
567 //      if ( this.getRemoteCacheAttributes().getRemoteType() == RemoteType.CLUSTER )
568 //      {
569 //          // something cluster specific
570 //      }
571 
572         elems.add(new StatElement<Boolean>( "UsePoolForGet", Boolean.valueOf(usePoolForGet) ) );
573 
574         if ( pool != null )
575         {
576             elems.add(new StatElement<Integer>( "Pool Size", Integer.valueOf(pool.getPoolSize()) ) );
577             elems.add(new StatElement<Integer>( "Maximum Pool Size", Integer.valueOf(pool.getMaximumPoolSize()) ) );
578         }
579 
580         if ( getRemoteCacheService() instanceof ZombieCacheServiceNonLocal )
581         {
582             elems.add(new StatElement<Integer>( "Zombie Queue Size",
583                     Integer.valueOf(( (ZombieCacheServiceNonLocal<K, V>) getRemoteCacheService() ).getQueueSize()) ) );
584         }
585 
586         stats.setStatElements( elems );
587 
588         return stats;
589     }
590 
591     /**
592      * Returns the cache status. An error status indicates the remote connection is not available.
593      * <p>
594      * @return The status value
595      */
596     @Override
597     public CacheStatus getStatus()
598     {
599         return getRemoteCacheService() instanceof IZombie ? CacheStatus.ERROR : CacheStatus.ALIVE;
600     }
601 
602     /**
603      * Replaces the current remote cache service handle with the given handle. If the current remote
604      * is a Zombie, then it propagates any events that are queued to the restored service.
605      * <p>
606      * @param restoredRemote ICacheServiceNonLocal -- the remote server or proxy to the remote server
607      */
608     @Override
609     public void fixCache( ICacheServiceNonLocal<?, ?> restoredRemote )
610     {
611         @SuppressWarnings("unchecked") // Don't know how to do this properly
612         ICacheServiceNonLocal<K, V> remote = (ICacheServiceNonLocal<K, V>)restoredRemote;
613         ICacheServiceNonLocal<K, V> prevRemote = getRemoteCacheService();
614         if ( prevRemote instanceof ZombieCacheServiceNonLocal )
615         {
616             ZombieCacheServiceNonLocal<K, V> zombie = (ZombieCacheServiceNonLocal<K, V>) prevRemote;
617             setRemoteCacheService( remote );
618             try
619             {
620                 zombie.propagateEvents( remote );
621             }
622             catch ( Exception e )
623             {
624                 try
625                 {
626                     handleException( e, "Problem propagating events from Zombie Queue to new Remote Service.",
627                                      "fixCache" );
628                 }
629                 catch ( IOException e1 )
630                 {
631                     // swallow, since this is just expected kick back.  Handle always throws
632                 }
633             }
634         }
635         else
636         {
637             setRemoteCacheService( remote );
638         }
639     }
640 
641 
642     /**
643      * Gets the cacheType attribute of the RemoteCache object
644      * @return The cacheType value
645      */
646     @Override
647     public CacheType getCacheType()
648     {
649         return CacheType.REMOTE_CACHE;
650     }
651 
652     /**
653      * Gets the cacheName attribute of the RemoteCache object.
654      * <p>
655      * @return The cacheName value
656      */
657     @Override
658     public String getCacheName()
659     {
660         return cacheName;
661     }
662 
663     /**
664      * @param remote the remote to set
665      */
666     protected void setRemoteCacheService( ICacheServiceNonLocal<K, V> remote )
667     {
668         this.remoteCacheService = remote;
669     }
670 
671     /**
672      * @return the remote
673      */
674     protected ICacheServiceNonLocal<K, V> getRemoteCacheService()
675     {
676         return remoteCacheService;
677     }
678 
679     /**
680      * @return Returns the AuxiliaryCacheAttributes.
681      */
682     @Override
683     public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
684     {
685         return getRemoteCacheAttributes();
686     }
687 
688     /**
689      * @param remoteCacheAttributes the remoteCacheAttributes to set
690      */
691     protected void setRemoteCacheAttributes( IRemoteCacheAttributes remoteCacheAttributes )
692     {
693         this.remoteCacheAttributes = remoteCacheAttributes;
694     }
695 
696     /**
697      * @return the remoteCacheAttributes
698      */
699     protected IRemoteCacheAttributes getRemoteCacheAttributes()
700     {
701         return remoteCacheAttributes;
702     }
703 
704     /**
705      * @param remoteCacheListener the remoteCacheListener to set
706      */
707     protected void setRemoteCacheListener( IRemoteCacheListener<K, V> remoteCacheListener )
708     {
709         this.remoteCacheListener = remoteCacheListener;
710     }
711 
712     /**
713      * @return the remoteCacheListener
714      */
715     protected IRemoteCacheListener<K, V> getRemoteCacheListener()
716     {
717         return remoteCacheListener;
718     }
719 }