View Javadoc
1   package org.apache.commons.jcs.auxiliary.lateral;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.Map;
26  import java.util.Set;
27  
28  import org.apache.commons.jcs.auxiliary.AbstractAuxiliaryCacheEventLogging;
29  import org.apache.commons.jcs.auxiliary.AuxiliaryCacheAttributes;
30  import org.apache.commons.jcs.auxiliary.lateral.behavior.ILateralCacheAttributes;
31  import org.apache.commons.jcs.engine.CacheInfo;
32  import org.apache.commons.jcs.engine.CacheStatus;
33  import org.apache.commons.jcs.engine.ZombieCacheServiceNonLocal;
34  import org.apache.commons.jcs.engine.behavior.ICacheElement;
35  import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal;
36  import org.apache.commons.jcs.engine.behavior.IZombie;
37  import org.apache.commons.jcs.engine.stats.Stats;
38  import org.apache.commons.jcs.engine.stats.behavior.IStats;
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  
42  /**
43   * Lateral distributor. Returns null on get by default. Net search not implemented.
44   */
45  public class LateralCache<K, V>
46      extends AbstractAuxiliaryCacheEventLogging<K, V>
47  {
48      /** The logger. */
49      private static final Log log = LogFactory.getLog( LateralCache.class );
50  
51      /** generalize this, use another interface */
52      private final ILateralCacheAttributes lateralCacheAttributes;
53  
54      /** The region name */
55      final String cacheName;
56  
57      /** either http, socket.udp, or socket.tcp can set in config */
58      private ICacheServiceNonLocal<K, V> lateralCacheService;
59  
60      /** Monitors the connection. */
61      private LateralCacheMonitor monitor;
62  
63      /**
64       * Constructor for the LateralCache object
65       * <p>
66       * @param cattr
67       * @param lateral
68       * @param monitor
69       */
70      public LateralCache( ILateralCacheAttributes cattr, ICacheServiceNonLocal<K, V> lateral, LateralCacheMonitor monitor )
71      {
72          this.cacheName = cattr.getCacheName();
73          this.lateralCacheAttributes = cattr;
74          this.lateralCacheService = lateral;
75          this.monitor = monitor;
76      }
77  
78      /**
79       * Constructor for the LateralCache object
80       * <p>
81       * @param cattr
82       */
83      public LateralCache( ILateralCacheAttributes cattr )
84      {
85          this.cacheName = cattr.getCacheName();
86          this.lateralCacheAttributes = cattr;
87      }
88  
89      /**
90       * Update lateral.
91       * <p>
92       * @param ce
93       * @throws IOException
94       */
95      @Override
96      protected void processUpdate( ICacheElement<K, V> ce )
97          throws IOException
98      {
99          try
100         {
101             if (ce != null)
102             {
103                 if ( log.isDebugEnabled() )
104                 {
105                     log.debug( "update: lateral = [" + lateralCacheService + "], " + "CacheInfo.listenerId = "
106                         + CacheInfo.listenerId );
107                 }
108                 lateralCacheService.update( ce, CacheInfo.listenerId );
109             }
110         }
111         catch ( IOException ex )
112         {
113             handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName() + "@" + lateralCacheAttributes );
114         }
115     }
116 
117     /**
118      * The performance costs are too great. It is not recommended that you enable lateral gets.
119      * <p>
120      * @param key
121      * @return ICacheElement&lt;K, V&gt; or null
122      * @throws IOException
123      */
124     @Override
125     protected ICacheElement<K, V> processGet( K key )
126         throws IOException
127     {
128         ICacheElement<K, V> obj = null;
129 
130         if ( this.lateralCacheAttributes.getPutOnlyMode() )
131         {
132             return null;
133         }
134         try
135         {
136             obj = lateralCacheService.get( cacheName, key );
137         }
138         catch ( Exception e )
139         {
140             log.error( e );
141             handleException( e, "Failed to get [" + key + "] from " + lateralCacheAttributes.getCacheName() + "@" + lateralCacheAttributes );
142         }
143         return obj;
144     }
145 
146     /**
147      * @param pattern
148      * @return A map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
149      *         data in cache for any of these keys
150      * @throws IOException
151      */
152     @Override
153     protected Map<K, ICacheElement<K, V>> processGetMatching( String pattern )
154         throws IOException
155     {
156         if ( this.lateralCacheAttributes.getPutOnlyMode() )
157         {
158             return Collections.emptyMap();
159         }
160         try
161         {
162             return lateralCacheService.getMatching( cacheName, pattern );
163         }
164         catch ( IOException e )
165         {
166             log.error( e );
167             handleException( e, "Failed to getMatching [" + pattern + "] from " + lateralCacheAttributes.getCacheName() + "@" + lateralCacheAttributes );
168             return Collections.emptyMap();
169         }
170     }
171 
172     /**
173      * Gets multiple items from the cache based on the given set of keys.
174      * <p>
175      * @param keys
176      * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
177      *         data in cache for any of these keys
178      * @throws IOException
179      */
180     @Override
181     protected Map<K, ICacheElement<K, V>> processGetMultiple( Set<K> keys )
182         throws IOException
183     {
184         Map<K, ICacheElement<K, V>> elements = new HashMap<K, ICacheElement<K, V>>();
185 
186         if ( keys != null && !keys.isEmpty() )
187         {
188             for (K key : keys)
189             {
190                 ICacheElement<K, V> element = get( key );
191 
192                 if ( element != null )
193                 {
194                     elements.put( key, element );
195                 }
196             }
197         }
198 
199         return elements;
200     }
201 
202     /**
203      * Return the keys in this cache.
204      * <p>
205      * @see org.apache.commons.jcs.auxiliary.AuxiliaryCache#getKeySet()
206      */
207     @Override
208     public Set<K> getKeySet() throws IOException
209     {
210         try
211         {
212             return lateralCacheService.getKeySet( cacheName );
213         }
214         catch ( IOException ex )
215         {
216             handleException( ex, "Failed to get key set from " + lateralCacheAttributes.getCacheName() + "@"
217                 + lateralCacheAttributes );
218         }
219         return Collections.emptySet();
220     }
221 
222     /**
223      * Synchronously remove from the remote cache; if failed, replace the remote handle with a
224      * zombie.
225      * <p>
226      * @param key
227      * @return false always
228      * @throws IOException
229      */
230     @Override
231     protected boolean processRemove( K key )
232         throws IOException
233     {
234         if ( log.isDebugEnabled() )
235         {
236             log.debug( "removing key:" + key );
237         }
238 
239         try
240         {
241             lateralCacheService.remove( cacheName, key, CacheInfo.listenerId );
242         }
243         catch ( IOException ex )
244         {
245             handleException( ex, "Failed to remove " + key + " from " + lateralCacheAttributes.getCacheName() + "@" + lateralCacheAttributes );
246         }
247         return false;
248     }
249 
250     /**
251      * Synchronously removeAll from the remote cache; if failed, replace the remote handle with a
252      * zombie.
253      * <p>
254      * @throws IOException
255      */
256     @Override
257     protected void processRemoveAll()
258         throws IOException
259     {
260         try
261         {
262             lateralCacheService.removeAll( cacheName, CacheInfo.listenerId );
263         }
264         catch ( IOException ex )
265         {
266             handleException( ex, "Failed to remove all from " + lateralCacheAttributes.getCacheName() + "@" + lateralCacheAttributes );
267         }
268     }
269 
270     /**
271      * Synchronously dispose the cache. Not sure we want this.
272      * <p>
273      * @throws IOException
274      */
275     @Override
276     protected void processDispose()
277         throws IOException
278     {
279         log.debug( "Disposing of lateral cache" );
280 
281         ///* HELP: This section did nothing but generate compilation warnings.
282         // TODO: may limit this functionality. It is dangerous.
283         // asmuts -- Added functionality to help with warnings. I'm not getting
284         // any.
285         try
286         {
287             lateralCacheService.dispose( this.lateralCacheAttributes.getCacheName() );
288             // Should remove connection
289         }
290         catch ( IOException ex )
291         {
292             log.error( "Couldn't dispose", ex );
293             handleException( ex, "Failed to dispose " + lateralCacheAttributes.getCacheName() );
294         }
295     }
296 
297     /**
298      * Returns the cache status.
299      * <p>
300      * @return The status value
301      */
302     @Override
303     public CacheStatus getStatus()
304     {
305         return this.lateralCacheService instanceof IZombie ? CacheStatus.ERROR : CacheStatus.ALIVE;
306     }
307 
308     /**
309      * Returns the current cache size.
310      * <p>
311      * @return The size value
312      */
313     @Override
314     public int getSize()
315     {
316         return 0;
317     }
318 
319     /**
320      * Gets the cacheType attribute of the LateralCache object
321      * <p>
322      * @return The cacheType value
323      */
324     @Override
325     public CacheType getCacheType()
326     {
327         return CacheType.LATERAL_CACHE;
328     }
329 
330     /**
331      * Gets the cacheName attribute of the LateralCache object
332      * <p>
333      * @return The cacheName value
334      */
335     @Override
336     public String getCacheName()
337     {
338         return cacheName;
339     }
340 
341     /**
342      * Not yet sure what to do here.
343      * <p>
344      * @param ex
345      * @param msg
346      * @throws IOException
347      */
348     private void handleException( Exception ex, String msg )
349         throws IOException
350     {
351         log.error( "Disabling lateral cache due to error " + msg, ex );
352 
353         lateralCacheService = new ZombieCacheServiceNonLocal<K, V>( lateralCacheAttributes.getZombieQueueMaxSize() );
354         // may want to flush if region specifies
355         // Notify the cache monitor about the error, and kick off the recovery
356         // process.
357         monitor.notifyError();
358 
359         // could stop the net search if it is built and try to reconnect?
360         if ( ex instanceof IOException )
361         {
362             throw (IOException) ex;
363         }
364         throw new IOException( ex.getMessage() );
365     }
366 
367     /**
368      * Replaces the current remote cache service handle with the given handle.
369      * <p>
370      * @param restoredLateral
371      */
372     public void fixCache( ICacheServiceNonLocal<K, V> restoredLateral )
373     {
374         if ( this.lateralCacheService != null && this.lateralCacheService instanceof ZombieCacheServiceNonLocal )
375         {
376             ZombieCacheServiceNonLocal<K, V> zombie = (ZombieCacheServiceNonLocal<K, V>) this.lateralCacheService;
377             this.lateralCacheService = restoredLateral;
378             try
379             {
380                 zombie.propagateEvents( restoredLateral );
381             }
382             catch ( Exception e )
383             {
384                 try
385                 {
386                     handleException( e, "Problem propagating events from Zombie Queue to new Lateral Service." );
387                 }
388                 catch ( IOException e1 )
389                 {
390                     // swallow, since this is just expected kick back.  Handle always throws
391                 }
392             }
393         }
394         else
395         {
396             this.lateralCacheService = restoredLateral;
397         }
398     }
399 
400     /**
401      * getStats
402      * <p>
403      * @return String
404      */
405     @Override
406     public String getStats()
407     {
408         return "";
409     }
410 
411     /**
412      * @return Returns the AuxiliaryCacheAttributes.
413      */
414     @Override
415     public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
416     {
417         return lateralCacheAttributes;
418     }
419 
420     /**
421      * @return debugging data.
422      */
423     @Override
424     public String toString()
425     {
426         StringBuilder buf = new StringBuilder();
427         buf.append( "\n LateralCache " );
428         buf.append( "\n Cache Name [" + lateralCacheAttributes.getCacheName() + "]" );
429         buf.append( "\n cattr =  [" + lateralCacheAttributes + "]" );
430         return buf.toString();
431     }
432 
433     /**
434      * @return extra data.
435      */
436     @Override
437     public String getEventLoggingExtraInfo()
438     {
439         return null;
440     }
441 
442     /**
443      * The NoWait on top does not call out to here yet.
444      * <p>
445      * @return almost nothing
446      */
447     @Override
448     public IStats getStatistics()
449     {
450         IStats stats = new Stats();
451         stats.setTypeName( "LateralCache" );
452         return stats;
453     }
454 }