View Javadoc

1   package org.apache.jcs.auxiliary.remote;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.io.Serializable;
24  import java.util.ArrayList;
25  import java.util.HashMap;
26  import java.util.Map;
27  import java.util.Set;
28  import java.util.concurrent.Callable;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.Future;
31  import java.util.concurrent.ThreadPoolExecutor;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.TimeoutException;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.jcs.auxiliary.AbstractAuxiliaryCacheEventLogging;
38  import org.apache.jcs.auxiliary.AuxiliaryCacheAttributes;
39  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
40  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheClient;
41  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
42  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheService;
43  import org.apache.jcs.engine.CacheConstants;
44  import org.apache.jcs.engine.behavior.ICacheElement;
45  import org.apache.jcs.engine.behavior.ICacheElementSerialized;
46  import org.apache.jcs.engine.behavior.IZombie;
47  import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
48  import org.apache.jcs.engine.stats.StatElement;
49  import org.apache.jcs.engine.stats.Stats;
50  import org.apache.jcs.engine.stats.behavior.IStatElement;
51  import org.apache.jcs.engine.stats.behavior.IStats;
52  import org.apache.jcs.utils.serialization.SerializationConversionUtil;
53  import org.apache.jcs.utils.threadpool.ThreadPoolManager;
54  
55  /** Abstract base for remote caches. I'm trying to break out and reuse common functionality. */
56  public abstract class AbstractRemoteAuxiliaryCache
57      extends AbstractAuxiliaryCacheEventLogging
58      implements IRemoteCacheClient
59  {
60      /** Don't change. */
61      private static final long serialVersionUID = -5329231850422826461L;
62  
63      /** The logger. */
64      private final static Log log = LogFactory.getLog( AbstractRemoteAuxiliaryCache.class );
65  
66      /**
67       * This does the work. In an RMI instances, it will be a remote reference. In an http remote
68       * cache it will be an http client. In zombie mode it is replaced with a balking facade.
69       */
70      private IRemoteCacheService remoteCacheService;
71  
72      /** The cacheName */
73      protected final String cacheName;
74  
75      /** The listener. This can be null. */
76      private IRemoteCacheListener remoteCacheListener;
77  
78      /** The configuration values. TODO, we'll need a base here. */
79      private IRemoteCacheAttributes remoteCacheAttributes;
80  
81      /** A thread pool for gets if configured. */
82      private ThreadPoolExecutor pool = null;
83  
84      /** Should we get asynchronously using a pool. */
85      private boolean usePoolForGet = false;
86  
87      /**
88       * Creates the base.
89       * <p>
90       * @param cattr
91       * @param remote
92       * @param listener
93       */
94      public AbstractRemoteAuxiliaryCache( IRemoteCacheAttributes cattr, IRemoteCacheService remote,
95                                           IRemoteCacheListener listener )
96      {
97          this.setRemoteCacheAttributes( cattr );
98          this.cacheName = cattr.getCacheName();
99          this.setRemoteCacheService( remote );
100         this.setRemoteCacheListener( listener );
101 
102         if ( log.isDebugEnabled() )
103         {
104             log.debug( "Construct> cacheName=" + cattr.getCacheName() );
105             log.debug( "irca = " + getRemoteCacheAttributes() );
106             log.debug( "remote = " + remote );
107             log.debug( "listener = " + listener );
108         }
109 
110         // use a pool if it is greater than 0
111         if ( log.isDebugEnabled() )
112         {
113             log.debug( "GetTimeoutMillis() = " + getRemoteCacheAttributes().getGetTimeoutMillis() );
114         }
115 
116         if ( getRemoteCacheAttributes().getGetTimeoutMillis() > 0 )
117         {
118             pool = ThreadPoolManager.getInstance().getPool( getRemoteCacheAttributes().getThreadPoolName() );
119             if ( log.isDebugEnabled() )
120             {
121                 log.debug( "Thread Pool = " + pool );
122             }
123             if ( pool != null )
124             {
125                 usePoolForGet = true;
126             }
127         }
128     }
129 
130     /**
131      * Synchronously dispose the remote cache; if failed, replace the remote handle with a zombie.
132      * <p>
133      * @throws IOException
134      */
135     @Override
136     protected void processDispose()
137         throws IOException
138     {
139         if ( log.isInfoEnabled() )
140         {
141             log.info( "Disposing of remote cache." );
142         }
143         try
144         {
145             if ( getRemoteCacheListener() != null )
146             {
147                 getRemoteCacheListener().dispose();
148             }
149         }
150         catch ( Exception ex )
151         {
152             log.error( "Couldn't dispose", ex );
153             handleException( ex, "Failed to dispose [" + cacheName + "]", ICacheEventLogger.DISPOSE_EVENT );
154         }
155     }
156 
157     /**
158      * Synchronously get from the remote cache; if failed, replace the remote handle with a zombie.
159      * <p>
160      * Use threadpool to timeout if a value is set for GetTimeoutMillis
161      * <p>
162      * If we are a cluster client, we need to leave the Element in its serialized form. Cluster
163      * clients cannot deserialize objects. Cluster clients get ICacheElementSerialized objects from
164      * other remote servers.
165      * <p>
166      * @param key
167      * @return ICacheElement, a wrapper around the key, value, and attributes
168      * @throws IOException
169      */
170     @Override
171     protected ICacheElement processGet( Serializable key )
172         throws IOException
173     {
174         ICacheElement retVal = null;
175         try
176         {
177             if ( usePoolForGet )
178             {
179                 retVal = getUsingPool( key );
180             }
181             else
182             {
183                 retVal = getRemoteCacheService().get( cacheName, key, getListenerId() );
184             }
185 
186             // Eventually the instance of will not be necessary.
187             if ( retVal != null && retVal instanceof ICacheElementSerialized )
188             {
189                 // Never try to deserialize if you are a cluster client. Cluster
190                 // clients are merely intra-remote cache communicators. Remote caches are assumed
191                 // to have no ability to deserialze the objects.
192                 if ( this.getRemoteCacheAttributes().getRemoteType() != IRemoteCacheAttributes.CLUSTER )
193                 {
194                     retVal = SerializationConversionUtil.getDeSerializedCacheElement( (ICacheElementSerialized) retVal,
195                                                                                       this.elementSerializer );
196                 }
197             }
198         }
199         catch ( Exception ex )
200         {
201             handleException( ex, "Failed to get [" + key + "] from [" + cacheName + "]", ICacheEventLogger.GET_EVENT );
202         }
203         return retVal;
204     }
205 
206     /**
207      * This allows gets to timeout in case of remote server machine shutdown.
208      * <p>
209      * @param key
210      * @return ICacheElement
211      * @throws IOException
212      */
213     public ICacheElement getUsingPool( final Serializable key )
214         throws IOException
215     {
216         int timeout = getRemoteCacheAttributes().getGetTimeoutMillis();
217 
218         try
219         {
220             Callable<ICacheElement> command = new Callable<ICacheElement>()
221             {
222                 public ICacheElement call()
223                     throws IOException
224                 {
225                     return getRemoteCacheService().get( cacheName, key, getListenerId() );
226                 }
227             };
228 
229             // execute using the pool
230             Future<ICacheElement> future = pool.submit(command);
231 
232             // used timed get in order to timeout
233             ICacheElement ice = future.get(timeout, TimeUnit.MILLISECONDS);
234 
235             if ( log.isDebugEnabled() )
236             {
237                 if ( ice == null )
238                 {
239                     log.debug( "nothing found in remote cache" );
240                 }
241                 else
242                 {
243                     log.debug( "found item in remote cache" );
244                 }
245             }
246             return ice;
247         }
248         catch ( TimeoutException te )
249         {
250             log.warn( "TimeoutException, Get Request timed out after " + timeout );
251             throw new IOException( "Get Request timed out after " + timeout );
252         }
253         catch ( InterruptedException ex )
254         {
255             log.warn( "InterruptedException, Get Request timed out after " + timeout );
256             throw new IOException( "Get Request timed out after " + timeout );
257         }
258         catch (ExecutionException ex)
259         {
260             // assume that this is an IOException thrown by the callable.
261             log.error( "ExecutionException, Assuming an IO exception thrown in the background.", ex );
262             throw new IOException( "Get Request timed out after " + timeout );
263         }
264     }
265 
266     /**
267      * Calls get matching on the server. Each entry in the result is unwrapped.
268      * <p>
269      * @param pattern
270      * @return Map
271      * @throws IOException
272      */
273     @Override
274     public Map<Serializable, ICacheElement> processGetMatching( String pattern )
275         throws IOException
276     {
277         Map<Serializable, ICacheElement> results = new HashMap<Serializable, ICacheElement>();
278         try
279         {
280             Map<Serializable, ICacheElement> rawResults = getRemoteCacheService().getMatching( cacheName, pattern, getListenerId() );
281 
282             // Eventually the instance of will not be necessary.
283             if ( rawResults != null )
284             {
285                 for (Map.Entry<Serializable, ICacheElement> entry : rawResults.entrySet())
286                 {
287                     ICacheElement unwrappedResult = null;
288                     if ( entry.getValue() instanceof ICacheElementSerialized )
289                     {
290                         // Never try to deserialize if you are a cluster client. Cluster
291                         // clients are merely intra-remote cache communicators. Remote caches are assumed
292                         // to have no ability to deserialize the objects.
293                         if ( this.getRemoteCacheAttributes().getRemoteType() != IRemoteCacheAttributes.CLUSTER )
294                         {
295                             unwrappedResult = SerializationConversionUtil
296                                 .getDeSerializedCacheElement( (ICacheElementSerialized) entry.getValue(),
297                                                               this.elementSerializer );
298                         }
299                     }
300                     else
301                     {
302                         unwrappedResult = entry.getValue();
303                     }
304                     results.put( entry.getKey(), unwrappedResult );
305                 }
306             }
307         }
308         catch ( Exception ex )
309         {
310             handleException( ex, "Failed to getMatching [" + pattern + "] from [" + cacheName + "]",
311                              ICacheEventLogger.GET_EVENT );
312         }
313         return results;
314     }
315 
316     /**
317      * Gets multiple items from the cache based on the given set of keys.
318      * <p>
319      * @param keys
320      * @return a map of Serializable key to ICacheElement element, or an empty map if there is no
321      *         data in cache for any of these keys
322      * @throws IOException
323      */
324     @Override
325     protected Map<Serializable, ICacheElement> processGetMultiple( Set<Serializable> keys )
326         throws IOException
327     {
328         Map<Serializable, ICacheElement> elements = new HashMap<Serializable, ICacheElement>();
329         if ( keys != null && !keys.isEmpty() )
330         {
331             for (Serializable key : keys)
332             {
333                 ICacheElement element = get( key );
334 
335                 if ( element != null )
336                 {
337                     elements.put( key, element );
338                 }
339             }
340         }
341         return elements;
342     }
343 
344     /**
345      * Synchronously remove from the remote cache; if failed, replace the remote handle with a
346      * zombie.
347      * <p>
348      * @param key
349      * @return boolean, whether or not the item was removed
350      * @throws IOException
351      */
352     @Override
353     protected boolean processRemove( Serializable key )
354         throws IOException
355     {
356         if ( !this.getRemoteCacheAttributes().getGetOnly() )
357         {
358             if ( log.isDebugEnabled() )
359             {
360                 log.debug( "remove> key=" + key );
361             }
362             try
363             {
364                 getRemoteCacheService().remove( cacheName, key, getListenerId() );
365             }
366             catch ( Exception ex )
367             {
368                 handleException( ex, "Failed to remove " + key + " from " + cacheName, ICacheEventLogger.REMOVE_EVENT );
369             }
370             return true;
371         }
372         return false;
373     }
374 
375     /**
376      * Synchronously removeAll from the remote cache; if failed, replace the remote handle with a
377      * zombie.
378      * <p>
379      * @throws IOException
380      */
381     @Override
382     protected void processRemoveAll()
383         throws IOException
384     {
385         if ( !this.getRemoteCacheAttributes().getGetOnly() )
386         {
387             try
388             {
389                 getRemoteCacheService().removeAll( cacheName, getListenerId() );
390             }
391             catch ( Exception ex )
392             {
393                 handleException( ex, "Failed to remove all from " + cacheName, ICacheEventLogger.REMOVEALL_EVENT );
394             }
395         }
396     }
397 
398     /**
399      * Serializes the object and then calls update on the remote server with the byte array. The
400      * byte array is wrapped in a ICacheElementSerialized. This allows the remote server to operate
401      * without any knowledge of caches classes.
402      * <p>
403      * @param ce
404      * @throws IOException
405      */
406     @Override
407     protected void processUpdate( ICacheElement ce )
408         throws IOException
409     {
410         if ( !getRemoteCacheAttributes().getGetOnly() )
411         {
412             ICacheElementSerialized serialized = null;
413             try
414             {
415                 if ( log.isDebugEnabled() )
416                 {
417                     log.debug( "sending item to remote server" );
418                 }
419 
420                 // convert so we don't have to know about the object on the
421                 // other end.
422                 serialized = SerializationConversionUtil.getSerializedCacheElement( ce, this.elementSerializer );
423 
424                 remoteCacheService.update( serialized, getListenerId() );
425             }
426             catch ( NullPointerException npe )
427             {
428                 log.error( "npe for ce = " + ce + "ce.attr = " + ce.getElementAttributes(), npe );
429             }
430             catch ( Exception ex )
431             {
432                 // event queue will wait and retry
433                 handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName(),
434                                  ICacheEventLogger.UPDATE_EVENT );
435             }
436         }
437         else
438         {
439             if ( log.isDebugEnabled() )
440             {
441                 log.debug( "get only mode, not sending to remote server" );
442             }
443         }
444     }
445 
446     /**
447      * Returns all the keys for a group.
448      * <p>
449      * @param groupName
450      * @return Set
451      * @throws java.rmi.RemoteException
452      * @throws IOException
453      */
454     public Set<Serializable> getGroupKeys( String groupName )
455         throws java.rmi.RemoteException, IOException
456     {
457         return getRemoteCacheService().getGroupKeys( cacheName, groupName );
458     }
459 
460     /**
461      * Allows other member of this package to access the listener. This is mainly needed for
462      * deregistering a listener.
463      * <p>
464      * @return IRemoteCacheListener, the listener for this remote server
465      */
466     public IRemoteCacheListener getListener()
467     {
468         return getRemoteCacheListener();
469     }
470 
471     /**
472      * let the remote cache set a listener_id. Since there is only one listener for all the regions
473      * and every region gets registered? the id shouldn't be set if it isn't zero. If it is we
474      * assume that it is a reconnect.
475      * <p>
476      * @param id The new listenerId value
477      */
478     public void setListenerId( long id )
479     {
480         if ( getRemoteCacheListener() != null )
481         {
482             try
483             {
484                 getRemoteCacheListener().setListenerId( id );
485 
486                 if ( log.isDebugEnabled() )
487                 {
488                     log.debug( "set listenerId = " + id );
489                 }
490             }
491             catch ( Exception e )
492             {
493                 log.error( "Problem setting listenerId", e );
494             }
495         }
496     }
497 
498     /**
499      * Gets the listenerId attribute of the RemoteCacheListener object
500      * <p>
501      * @return The listenerId value
502      */
503     public long getListenerId()
504     {
505         if ( getRemoteCacheListener() != null )
506         {
507             try
508             {
509                 if ( log.isDebugEnabled() )
510                 {
511                     log.debug( "get listenerId = " + getRemoteCacheListener().getListenerId() );
512                 }
513                 return getRemoteCacheListener().getListenerId();
514             }
515             catch ( Exception e )
516             {
517                 log.error( "Problem getting listenerId", e );
518             }
519         }
520         return -1;
521     }
522 
523     /**
524      * Returns the current cache size.
525      * @return The size value
526      */
527     public int getSize()
528     {
529         return 0;
530     }
531 
532     /**
533      * Custom exception handling some children.  This should be used to initiate failover.
534      * <p>
535      * @param ex
536      * @param msg
537      * @param eventName
538      * @throws IOException
539      */
540     protected abstract void handleException( Exception ex, String msg, String eventName )
541         throws IOException;
542 
543     /**
544      * Gets the stats attribute of the RemoteCache object.
545      * <p>
546      * @return The stats value
547      */
548     public String getStats()
549     {
550         return getStatistics().toString();
551     }
552 
553     /**
554      * @return IStats object
555      */
556     public IStats getStatistics()
557     {
558         IStats stats = new Stats();
559         stats.setTypeName( "AbstractRemoteAuxiliaryCache" );
560 
561         ArrayList<IStatElement> elems = new ArrayList<IStatElement>();
562 
563         IStatElement se = null;
564 
565         se = new StatElement();
566         se.setName( "Remote Type" );
567         se.setData( this.getRemoteCacheAttributes().getRemoteTypeName() + "" );
568         elems.add( se );
569 
570         if ( this.getRemoteCacheAttributes().getRemoteType() == IRemoteCacheAttributes.CLUSTER )
571         {
572             // something cluster specific
573         }
574 
575         // no data gathered here
576 
577         se = new StatElement();
578         se.setName( "UsePoolForGet" );
579         se.setData( "" + usePoolForGet );
580         elems.add( se );
581 
582         if ( pool != null )
583         {
584             se = new StatElement();
585             se.setName( "Pool Size" );
586             se.setData( "" + pool.getPoolSize() );
587             elems.add( se );
588 
589             se = new StatElement();
590             se.setName( "Maximum Pool Size" );
591             se.setData( "" + pool.getMaximumPoolSize() );
592             elems.add( se );
593         }
594 
595         if ( getRemoteCacheService() instanceof ZombieRemoteCacheService )
596         {
597             se = new StatElement();
598             se.setName( "Zombie Queue Size" );
599             se.setData( "" + ( (ZombieRemoteCacheService) getRemoteCacheService() ).getQueueSize() );
600             elems.add( se );
601         }
602 
603         // get an array and put them in the Stats object
604         IStatElement[] ses = elems.toArray( new StatElement[0] );
605         stats.setStatElements( ses );
606 
607         return stats;
608     }
609 
610     /**
611      * Returns the cache status. An error status indicates the remote connection is not available.
612      * <p>
613      * @return The status value
614      */
615     public int getStatus()
616     {
617         return getRemoteCacheService() instanceof IZombie ? CacheConstants.STATUS_ERROR : CacheConstants.STATUS_ALIVE;
618     }
619 
620     /**
621      * Replaces the current remote cache service handle with the given handle. If the current remote
622      * is a Zombie, then it propagates any events that are queued to the restored service.
623      * <p>
624      * @param restoredRemote IRemoteCacheService -- the remote server or proxy to the remote server
625      */
626     public void fixCache( IRemoteCacheService restoredRemote )
627     {
628         if ( getRemoteCacheService() != null && getRemoteCacheService() instanceof ZombieRemoteCacheService )
629         {
630             ZombieRemoteCacheService zombie = (ZombieRemoteCacheService) getRemoteCacheService();
631             setRemoteCacheService( restoredRemote );
632             try
633             {
634                 zombie.propagateEvents( restoredRemote );
635             }
636             catch ( Exception e )
637             {
638                 try
639                 {
640                     handleException( e, "Problem propagating events from Zombie Queue to new Remote Service.",
641                                      "fixCache" );
642                 }
643                 catch ( IOException e1 )
644                 {
645                     // swallow, since this is just expected kick back.  Handle always throws
646                 }
647             }
648         }
649         else
650         {
651             setRemoteCacheService( restoredRemote );
652         }
653     }
654 
655 
656     /**
657      * Gets the cacheType attribute of the RemoteCache object
658      * @return The cacheType value
659      */
660     public int getCacheType()
661     {
662         return REMOTE_CACHE;
663     }
664 
665     /**
666      * Gets the cacheName attribute of the RemoteCache object.
667      * <p>
668      * @return The cacheName value
669      */
670     public String getCacheName()
671     {
672         return cacheName;
673     }
674 
675     /**
676      * @param remote the remote to set
677      */
678     protected void setRemoteCacheService( IRemoteCacheService remote )
679     {
680         this.remoteCacheService = remote;
681     }
682 
683     /**
684      * @return the remote
685      */
686     protected IRemoteCacheService getRemoteCacheService()
687     {
688         return remoteCacheService;
689     }
690 
691     /**
692      * @return Returns the AuxiliaryCacheAttributes.
693      */
694     public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
695     {
696         return getRemoteCacheAttributes();
697     }
698 
699     /**
700      * @param remoteCacheAttributes the remoteCacheAttributes to set
701      */
702     protected void setRemoteCacheAttributes( IRemoteCacheAttributes remoteCacheAttributes )
703     {
704         this.remoteCacheAttributes = remoteCacheAttributes;
705     }
706 
707     /**
708      * @return the remoteCacheAttributes
709      */
710     protected IRemoteCacheAttributes getRemoteCacheAttributes()
711     {
712         return remoteCacheAttributes;
713     }
714 
715     /**
716      * @param remoteCacheListener the remoteCacheListener to set
717      */
718     protected void setRemoteCacheListener( IRemoteCacheListener remoteCacheListener )
719     {
720         this.remoteCacheListener = remoteCacheListener;
721     }
722 
723     /**
724      * @return the remoteCacheListener
725      */
726     protected IRemoteCacheListener getRemoteCacheListener()
727     {
728         return remoteCacheListener;
729     }
730 }