View Javadoc
1   package org.apache.commons.jcs3.auxiliary.lateral;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.rmi.UnmarshalException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.HashMap;
27  import java.util.Map;
28  import java.util.Map.Entry;
29  import java.util.Set;
30  import java.util.stream.Collectors;
31  
32  import org.apache.commons.jcs3.auxiliary.AbstractAuxiliaryCache;
33  import org.apache.commons.jcs3.auxiliary.lateral.behavior.ILateralCacheAttributes;
34  import org.apache.commons.jcs3.engine.CacheAdaptor;
35  import org.apache.commons.jcs3.engine.CacheEventQueueFactory;
36  import org.apache.commons.jcs3.engine.CacheInfo;
37  import org.apache.commons.jcs3.engine.CacheStatus;
38  import org.apache.commons.jcs3.engine.behavior.ICacheElement;
39  import org.apache.commons.jcs3.engine.behavior.ICacheEventQueue;
40  import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal;
41  import org.apache.commons.jcs3.engine.stats.StatElement;
42  import org.apache.commons.jcs3.engine.stats.Stats;
43  import org.apache.commons.jcs3.engine.stats.behavior.IStatElement;
44  import org.apache.commons.jcs3.engine.stats.behavior.IStats;
45  import org.apache.commons.jcs3.log.Log;
46  import org.apache.commons.jcs3.log.LogManager;
47  
48  /**
49   * Used to queue up update requests to the underlying cache. These requests will be processed in
50   * their order of arrival via the cache event queue processor.
51   */
52  public class LateralCacheNoWait<K, V>
53      extends AbstractAuxiliaryCache<K, V>
54  {
55      /** The logger. */
56      private static final Log log = LogManager.getLog( LateralCacheNoWait.class );
57  
58      /** The cache */
59      private final LateralCache<K, V> cache;
60  
61      /** Identify this object */
62      private String identityKey;
63  
64      /** The event queue */
65      private ICacheEventQueue<K, V> eventQueue;
66  
67      /** times get called */
68      private int getCount;
69  
70      /** times remove called */
71      private int removeCount;
72  
73      /** times put called */
74      private int putCount;
75  
76      /**
77       * Constructs with the given lateral cache, and fires up an event queue for asynchronous
78       * processing.
79       * <p>
80       * @param cache
81       */
82      public LateralCacheNoWait( final LateralCache<K, V> cache )
83      {
84          this.cache = cache;
85          this.identityKey = cache.getCacheName();
86          this.setCacheEventLogger(cache.getCacheEventLogger());
87          this.setElementSerializer(cache.getElementSerializer());
88  
89          log.debug( "Constructing LateralCacheNoWait, LateralCache = [{0}]", cache );
90  
91          final CacheEventQueueFactory<K, V> fact = new CacheEventQueueFactory<>();
92          this.eventQueue = fact.createCacheEventQueue( new CacheAdaptor<>( cache ),
93                  CacheInfo.listenerId, cache.getCacheName(),
94                  getAuxiliaryCacheAttributes().getEventQueuePoolName(),
95                  getAuxiliaryCacheAttributes().getEventQueueType() );
96  
97          // need each no wait to handle each of its real updates and removes,
98          // since there may
99          // be more than one per cache? alternative is to have the cache
100         // perform updates using a different method that specifies the listener
101         // this.q = new CacheEventQueue(new CacheAdaptor(this),
102         // LateralCacheInfo.listenerId, cache.getCacheName());
103         if ( cache.getStatus() == CacheStatus.ERROR )
104         {
105             eventQueue.destroy();
106         }
107     }
108 
109     /**
110      * The identifying key to this no wait
111      *
112      * @return the identity key
113      * @since 3.1
114      */
115     public String getIdentityKey()
116     {
117         return identityKey;
118     }
119 
120     /**
121      * Set the identifying key to this no wait
122      *
123      * @param identityKey the identityKey to set
124      * @since 3.1
125      */
126     public void setIdentityKey(String identityKey)
127     {
128         this.identityKey = identityKey;
129     }
130 
131     /**
132      * @param ce
133      * @throws IOException
134      */
135     @Override
136     public void update( final ICacheElement<K, V> ce )
137         throws IOException
138     {
139         putCount++;
140         try
141         {
142             eventQueue.addPutEvent( ce );
143         }
144         catch ( final IOException ex )
145         {
146             log.error( ex );
147             eventQueue.destroy();
148         }
149     }
150 
151     /**
152      * Synchronously reads from the lateral cache.
153      * <p>
154      * @param key
155      * @return ICacheElement&lt;K, V&gt; if found, else null
156      */
157     @Override
158     public ICacheElement<K, V> get( final K key )
159     {
160         getCount++;
161         if ( this.getStatus() != CacheStatus.ERROR )
162         {
163             try
164             {
165                 return cache.get( key );
166             }
167             catch ( final UnmarshalException ue )
168             {
169                 log.debug( "Retrying the get owing to UnmarshalException..." );
170                 try
171                 {
172                     return cache.get( key );
173                 }
174                 catch ( final IOException ex )
175                 {
176                     log.error( "Failed in retrying the get for the second time." );
177                     eventQueue.destroy();
178                 }
179             }
180             catch ( final IOException ex )
181             {
182                 eventQueue.destroy();
183             }
184         }
185         return null;
186     }
187 
188     /**
189      * Gets multiple items from the cache based on the given set of keys.
190      * <p>
191      * @param keys
192      * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
193      *         data in cache for any of these keys
194      */
195     @Override
196     public Map<K, ICacheElement<K, V>> getMultiple(final Set<K> keys)
197     {
198         if ( keys != null && !keys.isEmpty() )
199         {
200             return keys.stream()
201                 .collect(Collectors.toMap(
202                         key -> key,
203                         this::get)).entrySet().stream()
204                     .filter(entry -> entry.getValue() != null)
205                     .collect(Collectors.toMap(
206                             Entry::getKey,
207                             Entry::getValue));
208         }
209 
210         return new HashMap<>();
211     }
212 
213     /**
214      * Synchronously reads from the lateral cache.
215      * <p>
216      * @param pattern
217      * @return ICacheElement&lt;K, V&gt; if found, else empty
218      */
219     @Override
220     public Map<K, ICacheElement<K, V>> getMatching(final String pattern)
221     {
222         getCount++;
223         if ( this.getStatus() != CacheStatus.ERROR )
224         {
225             try
226             {
227                 return cache.getMatching( pattern );
228             }
229             catch ( final UnmarshalException ue )
230             {
231                 log.debug( "Retrying the get owing to UnmarshalException." );
232                 try
233                 {
234                     return cache.getMatching( pattern );
235                 }
236                 catch ( final IOException ex )
237                 {
238                     log.error( "Failed in retrying the get for the second time." );
239                     eventQueue.destroy();
240                 }
241             }
242             catch ( final IOException ex )
243             {
244                 eventQueue.destroy();
245             }
246         }
247         return Collections.emptyMap();
248     }
249 
250     /**
251      * Return the keys in this cache.
252      * <p>
253      * @see org.apache.commons.jcs3.auxiliary.AuxiliaryCache#getKeySet()
254      */
255     @Override
256     public Set<K> getKeySet() throws IOException
257     {
258         try
259         {
260             return cache.getKeySet();
261         }
262         catch ( final IOException ex )
263         {
264             log.error( ex );
265             eventQueue.destroy();
266         }
267         return Collections.emptySet();
268     }
269 
270     /**
271      * Adds a remove request to the lateral cache.
272      * <p>
273      * @param key
274      * @return always false
275      */
276     @Override
277     public boolean remove( final K key )
278     {
279         removeCount++;
280         try
281         {
282             eventQueue.addRemoveEvent( key );
283         }
284         catch ( final IOException ex )
285         {
286             log.error( ex );
287             eventQueue.destroy();
288         }
289         return false;
290     }
291 
292     /** Adds a removeAll request to the lateral cache. */
293     @Override
294     public void removeAll()
295     {
296         try
297         {
298             eventQueue.addRemoveAllEvent();
299         }
300         catch ( final IOException ex )
301         {
302             log.error( ex );
303             eventQueue.destroy();
304         }
305     }
306 
307     /** Adds a dispose request to the lateral cache. */
308     @Override
309     public void dispose()
310     {
311         try
312         {
313             eventQueue.addDisposeEvent();
314         }
315         catch ( final IOException ex )
316         {
317             log.error( ex );
318             eventQueue.destroy();
319         }
320     }
321 
322     /**
323      * No lateral invocation.
324      * <p>
325      * @return The size value
326      */
327     @Override
328     public int getSize()
329     {
330         return cache.getSize();
331     }
332 
333     /**
334      * No lateral invocation.
335      * <p>
336      * @return The cacheType value
337      */
338     @Override
339     public CacheType getCacheType()
340     {
341         return cache.getCacheType();
342     }
343 
344     /**
345      * Returns the async cache status. An error status indicates either the lateral connection is not
346      * available, or the asyn queue has been unexpectedly destroyed. No lateral invocation.
347      * <p>
348      * @return The status value
349      */
350     @Override
351     public CacheStatus getStatus()
352     {
353         return eventQueue.isWorking() ? cache.getStatus() : CacheStatus.ERROR;
354     }
355 
356     /**
357      * Gets the cacheName attribute of the LateralCacheNoWait object
358      * <p>
359      * @return The cacheName value
360      */
361     @Override
362     public String getCacheName()
363     {
364         return cache.getCacheName();
365     }
366 
367     /**
368      * Replaces the lateral cache service handle with the given handle and reset the queue by
369      * starting up a new instance.
370      * <p>
371      * @param lateral
372      */
373     public void fixCache( final ICacheServiceNonLocal<K, V> lateral )
374     {
375         cache.fixCache( lateral );
376         resetEventQ();
377     }
378 
379     /**
380      * Resets the event q by first destroying the existing one and starting up new one.
381      */
382     public void resetEventQ()
383     {
384         if ( eventQueue.isWorking() )
385         {
386             eventQueue.destroy();
387         }
388         final CacheEventQueueFactory<K, V> fact = new CacheEventQueueFactory<>();
389         this.eventQueue = fact.createCacheEventQueue( new CacheAdaptor<>( cache ),
390                 CacheInfo.listenerId, cache.getCacheName(),
391                 getAuxiliaryCacheAttributes().getEventQueuePoolName(),
392                 getAuxiliaryCacheAttributes().getEventQueueType() );
393     }
394 
395     /**
396      * @return Returns the AuxiliaryCacheAttributes.
397      */
398     @Override
399     public ILateralCacheAttributes getAuxiliaryCacheAttributes()
400     {
401         return cache.getAuxiliaryCacheAttributes();
402     }
403 
404     /**
405      * getStats
406      * @return String
407      */
408     @Override
409     public String getStats()
410     {
411         return getStatistics().toString();
412     }
413 
414     /**
415      * this won't be called since we don't do ICache logging here.
416      * <p>
417      * @return String
418      */
419     @Override
420     public String getEventLoggingExtraInfo()
421     {
422         return "Lateral Cache No Wait";
423     }
424 
425     /**
426      * @return statistics about this communication
427      */
428     @Override
429     public IStats getStatistics()
430     {
431         final IStats stats = new Stats();
432         stats.setTypeName( "Lateral Cache No Wait" );
433 
434         // get the stats from the event queue too
435         final IStats eqStats = this.eventQueue.getStatistics();
436         final ArrayList<IStatElement<?>> elems = new ArrayList<>(eqStats.getStatElements());
437 
438         elems.add(new StatElement<>( "Get Count", Integer.valueOf(this.getCount) ) );
439         elems.add(new StatElement<>( "Remove Count", Integer.valueOf(this.removeCount) ) );
440         elems.add(new StatElement<>( "Put Count", Integer.valueOf(this.putCount) ) );
441         elems.add(new StatElement<>( "Attributes", cache.getAuxiliaryCacheAttributes() ) );
442 
443         stats.setStatElements( elems );
444 
445         return stats;
446     }
447 
448     /**
449      * @return debugging info.
450      */
451     @Override
452     public String toString()
453     {
454         final StringBuilder buf = new StringBuilder();
455         buf.append( " LateralCacheNoWait " );
456         buf.append( " Status = " + this.getStatus() );
457         buf.append( " cache = [" + cache.toString() + "]" );
458         return buf.toString();
459     }
460 }