View Javadoc
1   package org.apache.commons.jcs.auxiliary.remote;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.rmi.UnmarshalException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.HashMap;
27  import java.util.Map;
28  import java.util.Set;
29  
30  import org.apache.commons.jcs.auxiliary.AbstractAuxiliaryCache;
31  import org.apache.commons.jcs.auxiliary.AuxiliaryCacheAttributes;
32  import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheClient;
33  import org.apache.commons.jcs.engine.CacheAdaptor;
34  import org.apache.commons.jcs.engine.CacheEventQueueFactory;
35  import org.apache.commons.jcs.engine.CacheStatus;
36  import org.apache.commons.jcs.engine.behavior.ICacheElement;
37  import org.apache.commons.jcs.engine.behavior.ICacheEventQueue;
38  import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal;
39  import org.apache.commons.jcs.engine.stats.StatElement;
40  import org.apache.commons.jcs.engine.stats.Stats;
41  import org.apache.commons.jcs.engine.stats.behavior.IStatElement;
42  import org.apache.commons.jcs.engine.stats.behavior.IStats;
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  
46  /**
47   * The RemoteCacheNoWait wraps the RemoteCacheClient. The client holds a handle on the
48   * RemoteCacheService.
49   * <p>
50   * Used to queue up update requests to the underlying cache. These requests will be processed in
51   * their order of arrival via the cache event queue processor.
52   * <p>
53   * Typically errors will be handled down stream. We only need to kill the queue if an error makes it
54   * to this level from the queue. That can only happen if the queue is damaged, since the events are
55   * Processed asynchronously.
56   * <p>
57   * There is no reason to create a queue on startup if the remote is not healthy.
58   * <p>
59   * If the remote cache encounters an error it will zombie--create a balking facade for the service.
60   * The Zombie will queue up items until the connection is restored. An alternative way to accomplish
61   * the same thing would be to stop, not destroy the queue at this level. That way items would be
62   * added to the queue and then when the connection is restored, we could start the worker threads
63   * again. This is a better long term solution, but it requires some significant changes to the
64   * complicated worker queues.
65   */
66  public class RemoteCacheNoWait<K, V>
67      extends AbstractAuxiliaryCache<K, V>
68  {
69      /** log instance */
70      private static final Log log = LogFactory.getLog( RemoteCacheNoWait.class );
71  
72      /** The remote cache client */
73      private final IRemoteCacheClient<K, V> remoteCacheClient;
74  
75      /** Event queue for queuing up calls like put and remove. */
76      private ICacheEventQueue<K, V> cacheEventQueue;
77  
78      /** how many times get has been called. */
79      private int getCount = 0;
80  
81      /** how many times getMatching has been called. */
82      private int getMatchingCount = 0;
83  
84      /** how many times getMultiple has been called. */
85      private int getMultipleCount = 0;
86  
87      /** how many times remove has been called. */
88      private int removeCount = 0;
89  
90      /** how many times put has been called. */
91      private int putCount = 0;
92  
93      /**
94       * Constructs with the given remote cache, and fires up an event queue for asynchronous
95       * processing.
96       * <p>
97       * @param cache
98       */
99      public RemoteCacheNoWait( IRemoteCacheClient<K, V> cache )
100     {
101         remoteCacheClient = cache;
102         this.cacheEventQueue = createCacheEventQueue(cache);
103 
104         if ( remoteCacheClient.getStatus() == CacheStatus.ERROR )
105         {
106             cacheEventQueue.destroy();
107         }
108     }
109 
110     /**
111      * Create a cache event queue from the parameters of the remote client
112      * @param client the remote client
113      */
114     private ICacheEventQueue<K, V> createCacheEventQueue( IRemoteCacheClient<K, V> client )
115     {
116         CacheEventQueueFactory<K, V> factory = new CacheEventQueueFactory<K, V>();
117         ICacheEventQueue<K, V> ceq = factory.createCacheEventQueue(
118             new CacheAdaptor<K, V>( client ),
119             client.getListenerId(),
120             client.getCacheName(),
121             client.getAuxiliaryCacheAttributes().getEventQueuePoolName(),
122             client.getAuxiliaryCacheAttributes().getEventQueueType() );
123         return ceq;
124     }
125 
126     /**
127      * Adds a put event to the queue.
128      * <p>
129      * @param element
130      * @throws IOException
131      */
132     @Override
133     public void update( ICacheElement<K, V> element )
134         throws IOException
135     {
136         putCount++;
137         try
138         {
139             cacheEventQueue.addPutEvent( element );
140         }
141         catch ( IOException e )
142         {
143             log.error( "Problem adding putEvent to queue.", e );
144             cacheEventQueue.destroy();
145             throw e;
146         }
147     }
148 
149     /**
150      * Synchronously reads from the remote cache.
151      * <p>
152      * @param key
153      * @return element from the remote cache, or null if not present
154      * @throws IOException
155      */
156     @Override
157     public ICacheElement<K, V> get( K key )
158         throws IOException
159     {
160         getCount++;
161         try
162         {
163             return remoteCacheClient.get( key );
164         }
165         catch ( UnmarshalException ue )
166         {
167             if ( log.isDebugEnabled() )
168             {
169                 log.debug( "Retrying the get owing to UnmarshalException." );
170             }
171 
172             try
173             {
174                 return remoteCacheClient.get( key );
175             }
176             catch ( IOException ex )
177             {
178                 if ( log.isInfoEnabled() )
179                 {
180                     log.info( "Failed in retrying the get for the second time. " + ex.getMessage() );
181                 }
182             }
183         }
184         catch ( IOException ex )
185         {
186             // We don't want to destroy the queue on a get failure.
187             // The RemoteCache will Zombie and queue.
188             // Since get does not use the queue, I don't want to kill the queue.
189             throw ex;
190         }
191 
192         return null;
193     }
194 
195     /**
196      * @param pattern
197      * @return Map
198      * @throws IOException
199      *
200      */
201     @Override
202     public Map<K, ICacheElement<K, V>> getMatching( String pattern )
203         throws IOException
204     {
205         getMatchingCount++;
206         try
207         {
208             return remoteCacheClient.getMatching( pattern );
209         }
210         catch ( UnmarshalException ue )
211         {
212             if ( log.isDebugEnabled() )
213             {
214                 log.debug( "Retrying the getMatching owing to UnmarshalException." );
215             }
216 
217             try
218             {
219                 return remoteCacheClient.getMatching( pattern );
220             }
221             catch ( IOException ex )
222             {
223                 if ( log.isInfoEnabled() )
224                 {
225                     log.info( "Failed in retrying the getMatching for the second time. " + ex.getMessage() );
226                 }
227             }
228         }
229         catch ( IOException ex )
230         {
231             // We don't want to destroy the queue on a get failure.
232             // The RemoteCache will Zombie and queue.
233             // Since get does not use the queue, I don't want to kill the queue.
234             throw ex;
235         }
236 
237         return Collections.emptyMap();
238     }
239 
240     /**
241      * Gets multiple items from the cache based on the given set of keys. Sends the getMultiple
242      * request on to the server rather than looping through the requested keys.
243      * <p>
244      * @param keys
245      * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
246      *         data in cache for any of these keys
247      * @throws IOException
248      */
249     @Override
250     public Map<K, ICacheElement<K, V>> getMultiple( Set<K> keys )
251         throws IOException
252     {
253         getMultipleCount++;
254         try
255         {
256             return remoteCacheClient.getMultiple( keys );
257         }
258         catch ( UnmarshalException ue )
259         {
260             if ( log.isDebugEnabled() )
261             {
262                 log.debug( "Retrying the getMultiple owing to UnmarshalException..." );
263             }
264 
265             try
266             {
267                 return remoteCacheClient.getMultiple( keys );
268             }
269             catch ( IOException ex )
270             {
271                 if ( log.isInfoEnabled() )
272                 {
273                     log.info( "Failed in retrying the getMultiple for the second time. " + ex.getMessage() );
274                 }
275             }
276         }
277         catch ( IOException ex )
278         {
279             // We don't want to destroy the queue on a get failure.
280             // The RemoteCache will Zombie and queue.
281             // Since get does not use the queue, I don't want to kill the queue.
282             throw ex;
283         }
284 
285         return new HashMap<K, ICacheElement<K, V>>();
286     }
287 
288     /**
289      * Return the keys in this cache.
290      * <p>
291      * @see org.apache.commons.jcs.auxiliary.AuxiliaryCache#getKeySet()
292      */
293     @Override
294     public Set<K> getKeySet() throws IOException
295     {
296         return remoteCacheClient.getKeySet();
297     }
298 
299     /**
300      * Adds a remove request to the remote cache.
301      * <p>
302      * @param key
303      * @return if this was successful
304      * @throws IOException
305      */
306     @Override
307     public boolean remove( K key )
308         throws IOException
309     {
310         removeCount++;
311         try
312         {
313             cacheEventQueue.addRemoveEvent( key );
314         }
315         catch ( IOException e )
316         {
317             log.error( "Problem adding RemoveEvent to queue.", e );
318             cacheEventQueue.destroy();
319             throw e;
320         }
321         return false;
322     }
323 
324     /**
325      * Adds a removeAll request to the remote cache.
326      * <p>
327      * @throws IOException
328      */
329     @Override
330     public void removeAll()
331         throws IOException
332     {
333         try
334         {
335             cacheEventQueue.addRemoveAllEvent();
336         }
337         catch ( IOException e )
338         {
339             log.error( "Problem adding RemoveAllEvent to queue.", e );
340             cacheEventQueue.destroy();
341             throw e;
342         }
343     }
344 
345     /** Adds a dispose request to the remote cache. */
346     @Override
347     public void dispose()
348     {
349         try
350         {
351             cacheEventQueue.addDisposeEvent();
352         }
353         catch ( IOException e )
354         {
355             log.error( "Problem adding DisposeEvent to queue.", e );
356             cacheEventQueue.destroy();
357         }
358     }
359 
360     /**
361      * No remote invocation.
362      * <p>
363      * @return The size value
364      */
365     @Override
366     public int getSize()
367     {
368         return remoteCacheClient.getSize();
369     }
370 
371     /**
372      * No remote invocation.
373      * <p>
374      * @return The cacheType value
375      */
376     @Override
377     public CacheType getCacheType()
378     {
379         return CacheType.REMOTE_CACHE;
380     }
381 
382     /**
383      * Returns the asyn cache status. An error status indicates either the remote connection is not
384      * available, or the asyn queue has been unexpectedly destroyed. No remote invocation.
385      * <p>
386      * @return The status value
387      */
388     @Override
389     public CacheStatus getStatus()
390     {
391         return cacheEventQueue.isWorking() ? remoteCacheClient.getStatus() : CacheStatus.ERROR;
392     }
393 
394     /**
395      * Gets the cacheName attribute of the RemoteCacheNoWait object
396      * <p>
397      * @return The cacheName value
398      */
399     @Override
400     public String getCacheName()
401     {
402         return remoteCacheClient.getCacheName();
403     }
404 
405     /**
406      * Replaces the remote cache service handle with the given handle and reset the event queue by
407      * starting up a new instance.
408      * <p>
409      * @param remote
410      */
411     public void fixCache( ICacheServiceNonLocal<?, ?> remote )
412     {
413         remoteCacheClient.fixCache( remote );
414         resetEventQ();
415     }
416 
417     /**
418      * Resets the event q by first destroying the existing one and starting up new one.
419      * <p>
420      * There may be no good reason to kill the existing queue. We will sometimes need to set a new
421      * listener id, so we should create a new queue. We should let the old queue drain. If we were
422      * Connected to the failover, it would be best to finish sending items.
423      */
424     public void resetEventQ()
425     {
426         ICacheEventQueue<K, V> previousQueue = cacheEventQueue;
427 
428         this.cacheEventQueue = createCacheEventQueue(this.remoteCacheClient);
429 
430         if ( previousQueue.isWorking() )
431         {
432             // we don't expect anything, it would have all gone to the zombie
433             if ( log.isInfoEnabled() )
434             {
435                 log.info( "resetEventQ, previous queue has [" + previousQueue.size() + "] items queued up." );
436             }
437             previousQueue.destroy();
438         }
439     }
440 
441     /**
442      * This is temporary. It allows the manager to get the lister.
443      * <p>
444      * @return the instance of the remote cache client used by this object
445      */
446     protected IRemoteCacheClient<K, V> getRemoteCache()
447     {
448         return remoteCacheClient;
449     }
450 
451     /**
452      * @return Returns the AuxiliaryCacheAttributes.
453      */
454     @Override
455     public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
456     {
457         return remoteCacheClient.getAuxiliaryCacheAttributes();
458     }
459 
460     /**
461      * This is for testing only. It allows you to take a look at the event queue.
462      * <p>
463      * @return ICacheEventQueue
464      */
465     protected ICacheEventQueue<K, V> getCacheEventQueue()
466     {
467         return this.cacheEventQueue;
468     }
469 
470     /**
471      * Returns the stats and the cache.toString().
472      * <p>
473      * @see java.lang.Object#toString()
474      */
475     @Override
476     public String toString()
477     {
478         return getStats() + "\n" + remoteCacheClient.toString();
479     }
480 
481     /**
482      * Returns the statistics in String form.
483      * <p>
484      * @return String
485      */
486     @Override
487     public String getStats()
488     {
489         return getStatistics().toString();
490     }
491 
492     /**
493      * @return statistics about this communication
494      */
495     @Override
496     public IStats getStatistics()
497     {
498         IStats stats = new Stats();
499         stats.setTypeName( "Remote Cache No Wait" );
500 
501         ArrayList<IStatElement<?>> elems = new ArrayList<IStatElement<?>>();
502 
503         elems.add(new StatElement<CacheStatus>( "Status", getStatus() ) );
504 
505         // get the stats from the cache queue too
506         IStats cStats = this.remoteCacheClient.getStatistics();
507         if ( cStats != null )
508         {
509             elems.addAll(cStats.getStatElements());
510         }
511 
512         // get the stats from the event queue too
513         IStats eqStats = this.cacheEventQueue.getStatistics();
514         elems.addAll(eqStats.getStatElements());
515 
516         elems.add(new StatElement<Integer>( "Get Count", Integer.valueOf(this.getCount) ) );
517         elems.add(new StatElement<Integer>( "GetMatching Count", Integer.valueOf(this.getMatchingCount) ) );
518         elems.add(new StatElement<Integer>( "GetMultiple Count", Integer.valueOf(this.getMultipleCount) ) );
519         elems.add(new StatElement<Integer>( "Remove Count", Integer.valueOf(this.removeCount) ) );
520         elems.add(new StatElement<Integer>( "Put Count", Integer.valueOf(this.putCount) ) );
521 
522         stats.setStatElements( elems );
523 
524         return stats;
525     }
526 
527     /**
528      * this won't be called since we don't do ICache logging here.
529      * <p>
530      * @return String
531      */
532     @Override
533     public String getEventLoggingExtraInfo()
534     {
535         return "Remote Cache No Wait";
536     }
537 }