View Javadoc

1   package org.apache.jcs.auxiliary.remote;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.io.Serializable;
24  import java.util.ArrayList;
25  import java.util.HashMap;
26  import java.util.Map;
27  import java.util.Set;
28  import java.util.concurrent.Callable;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.Future;
31  import java.util.concurrent.ThreadPoolExecutor;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.TimeoutException;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.jcs.auxiliary.AbstractAuxiliaryCacheEventLogging;
38  import org.apache.jcs.auxiliary.AuxiliaryCacheAttributes;
39  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
40  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheClient;
41  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
42  import org.apache.jcs.auxiliary.remote.server.behavior.RemoteType;
43  import org.apache.jcs.engine.CacheStatus;
44  import org.apache.jcs.engine.ZombieCacheServiceNonLocal;
45  import org.apache.jcs.engine.behavior.ICacheElement;
46  import org.apache.jcs.engine.behavior.ICacheElementSerialized;
47  import org.apache.jcs.engine.behavior.ICacheServiceNonLocal;
48  import org.apache.jcs.engine.behavior.IZombie;
49  import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
50  import org.apache.jcs.engine.stats.StatElement;
51  import org.apache.jcs.engine.stats.Stats;
52  import org.apache.jcs.engine.stats.behavior.IStatElement;
53  import org.apache.jcs.engine.stats.behavior.IStats;
54  import org.apache.jcs.utils.serialization.SerializationConversionUtil;
55  import org.apache.jcs.utils.threadpool.ThreadPoolManager;
56  
57  /** Abstract base for remote caches. I'm trying to break out and reuse common functionality. */
58  public abstract class AbstractRemoteAuxiliaryCache<K extends Serializable, V extends Serializable>
59      extends AbstractAuxiliaryCacheEventLogging<K, V>
60      implements IRemoteCacheClient<K, V>
61  {
62      /** Don't change. */
63      private static final long serialVersionUID = -5329231850422826461L;
64  
65      /** The logger. */
66      private final static Log log = LogFactory.getLog( AbstractRemoteAuxiliaryCache.class );
67  
68      /**
69       * This does the work. In an RMI instances, it will be a remote reference. In an http remote
70       * cache it will be an http client. In zombie mode it is replaced with a balking facade.
71       */
72      private ICacheServiceNonLocal<K, V> remoteCacheService;
73  
74      /** The cacheName */
75      protected final String cacheName;
76  
77      /** The listener. This can be null. */
78      private IRemoteCacheListener<K, V> remoteCacheListener;
79  
80      /** The configuration values. TODO, we'll need a base here. */
81      private IRemoteCacheAttributes remoteCacheAttributes;
82  
83      /** A thread pool for gets if configured. */
84      private ThreadPoolExecutor pool = null;
85  
86      /** Should we get asynchronously using a pool. */
87      private boolean usePoolForGet = false;
88  
89      /**
90       * Creates the base.
91       * <p>
92       * @param cattr
93       * @param remote
94       * @param listener
95       */
96      public AbstractRemoteAuxiliaryCache( IRemoteCacheAttributes cattr, ICacheServiceNonLocal<K, V> remote,
97                                           IRemoteCacheListener<K, V> listener )
98      {
99          this.setRemoteCacheAttributes( cattr );
100         this.cacheName = cattr.getCacheName();
101         this.setRemoteCacheService( remote );
102         this.setRemoteCacheListener( listener );
103 
104         if ( log.isDebugEnabled() )
105         {
106             log.debug( "Construct> cacheName=" + cattr.getCacheName() );
107             log.debug( "irca = " + getRemoteCacheAttributes() );
108             log.debug( "remote = " + remote );
109             log.debug( "listener = " + listener );
110         }
111 
112         // use a pool if it is greater than 0
113         if ( log.isDebugEnabled() )
114         {
115             log.debug( "GetTimeoutMillis() = " + getRemoteCacheAttributes().getGetTimeoutMillis() );
116         }
117 
118         if ( getRemoteCacheAttributes().getGetTimeoutMillis() > 0 )
119         {
120             pool = ThreadPoolManager.getInstance().getPool( getRemoteCacheAttributes().getThreadPoolName() );
121             if ( log.isDebugEnabled() )
122             {
123                 log.debug( "Thread Pool = " + pool );
124             }
125             if ( pool != null )
126             {
127                 usePoolForGet = true;
128             }
129         }
130     }
131 
132     /**
133      * Synchronously dispose the remote cache; if failed, replace the remote handle with a zombie.
134      * <p>
135      * @throws IOException
136      */
137     @Override
138     protected void processDispose()
139         throws IOException
140     {
141         if ( log.isInfoEnabled() )
142         {
143             log.info( "Disposing of remote cache." );
144         }
145         try
146         {
147             if ( getRemoteCacheListener() != null )
148             {
149                 getRemoteCacheListener().dispose();
150             }
151         }
152         catch ( Exception ex )
153         {
154             log.error( "Couldn't dispose", ex );
155             handleException( ex, "Failed to dispose [" + cacheName + "]", ICacheEventLogger.DISPOSE_EVENT );
156         }
157     }
158 
159     /**
160      * Synchronously get from the remote cache; if failed, replace the remote handle with a zombie.
161      * <p>
162      * Use threadpool to timeout if a value is set for GetTimeoutMillis
163      * <p>
164      * If we are a cluster client, we need to leave the Element in its serialized form. Cluster
165      * clients cannot deserialize objects. Cluster clients get ICacheElementSerialized objects from
166      * other remote servers.
167      * <p>
168      * @param key
169      * @return ICacheElement, a wrapper around the key, value, and attributes
170      * @throws IOException
171      */
172     @Override
173     protected ICacheElement<K, V> processGet( K key )
174         throws IOException
175     {
176         ICacheElement<K, V> retVal = null;
177         try
178         {
179             if ( usePoolForGet )
180             {
181                 retVal = getUsingPool( key );
182             }
183             else
184             {
185                 retVal = getRemoteCacheService().get( cacheName, key, getListenerId() );
186             }
187 
188             // Eventually the instance of will not be necessary.
189             if ( retVal != null && retVal instanceof ICacheElementSerialized )
190             {
191                 // Never try to deserialize if you are a cluster client. Cluster
192                 // clients are merely intra-remote cache communicators. Remote caches are assumed
193                 // to have no ability to deserialize the objects.
194                 if ( this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER )
195                 {
196                     retVal = SerializationConversionUtil.getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) retVal,
197                                                                                       this.elementSerializer );
198                 }
199             }
200         }
201         catch ( Exception ex )
202         {
203             handleException( ex, "Failed to get [" + key + "] from [" + cacheName + "]", ICacheEventLogger.GET_EVENT );
204         }
205         return retVal;
206     }
207 
208     /**
209      * This allows gets to timeout in case of remote server machine shutdown.
210      * <p>
211      * @param key
212      * @return ICacheElement
213      * @throws IOException
214      */
215     public ICacheElement<K, V> getUsingPool( final K key )
216         throws IOException
217     {
218         int timeout = getRemoteCacheAttributes().getGetTimeoutMillis();
219 
220         try
221         {
222             Callable<ICacheElement<K, V>> command = new Callable<ICacheElement<K, V>>()
223             {
224                 public ICacheElement<K, V> call()
225                     throws IOException
226                 {
227                     return getRemoteCacheService().get( cacheName, key, getListenerId() );
228                 }
229             };
230 
231             // execute using the pool
232             Future<ICacheElement<K, V>> future = pool.submit(command);
233 
234             // used timed get in order to timeout
235             ICacheElement<K, V> ice = future.get(timeout, TimeUnit.MILLISECONDS);
236 
237             if ( log.isDebugEnabled() )
238             {
239                 if ( ice == null )
240                 {
241                     log.debug( "nothing found in remote cache" );
242                 }
243                 else
244                 {
245                     log.debug( "found item in remote cache" );
246                 }
247             }
248             return ice;
249         }
250         catch ( TimeoutException te )
251         {
252             log.warn( "TimeoutException, Get Request timed out after " + timeout );
253             throw new IOException( "Get Request timed out after " + timeout );
254         }
255         catch ( InterruptedException ex )
256         {
257             log.warn( "InterruptedException, Get Request timed out after " + timeout );
258             throw new IOException( "Get Request timed out after " + timeout );
259         }
260         catch (ExecutionException ex)
261         {
262             // assume that this is an IOException thrown by the callable.
263             log.error( "ExecutionException, Assuming an IO exception thrown in the background.", ex );
264             throw new IOException( "Get Request timed out after " + timeout );
265         }
266     }
267 
268     /**
269      * Calls get matching on the server. Each entry in the result is unwrapped.
270      * <p>
271      * @param pattern
272      * @return Map
273      * @throws IOException
274      */
275     @Override
276     public Map<K, ICacheElement<K, V>> processGetMatching( String pattern )
277         throws IOException
278     {
279         Map<K, ICacheElement<K, V>> results = new HashMap<K, ICacheElement<K, V>>();
280         try
281         {
282             Map<K, ICacheElement<K, V>> rawResults = getRemoteCacheService().getMatching( cacheName, pattern, getListenerId() );
283 
284             // Eventually the instance of will not be necessary.
285             if ( rawResults != null )
286             {
287                 for (Map.Entry<K, ICacheElement<K, V>> entry : rawResults.entrySet())
288                 {
289                     ICacheElement<K, V> unwrappedResult = null;
290                     if ( entry.getValue() instanceof ICacheElementSerialized )
291                     {
292                         // Never try to deserialize if you are a cluster client. Cluster
293                         // clients are merely intra-remote cache communicators. Remote caches are assumed
294                         // to have no ability to deserialize the objects.
295                         if ( this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER )
296                         {
297                             unwrappedResult = SerializationConversionUtil
298                                 .getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) entry.getValue(),
299                                                               this.elementSerializer );
300                         }
301                     }
302                     else
303                     {
304                         unwrappedResult = entry.getValue();
305                     }
306                     results.put( entry.getKey(), unwrappedResult );
307                 }
308             }
309         }
310         catch ( Exception ex )
311         {
312             handleException( ex, "Failed to getMatching [" + pattern + "] from [" + cacheName + "]",
313                              ICacheEventLogger.GET_EVENT );
314         }
315         return results;
316     }
317 
318     /**
319      * Gets multiple items from the cache based on the given set of keys.
320      * <p>
321      * @param keys
322      * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no
323      *         data in cache for any of these keys
324      * @throws IOException
325      */
326     @Override
327     protected Map<K, ICacheElement<K, V>> processGetMultiple( Set<K> keys )
328         throws IOException
329     {
330         Map<K, ICacheElement<K, V>> elements = new HashMap<K, ICacheElement<K, V>>();
331         if ( keys != null && !keys.isEmpty() )
332         {
333             for (K key : keys)
334             {
335                 ICacheElement<K, V> element = get( key );
336 
337                 if ( element != null )
338                 {
339                     elements.put( key, element );
340                 }
341             }
342         }
343         return elements;
344     }
345 
346     /**
347      * Synchronously remove from the remote cache; if failed, replace the remote handle with a
348      * zombie.
349      * <p>
350      * @param key
351      * @return boolean, whether or not the item was removed
352      * @throws IOException
353      */
354     @Override
355     protected boolean processRemove( K key )
356         throws IOException
357     {
358         if ( !this.getRemoteCacheAttributes().getGetOnly() )
359         {
360             if ( log.isDebugEnabled() )
361             {
362                 log.debug( "remove> key=" + key );
363             }
364             try
365             {
366                 getRemoteCacheService().remove( cacheName, key, getListenerId() );
367             }
368             catch ( Exception ex )
369             {
370                 handleException( ex, "Failed to remove " + key + " from " + cacheName, ICacheEventLogger.REMOVE_EVENT );
371             }
372             return true;
373         }
374         return false;
375     }
376 
377     /**
378      * Synchronously removeAll from the remote cache; if failed, replace the remote handle with a
379      * zombie.
380      * <p>
381      * @throws IOException
382      */
383     @Override
384     protected void processRemoveAll()
385         throws IOException
386     {
387         if ( !this.getRemoteCacheAttributes().getGetOnly() )
388         {
389             try
390             {
391                 getRemoteCacheService().removeAll( cacheName, getListenerId() );
392             }
393             catch ( Exception ex )
394             {
395                 handleException( ex, "Failed to remove all from " + cacheName, ICacheEventLogger.REMOVEALL_EVENT );
396             }
397         }
398     }
399 
400     /**
401      * Serializes the object and then calls update on the remote server with the byte array. The
402      * byte array is wrapped in a ICacheElementSerialized. This allows the remote server to operate
403      * without any knowledge of caches classes.
404      * <p>
405      * @param ce
406      * @throws IOException
407      */
408     @Override
409     protected void processUpdate( ICacheElement<K, V> ce )
410         throws IOException
411     {
412         if ( !getRemoteCacheAttributes().getGetOnly() )
413         {
414             ICacheElementSerialized<K, V> serialized = null;
415             try
416             {
417                 if ( log.isDebugEnabled() )
418                 {
419                     log.debug( "sending item to remote server" );
420                 }
421 
422                 // convert so we don't have to know about the object on the
423                 // other end.
424                 serialized = SerializationConversionUtil.getSerializedCacheElement( ce, this.elementSerializer );
425 
426                 remoteCacheService.update( serialized, getListenerId() );
427             }
428             catch ( NullPointerException npe )
429             {
430                 log.error( "npe for ce = " + ce + "ce.attr = " + ce.getElementAttributes(), npe );
431             }
432             catch ( Exception ex )
433             {
434                 // event queue will wait and retry
435                 handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName(),
436                                  ICacheEventLogger.UPDATE_EVENT );
437             }
438         }
439         else
440         {
441             if ( log.isDebugEnabled() )
442             {
443                 log.debug( "get only mode, not sending to remote server" );
444             }
445         }
446     }
447 
448     /**
449      * Returns all the keys for a group.
450      * <p>
451      * @param groupName
452      * @return Set
453      * @throws java.rmi.RemoteException
454      * @throws IOException
455      */
456     public Set<K> getGroupKeys( String groupName )
457         throws java.rmi.RemoteException, IOException
458     {
459         return getRemoteCacheService().getGroupKeys( cacheName, groupName );
460     }
461 
462     /**
463      * Returns all the group names for a cache.
464      * <p>
465      * @return Set
466      * @throws java.rmi.RemoteException
467      * @throws IOException
468      */
469     public Set<String> getGroupNames()
470         throws java.rmi.RemoteException, IOException
471     {
472         return getRemoteCacheService().getGroupNames( cacheName );
473     }
474 
475     /**
476      * Allows other member of this package to access the listener. This is mainly needed for
477      * deregistering a listener.
478      * <p>
479      * @return IRemoteCacheListener, the listener for this remote server
480      */
481     public IRemoteCacheListener<K, V> getListener()
482     {
483         return getRemoteCacheListener();
484     }
485 
486     /**
487      * let the remote cache set a listener_id. Since there is only one listener for all the regions
488      * and every region gets registered? the id shouldn't be set if it isn't zero. If it is we
489      * assume that it is a reconnect.
490      * <p>
491      * @param id The new listenerId value
492      */
493     public void setListenerId( long id )
494     {
495         if ( getRemoteCacheListener() != null )
496         {
497             try
498             {
499                 getRemoteCacheListener().setListenerId( id );
500 
501                 if ( log.isDebugEnabled() )
502                 {
503                     log.debug( "set listenerId = " + id );
504                 }
505             }
506             catch ( Exception e )
507             {
508                 log.error( "Problem setting listenerId", e );
509             }
510         }
511     }
512 
513     /**
514      * Gets the listenerId attribute of the RemoteCacheListener object
515      * <p>
516      * @return The listenerId value
517      */
518     public long getListenerId()
519     {
520         if ( getRemoteCacheListener() != null )
521         {
522             try
523             {
524                 if ( log.isDebugEnabled() )
525                 {
526                     log.debug( "get listenerId = " + getRemoteCacheListener().getListenerId() );
527                 }
528                 return getRemoteCacheListener().getListenerId();
529             }
530             catch ( Exception e )
531             {
532                 log.error( "Problem getting listenerId", e );
533             }
534         }
535         return -1;
536     }
537 
538     /**
539      * Returns the current cache size.
540      * @return The size value
541      */
542     public int getSize()
543     {
544         return 0;
545     }
546 
547     /**
548      * Custom exception handling some children.  This should be used to initiate failover.
549      * <p>
550      * @param ex
551      * @param msg
552      * @param eventName
553      * @throws IOException
554      */
555     protected abstract void handleException( Exception ex, String msg, String eventName )
556         throws IOException;
557 
558     /**
559      * Gets the stats attribute of the RemoteCache object.
560      * <p>
561      * @return The stats value
562      */
563     public String getStats()
564     {
565         return getStatistics().toString();
566     }
567 
568     /**
569      * @return IStats object
570      */
571     public IStats getStatistics()
572     {
573         IStats stats = new Stats();
574         stats.setTypeName( "AbstractRemoteAuxiliaryCache" );
575 
576         ArrayList<IStatElement> elems = new ArrayList<IStatElement>();
577 
578         IStatElement se = null;
579 
580         se = new StatElement();
581         se.setName( "Remote Type" );
582         se.setData( this.getRemoteCacheAttributes().getRemoteTypeName() + "" );
583         elems.add( se );
584 
585         if ( this.getRemoteCacheAttributes().getRemoteType() == RemoteType.CLUSTER )
586         {
587             // something cluster specific
588         }
589 
590         // no data gathered here
591 
592         se = new StatElement();
593         se.setName( "UsePoolForGet" );
594         se.setData( "" + usePoolForGet );
595         elems.add( se );
596 
597         if ( pool != null )
598         {
599             se = new StatElement();
600             se.setName( "Pool Size" );
601             se.setData( "" + pool.getPoolSize() );
602             elems.add( se );
603 
604             se = new StatElement();
605             se.setName( "Maximum Pool Size" );
606             se.setData( "" + pool.getMaximumPoolSize() );
607             elems.add( se );
608         }
609 
610         if ( getRemoteCacheService() instanceof ZombieCacheServiceNonLocal )
611         {
612             se = new StatElement();
613             se.setName( "Zombie Queue Size" );
614             se.setData( "" + ( (ZombieCacheServiceNonLocal<K, V>) getRemoteCacheService() ).getQueueSize() );
615             elems.add( se );
616         }
617 
618         // get an array and put them in the Stats object
619         IStatElement[] ses = elems.toArray( new StatElement[0] );
620         stats.setStatElements( ses );
621 
622         return stats;
623     }
624 
625     /**
626      * Returns the cache status. An error status indicates the remote connection is not available.
627      * <p>
628      * @return The status value
629      */
630     public CacheStatus getStatus()
631     {
632         return getRemoteCacheService() instanceof IZombie ? CacheStatus.ERROR : CacheStatus.ALIVE;
633     }
634 
635     /**
636      * Replaces the current remote cache service handle with the given handle. If the current remote
637      * is a Zombie, then it propagates any events that are queued to the restored service.
638      * <p>
639      * @param restoredRemote ICacheServiceNonLocal -- the remote server or proxy to the remote server
640      */
641     public void fixCache( ICacheServiceNonLocal<?, ?> restoredRemote )
642     {
643         @SuppressWarnings("unchecked") // Don't know how to do this properly
644         ICacheServiceNonLocal<K, V> remote = (ICacheServiceNonLocal<K, V>)restoredRemote;
645         if ( getRemoteCacheService() != null && getRemoteCacheService() instanceof ZombieCacheServiceNonLocal )
646         {
647             ZombieCacheServiceNonLocal<K, V> zombie = (ZombieCacheServiceNonLocal<K, V>) getRemoteCacheService();
648             setRemoteCacheService( remote );
649             try
650             {
651                 zombie.propagateEvents( remote );
652             }
653             catch ( Exception e )
654             {
655                 try
656                 {
657                     handleException( e, "Problem propagating events from Zombie Queue to new Remote Service.",
658                                      "fixCache" );
659                 }
660                 catch ( IOException e1 )
661                 {
662                     // swallow, since this is just expected kick back.  Handle always throws
663                 }
664             }
665         }
666         else
667         {
668             setRemoteCacheService( remote );
669         }
670     }
671 
672 
673     /**
674      * Gets the cacheType attribute of the RemoteCache object
675      * @return The cacheType value
676      */
677     public CacheType getCacheType()
678     {
679         return CacheType.REMOTE_CACHE;
680     }
681 
682     /**
683      * Gets the cacheName attribute of the RemoteCache object.
684      * <p>
685      * @return The cacheName value
686      */
687     public String getCacheName()
688     {
689         return cacheName;
690     }
691 
692     /**
693      * @param remote the remote to set
694      */
695     protected void setRemoteCacheService( ICacheServiceNonLocal<K, V> remote )
696     {
697         this.remoteCacheService = remote;
698     }
699 
700     /**
701      * @return the remote
702      */
703     protected ICacheServiceNonLocal<K, V> getRemoteCacheService()
704     {
705         return remoteCacheService;
706     }
707 
708     /**
709      * @return Returns the AuxiliaryCacheAttributes.
710      */
711     public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
712     {
713         return getRemoteCacheAttributes();
714     }
715 
716     /**
717      * @param remoteCacheAttributes the remoteCacheAttributes to set
718      */
719     protected void setRemoteCacheAttributes( IRemoteCacheAttributes remoteCacheAttributes )
720     {
721         this.remoteCacheAttributes = remoteCacheAttributes;
722     }
723 
724     /**
725      * @return the remoteCacheAttributes
726      */
727     protected IRemoteCacheAttributes getRemoteCacheAttributes()
728     {
729         return remoteCacheAttributes;
730     }
731 
732     /**
733      * @param remoteCacheListener the remoteCacheListener to set
734      */
735     protected void setRemoteCacheListener( IRemoteCacheListener<K, V> remoteCacheListener )
736     {
737         this.remoteCacheListener = remoteCacheListener;
738     }
739 
740     /**
741      * @return the remoteCacheListener
742      */
743     protected IRemoteCacheListener<K, V> getRemoteCacheListener()
744     {
745         return remoteCacheListener;
746     }
747 }