View Javadoc

1   package org.apache.jcs.auxiliary.lateral;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.io.Serializable;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.Map;
27  import java.util.Set;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.jcs.auxiliary.AbstractAuxiliaryCacheEventLogging;
32  import org.apache.jcs.auxiliary.AuxiliaryCacheAttributes;
33  import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheAttributes;
34  import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheService;
35  import org.apache.jcs.engine.CacheConstants;
36  import org.apache.jcs.engine.behavior.ICacheElement;
37  import org.apache.jcs.engine.behavior.ICacheType;
38  import org.apache.jcs.engine.behavior.IZombie;
39  import org.apache.jcs.engine.stats.Stats;
40  import org.apache.jcs.engine.stats.behavior.IStats;
41  
42  /**
43   * Lateral distributor. Returns null on get by default. Net search not implemented.
44   */
45  public class LateralCache
46      extends AbstractAuxiliaryCacheEventLogging
47  {
48      /** Don't change. */
49      private static final long serialVersionUID = 6274549256562382782L;
50  
51      /** The logger. */
52      private final static Log log = LogFactory.getLog( LateralCache.class );
53  
54      /** generalize this, use another interface */
55      private final ILateralCacheAttributes lateralCacheAttribures;
56  
57      /** The region name */
58      final String cacheName;
59  
    /** The lateral service; either http, socket.udp, or socket.tcp can be set in the config. */
61      private ILateralCacheService lateralCacheService;
62  
63      /** Monitors the connection. */
64      private LateralCacheMonitor monitor;
65  
66      /**
67       * Constructor for the LateralCache object
68       * <p>
69       * @param cattr
70       * @param lateral
71       * @param monitor
72       */
73      public LateralCache( ILateralCacheAttributes cattr, ILateralCacheService lateral, LateralCacheMonitor monitor )
74      {
75          this.cacheName = cattr.getCacheName();
76          this.lateralCacheAttribures = cattr;
77          this.lateralCacheService = lateral;
78          this.monitor = monitor;
79      }
80  
81      /**
82       * Constructor for the LateralCache object
83       * <p>
84       * @param cattr
85       */
86      public LateralCache( ILateralCacheAttributes cattr )
87      {
88          this.cacheName = cattr.getCacheName();
89          this.lateralCacheAttribures = cattr;
90      }
91  
92      /**
93       * Update lateral.
94       * <p>
95       * @param ce
96       * @throws IOException
97       */
98      @Override
99      protected void processUpdate( ICacheElement ce )
100         throws IOException
101     {
102         try
103         {
104             if ( log.isDebugEnabled() )
105             {
106                 log.debug( "update: lateral = [" + lateralCacheService + "], " + "LateralCacheInfo.listenerId = "
107                     + LateralCacheInfo.listenerId );
108             }
109             lateralCacheService.update( ce, LateralCacheInfo.listenerId );
110         }
111         catch ( NullPointerException npe )
112         {
113             log.error( "Failure updating lateral. lateral = " + lateralCacheService, npe );
114             handleException( npe, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName() + "@" + lateralCacheAttribures );
115             return;
116         }
117         catch ( Exception ex )
118         {
119             handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName() + "@" + lateralCacheAttribures );
120         }
121     }
122 
123     /**
124      * The performance costs are too great. It is not recommended that you enable lateral gets.
125      * <p>
126      * @param key
127      * @return ICacheElement or null
128      * @throws IOException
129      */
130     @Override
131     protected ICacheElement processGet( Serializable key )
132         throws IOException
133     {
134         ICacheElement obj = null;
135 
136         if ( this.lateralCacheAttribures.getPutOnlyMode() )
137         {
138             return null;
139         }
140         try
141         {
142             obj = lateralCacheService.get( cacheName, key );
143         }
144         catch ( Exception e )
145         {
146             log.error( e );
147             handleException( e, "Failed to get [" + key + "] from " + lateralCacheAttribures.getCacheName() + "@" + lateralCacheAttribures );
148         }
149         return obj;
150     }
151 
152     /**
153      * @param pattern
154      * @return A map of Serializable key to ICacheElement element, or an empty map if there is no
155      *         data in cache for any of these keys
156      * @throws IOException
157      */
158     @Override
159     protected Map<Serializable, ICacheElement> processGetMatching( String pattern )
160         throws IOException
161     {
162         if ( this.lateralCacheAttribures.getPutOnlyMode() )
163         {
164             return Collections.<Serializable, ICacheElement>emptyMap();
165         }
166         try
167         {
168             return lateralCacheService.getMatching( cacheName, pattern );
169         }
170         catch ( IOException e )
171         {
172             log.error( e );
173             handleException( e, "Failed to getMatching [" + pattern + "] from " + lateralCacheAttribures.getCacheName() + "@" + lateralCacheAttribures );
174             return Collections.<Serializable, ICacheElement>emptyMap();
175         }
176     }
177 
178     /**
179      * Gets multiple items from the cache based on the given set of keys.
180      * <p>
181      * @param keys
182      * @return a map of Serializable key to ICacheElement element, or an empty map if there is no
183      *         data in cache for any of these keys
184      * @throws IOException
185      */
186     @Override
187     protected Map<Serializable, ICacheElement> processGetMultiple( Set<Serializable> keys )
188         throws IOException
189     {
190         Map<Serializable, ICacheElement> elements = new HashMap<Serializable, ICacheElement>();
191 
192         if ( keys != null && !keys.isEmpty() )
193         {
194             for (Serializable key : keys)
195             {
196                 ICacheElement element = get( key );
197 
198                 if ( element != null )
199                 {
200                     elements.put( key, element );
201                 }
202             }
203         }
204 
205         return elements;
206     }
207 
208     /**
209      * @param groupName
210      * @return A set of group keys.
211      * @throws IOException
212      */
213     public Set<Serializable> getGroupKeys( String groupName )
214         throws IOException
215     {
216         try
217         {
218             return lateralCacheService.getGroupKeys( cacheName, groupName );
219         }
220         catch ( Exception ex )
221         {
222             handleException( ex, "Failed to remove groupName [" + groupName + "] from " + lateralCacheAttribures.getCacheName() + "@"
223                 + lateralCacheAttribures );
224         }
225         return Collections.emptySet();
226     }
227 
228     /**
229      * Synchronously remove from the remote cache; if failed, replace the remote handle with a
230      * zombie.
231      * <p>
232      * @param key
233      * @return false always
234      * @throws IOException
235      */
236     @Override
237     protected boolean processRemove( Serializable key )
238         throws IOException
239     {
240         if ( log.isDebugEnabled() )
241         {
242             log.debug( "removing key:" + key );
243         }
244 
245         try
246         {
247             lateralCacheService.remove( cacheName, key, LateralCacheInfo.listenerId );
248         }
249         catch ( Exception ex )
250         {
251             handleException( ex, "Failed to remove " + key + " from " + lateralCacheAttribures.getCacheName() + "@" + lateralCacheAttribures );
252         }
253         return false;
254     }
255 
256     /**
257      * Synchronously removeAll from the remote cache; if failed, replace the remote handle with a
258      * zombie.
259      * <p>
260      * @throws IOException
261      */
262     @Override
263     protected void processRemoveAll()
264         throws IOException
265     {
266         try
267         {
268             lateralCacheService.removeAll( cacheName, LateralCacheInfo.listenerId );
269         }
270         catch ( Exception ex )
271         {
272             handleException( ex, "Failed to remove all from " + lateralCacheAttribures.getCacheName() + "@" + lateralCacheAttribures );
273         }
274     }
275 
276     /**
277      * Synchronously dispose the cache. Not sure we want this.
278      * <p>
279      * @throws IOException
280      */
281     @Override
282     protected void processDispose()
283         throws IOException
284     {
285         log.debug( "Disposing of lateral cache" );
286 
287         ///* HELP: This section did nothing but generate compilation warnings.
288         // TODO: may limit this functionality. It is dangerous.
289         // asmuts -- Added functionality to help with warnings. I'm not getting
290         // any.
291         try
292         {
293             lateralCacheService.dispose( this.lateralCacheAttribures.getCacheName() );
294             // Should remove connection
295         }
296         catch ( Exception ex )
297         {
298             log.error( "Couldn't dispose", ex );
299             handleException( ex, "Failed to dispose " + lateralCacheAttribures.getCacheName() );
300         }
301     }
302 
303     /**
304      * Returns the cache status.
305      * <p>
306      * @return The status value
307      */
308     public int getStatus()
309     {
310         return this.lateralCacheService instanceof IZombie ? CacheConstants.STATUS_ERROR : CacheConstants.STATUS_ALIVE;
311     }
312 
313     /**
314      * Returns the current cache size.
315      * <p>
316      * @return The size value
317      */
318     public int getSize()
319     {
320         return 0;
321     }
322 
323     /**
324      * Gets the cacheType attribute of the LateralCache object
325      * <p>
326      * @return The cacheType value
327      */
328     public int getCacheType()
329     {
330         return ICacheType.LATERAL_CACHE;
331     }
332 
333     /**
334      * Gets the cacheName attribute of the LateralCache object
335      * <p>
336      * @return The cacheName value
337      */
338     public String getCacheName()
339     {
340         return cacheName;
341     }
342 
343     /**
344      * Not yet sure what to do here.
345      * <p>
346      * @param ex
347      * @param msg
348      * @throws IOException
349      */
350     private void handleException( Exception ex, String msg )
351         throws IOException
352     {
353         log.error( "Disabling lateral cache due to error " + msg, ex );
354 
355         lateralCacheService = new ZombieLateralCacheService( lateralCacheAttribures.getZombieQueueMaxSize() );
356         // may want to flush if region specifies
357         // Notify the cache monitor about the error, and kick off the recovery
358         // process.
359         monitor.notifyError();
360 
361         // could stop the net serach if it is built and try to reconnect?
362         if ( ex instanceof IOException )
363         {
364             throw (IOException) ex;
365         }
366         throw new IOException( ex.getMessage() );
367     }
368 
369     /**
370      * Replaces the current remote cache service handle with the given handle.
371      * <p>
372      * @param restoredLateral
373      */
374     public void fixCache( ILateralCacheService restoredLateral )
375     {
376         if ( this.lateralCacheService != null && this.lateralCacheService instanceof ZombieLateralCacheService )
377         {
378             ZombieLateralCacheService zombie = (ZombieLateralCacheService) this.lateralCacheService;
379             this.lateralCacheService = restoredLateral;
380             try
381             {
382                 zombie.propagateEvents( restoredLateral );
383             }
384             catch ( Exception e )
385             {
386                 try
387                 {
388                     handleException( e, "Problem propagating events from Zombie Queue to new Lateral Service." );
389                 }
390                 catch ( IOException e1 )
391                 {
392                     // swallow, since this is just expected kick back.  Handle always throws
393                 }
394             }
395         }
396         else
397         {
398             this.lateralCacheService = restoredLateral;
399         }
400     }
401 
402     /**
403      * getStats
404      * <p>
405      * @return String
406      */
407     public String getStats()
408     {
409         return "";
410     }
411 
412     /**
413      * @return Returns the AuxiliaryCacheAttributes.
414      */
415     public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
416     {
417         return lateralCacheAttribures;
418     }
419 
420     /**
421      * @return debugging data.
422      */
423     @Override
424     public String toString()
425     {
426         StringBuffer buf = new StringBuffer();
427         buf.append( "\n LateralCache " );
428         buf.append( "\n Cache Name [" + lateralCacheAttribures.getCacheName() + "]" );
429         buf.append( "\n cattr =  [" + lateralCacheAttribures + "]" );
430         return buf.toString();
431     }
432 
433     /**
434      * @return extra data.
435      */
436     @Override
437     public String getEventLoggingExtraInfo()
438     {
439         return null;
440     }
441 
442     /**
443      * The NoWait on top does not call out to here yet.
444      * <p>
445      * @return almost nothing
446      */
447     public IStats getStatistics()
448     {
449         IStats stats = new Stats();
450         stats.setTypeName( "LateralCache" );
451         return stats;
452     }
453 }