View Javadoc
1   package org.apache.commons.jcs3.auxiliary.remote;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.rmi.UnmarshalException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.HashMap;
27  import java.util.Map;
28  import java.util.Set;
29  
30  import org.apache.commons.jcs3.auxiliary.AbstractAuxiliaryCache;
31  import org.apache.commons.jcs3.auxiliary.AuxiliaryCacheAttributes;
32  import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheClient;
33  import org.apache.commons.jcs3.engine.CacheAdaptor;
34  import org.apache.commons.jcs3.engine.CacheEventQueueFactory;
35  import org.apache.commons.jcs3.engine.CacheStatus;
36  import org.apache.commons.jcs3.engine.behavior.ICacheElement;
37  import org.apache.commons.jcs3.engine.behavior.ICacheEventQueue;
38  import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal;
39  import org.apache.commons.jcs3.engine.stats.StatElement;
40  import org.apache.commons.jcs3.engine.stats.Stats;
41  import org.apache.commons.jcs3.engine.stats.behavior.IStatElement;
42  import org.apache.commons.jcs3.engine.stats.behavior.IStats;
43  import org.apache.commons.jcs3.log.Log;
44  import org.apache.commons.jcs3.log.LogManager;
45  
46  /**
47   * The RemoteCacheNoWait wraps the RemoteCacheClient. The client holds a handle on the
48   * RemoteCacheService.
49   * <p>
50   * Used to queue up update requests to the underlying cache. These requests will be processed in
51   * their order of arrival via the cache event queue processor.
52   * <p>
53   * Typically errors will be handled down stream. We only need to kill the queue if an error makes it
54   * to this level from the queue. That can only happen if the queue is damaged, since the events are
55   * processed asynchronously.
56   * <p>
57   * There is no reason to create a queue on startup if the remote is not healthy.
58   * <p>
59   * If the remote cache encounters an error it will zombie--create a balking facade for the service.
60   * The Zombie will queue up items until the connection is restored. An alternative way to accomplish
61   * the same thing would be to stop, not destroy the queue at this level. That way items would be
62   * added to the queue and then when the connection is restored, we could start the worker threads
63   * again. This is a better long term solution, but it requires some significant changes to the
64   * complicated worker queues.
65   */
66  public class RemoteCacheNoWait<K, V>
67      extends AbstractAuxiliaryCache<K, V>
68  {
69      /** log instance */
70      private static final Log log = LogManager.getLog( RemoteCacheNoWait.class );
71  
72      /** The remote cache client */
73      private final IRemoteCacheClient<K, V> remoteCacheClient;
74  
75      /** Event queue for queuing up calls like put and remove. */
76      private ICacheEventQueue<K, V> cacheEventQueue;
77  
78      /** how many times get has been called. */
79      private int getCount;
80  
81      /** how many times getMatching has been called. */
82      private int getMatchingCount;
83  
84      /** how many times getMultiple has been called. */
85      private int getMultipleCount;
86  
87      /** how many times remove has been called. */
88      private int removeCount;
89  
90      /** how many times put has been called. */
91      private int putCount;
92  
93      /**
94       * Constructs with the given remote cache, and fires up an event queue for asynchronous
95       * processing.
96       * <p>
97       * @param cache
98       */
99      public RemoteCacheNoWait( final IRemoteCacheClient<K, V> cache )
100     {
101         this.remoteCacheClient = cache;
102         this.cacheEventQueue = createCacheEventQueue(cache);
103 
104         if ( remoteCacheClient.getStatus() == CacheStatus.ERROR )
105         {
106             cacheEventQueue.destroy();
107         }
108     }
109 
110     /**
111      * Create a cache event queue from the parameters of the remote client
112      * @param client the remote client
113      */
114     private ICacheEventQueue<K, V> createCacheEventQueue( final IRemoteCacheClient<K, V> client )
115     {
116         final CacheEventQueueFactory<K, V> factory = new CacheEventQueueFactory<>();
117         return factory.createCacheEventQueue(
118             new CacheAdaptor<>( client ),
119             client.getListenerId(),
120             client.getCacheName(),
121             client.getAuxiliaryCacheAttributes().getEventQueuePoolName(),
122             client.getAuxiliaryCacheAttributes().getEventQueueType() );
123     }
124 
125     /**
126      * Adds a put event to the queue.
127      * <p>
128      * @param element
129      * @throws IOException
130      */
131     @Override
132     public void update( final ICacheElement<K, V> element )
133         throws IOException
134     {
135         putCount++;
136         try
137         {
138             cacheEventQueue.addPutEvent( element );
139         }
140         catch ( final IOException e )
141         {
142             log.error( "Problem adding putEvent to queue.", e );
143             cacheEventQueue.destroy();
144             throw e;
145         }
146     }
147 
148     /**
149      * Synchronously reads from the remote cache.
150      * <p>
151      * @param key
152      * @return element from the remote cache, or null if not present
153      * @throws IOException
154      */
155     @Override
156     public ICacheElement<K, V> get( final K key )
157         throws IOException
158     {
159         getCount++;
160         try
161         {
162             return remoteCacheClient.get( key );
163         }
164         catch ( final UnmarshalException ue )
165         {
166             log.debug( "Retrying the get owing to UnmarshalException." );
167 
168             try
169             {
170                 return remoteCacheClient.get( key );
171             }
172             catch ( final IOException ex )
173             {
174                 log.info( "Failed in retrying the get for the second time. ", ex );
175             }
176         }
177         catch ( final IOException ex )
178         {
179             // We don't want to destroy the queue on a get failure.
180             // The RemoteCache will Zombie and queue.
181             // Since get does not use the queue, I don't want to kill the queue.
182             throw ex;
183         }
184 
185         return null;
186     }
187 
188     /**
189      * @param pattern
190      * @return Map
191      * @throws IOException
192      *
193      */
194     @Override
195     public Map<K, ICacheElement<K, V>> getMatching( final String pattern )
196         throws IOException
197     {
198         getMatchingCount++;
199         try
200         {
201             return remoteCacheClient.getMatching( pattern );
202         }
203         catch ( final UnmarshalException ue )
204         {
205             log.debug( "Retrying the getMatching owing to UnmarshalException." );
206 
207             try
208             {
209                 return remoteCacheClient.getMatching( pattern );
210             }
211             catch ( final IOException ex )
212             {
213                 log.info( "Failed in retrying the getMatching for the second time.", ex );
214             }
215         }
216         catch ( final IOException ex )
217         {
218             // We don't want to destroy the queue on a get failure.
219             // The RemoteCache will Zombie and queue.
220             // Since get does not use the queue, I don't want to kill the queue.
221             throw ex;
222         }
223 
224         return Collections.emptyMap();
225     }
226 
227     /**
228      * Gets multiple items from the cache based on the given set of keys. Sends the getMultiple
229      * request on to the server rather than looping through the requested keys.
230      * <p>
231      * @param keys
232      * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
233      *         data in cache for any of these keys
234      * @throws IOException
235      */
236     @Override
237     public Map<K, ICacheElement<K, V>> getMultiple( final Set<K> keys )
238         throws IOException
239     {
240         getMultipleCount++;
241         try
242         {
243             return remoteCacheClient.getMultiple( keys );
244         }
245         catch ( final UnmarshalException ue )
246         {
247             log.debug( "Retrying the getMultiple owing to UnmarshalException..." );
248 
249             try
250             {
251                 return remoteCacheClient.getMultiple( keys );
252             }
253             catch ( final IOException ex )
254             {
255                 log.info( "Failed in retrying the getMultiple for the second time.", ex );
256             }
257         }
258         catch ( final IOException ex )
259         {
260             // We don't want to destroy the queue on a get failure.
261             // The RemoteCache will Zombie and queue.
262             // Since get does not use the queue, I don't want to kill the queue.
263             throw ex;
264         }
265 
266         return new HashMap<>();
267     }
268 
269     /**
270      * Return the keys in this cache.
271      * <p>
272      * @see org.apache.commons.jcs3.auxiliary.AuxiliaryCache#getKeySet()
273      */
274     @Override
275     public Set<K> getKeySet() throws IOException
276     {
277         return remoteCacheClient.getKeySet();
278     }
279 
280     /**
281      * Adds a remove request to the remote cache.
282      * <p>
283      * @param key
284      * @return if this was successful
285      * @throws IOException
286      */
287     @Override
288     public boolean remove( final K key )
289         throws IOException
290     {
291         removeCount++;
292         try
293         {
294             cacheEventQueue.addRemoveEvent( key );
295         }
296         catch ( final IOException e )
297         {
298             log.error( "Problem adding RemoveEvent to queue.", e );
299             cacheEventQueue.destroy();
300             throw e;
301         }
302         return false;
303     }
304 
305     /**
306      * Adds a removeAll request to the remote cache.
307      * <p>
308      * @throws IOException
309      */
310     @Override
311     public void removeAll()
312         throws IOException
313     {
314         try
315         {
316             cacheEventQueue.addRemoveAllEvent();
317         }
318         catch ( final IOException e )
319         {
320             log.error( "Problem adding RemoveAllEvent to queue.", e );
321             cacheEventQueue.destroy();
322             throw e;
323         }
324     }
325 
326     /** Adds a dispose request to the remote cache. */
327     @Override
328     public void dispose()
329     {
330         try
331         {
332             cacheEventQueue.addDisposeEvent();
333         }
334         catch ( final IOException e )
335         {
336             log.error( "Problem adding DisposeEvent to queue.", e );
337             cacheEventQueue.destroy();
338         }
339     }
340 
341     /**
342      * No remote invocation.
343      * <p>
344      * @return The size value
345      */
346     @Override
347     public int getSize()
348     {
349         return remoteCacheClient.getSize();
350     }
351 
352     /**
353      * No remote invocation.
354      * <p>
355      * @return The cacheType value
356      */
357     @Override
358     public CacheType getCacheType()
359     {
360         return CacheType.REMOTE_CACHE;
361     }
362 
363     /**
364      * Returns the async cache status. An error status indicates either the remote connection is not
365      * available, or the asyn queue has been unexpectedly destroyed. No remote invocation.
366      * <p>
367      * @return The status value
368      */
369     @Override
370     public CacheStatus getStatus()
371     {
372         return cacheEventQueue.isWorking() ? remoteCacheClient.getStatus() : CacheStatus.ERROR;
373     }
374 
375     /**
376      * Gets the cacheName attribute of the RemoteCacheNoWait object
377      * <p>
378      * @return The cacheName value
379      */
380     @Override
381     public String getCacheName()
382     {
383         return remoteCacheClient.getCacheName();
384     }
385 
386     /**
387      * Replaces the remote cache service handle with the given handle and reset the event queue by
388      * starting up a new instance.
389      * <p>
390      * @param remote
391      */
392     public void fixCache( final ICacheServiceNonLocal<?, ?> remote )
393     {
394         remoteCacheClient.fixCache( remote );
395         resetEventQ();
396     }
397 
398     /**
399      * Resets the event q by first destroying the existing one and starting up new one.
400      * <p>
401      * There may be no good reason to kill the existing queue. We will sometimes need to set a new
402      * listener id, so we should create a new queue. We should let the old queue drain. If we were
403      * Connected to the failover, it would be best to finish sending items.
404      */
405     public void resetEventQ()
406     {
407         final ICacheEventQueue<K, V> previousQueue = cacheEventQueue;
408 
409         this.cacheEventQueue = createCacheEventQueue(this.remoteCacheClient);
410 
411         if ( previousQueue.isWorking() )
412         {
413             // we don't expect anything, it would have all gone to the zombie
414             log.info( "resetEventQ, previous queue has [{0}] items queued up.",
415                     previousQueue::size);
416             previousQueue.destroy();
417         }
418     }
419 
420     /**
421      * This is temporary. It allows the manager to get the lister.
422      * <p>
423      * @return the instance of the remote cache client used by this object
424      */
425     protected IRemoteCacheClient<K, V> getRemoteCache()
426     {
427         return remoteCacheClient;
428     }
429 
430     /**
431      * @return Returns the AuxiliaryCacheAttributes.
432      */
433     @Override
434     public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
435     {
436         return remoteCacheClient.getAuxiliaryCacheAttributes();
437     }
438 
439     /**
440      * This is for testing only. It allows you to take a look at the event queue.
441      * <p>
442      * @return ICacheEventQueue
443      */
444     protected ICacheEventQueue<K, V> getCacheEventQueue()
445     {
446         return this.cacheEventQueue;
447     }
448 
449     /**
450      * Returns the stats and the cache.toString().
451      * <p>
452      * @see Object#toString()
453      */
454     @Override
455     public String toString()
456     {
457         return getStats() + "\n" + remoteCacheClient.toString();
458     }
459 
460     /**
461      * Returns the statistics in String form.
462      * <p>
463      * @return String
464      */
465     @Override
466     public String getStats()
467     {
468         return getStatistics().toString();
469     }
470 
471     /**
472      * @return statistics about this communication
473      */
474     @Override
475     public IStats getStatistics()
476     {
477         final IStats stats = new Stats();
478         stats.setTypeName( "Remote Cache No Wait" );
479 
480         final ArrayList<IStatElement<?>> elems = new ArrayList<>();
481 
482         elems.add(new StatElement<>( "Status", getStatus() ) );
483 
484         // get the stats from the cache queue too
485         final IStats cStats = this.remoteCacheClient.getStatistics();
486         if ( cStats != null )
487         {
488             elems.addAll(cStats.getStatElements());
489         }
490 
491         // get the stats from the event queue too
492         final IStats eqStats = this.cacheEventQueue.getStatistics();
493         elems.addAll(eqStats.getStatElements());
494 
495         elems.add(new StatElement<>( "Get Count", Integer.valueOf(this.getCount) ) );
496         elems.add(new StatElement<>( "GetMatching Count", Integer.valueOf(this.getMatchingCount) ) );
497         elems.add(new StatElement<>( "GetMultiple Count", Integer.valueOf(this.getMultipleCount) ) );
498         elems.add(new StatElement<>( "Remove Count", Integer.valueOf(this.removeCount) ) );
499         elems.add(new StatElement<>( "Put Count", Integer.valueOf(this.putCount) ) );
500 
501         stats.setStatElements( elems );
502 
503         return stats;
504     }
505 
506     /**
507      * this won't be called since we don't do ICache logging here.
508      * <p>
509      * @return String
510      */
511     @Override
512     public String getEventLoggingExtraInfo()
513     {
514         return "Remote Cache No Wait";
515     }
516 }