View Javadoc

1   package org.apache.jcs.auxiliary.lateral.socket.tcp;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.io.Serializable;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.Map;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.jcs.auxiliary.lateral.LateralCache;
31  import org.apache.jcs.auxiliary.lateral.LateralCacheAbstractManager;
32  import org.apache.jcs.auxiliary.lateral.LateralCacheAttributes;
33  import org.apache.jcs.auxiliary.lateral.LateralCacheMonitor;
34  import org.apache.jcs.auxiliary.lateral.LateralCacheNoWait;
35  import org.apache.jcs.auxiliary.lateral.LateralCacheWatchRepairable;
36  import org.apache.jcs.auxiliary.lateral.ZombieLateralCacheWatch;
37  import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheListener;
38  import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheManager;
39  import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
40  import org.apache.jcs.engine.ZombieCacheServiceNonLocal;
41  import org.apache.jcs.engine.behavior.ICacheServiceNonLocal;
42  import org.apache.jcs.engine.behavior.ICompositeCacheManager;
43  import org.apache.jcs.engine.behavior.IElementSerializer;
44  import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
45  
46  /**
47   * Creates lateral caches. Lateral caches are primarily used for removing non laterally configured
48   * caches. Non laterally configured cache regions should still be able to participate in removal.
49   * But if there is a non laterally configured cache hub, then lateral removals may be necessary. For
50   * flat webserver production environments, without a strong machine at the app server level,
51   * distribution and search may need to occur at the lateral cache level. This is currently not
52   * implemented in the lateral cache.
53   * <p>
54   * TODO: - need freeCache, release, getStats - need to find an interface acceptable for all - cache
55   * managers or a manager within a type
56   */
57  public class LateralTCPCacheManager
58      extends LateralCacheAbstractManager
59  {
60      /** Don't change. */
61      private static final long serialVersionUID = -9213011856644392480L;
62  
63      /** The logger. */
64      private final static Log log = LogFactory.getLog( LateralTCPCacheManager.class );
65  
66      /** The monitor */
67      private static LateralCacheMonitor monitor;
68  
69      /** Address to instance map. */
70      protected static Map<String, LateralTCPCacheManager> instances =
71          Collections.synchronizedMap( new HashMap<String, LateralTCPCacheManager>() );
72  
73      /** ITCPLateralCacheAttributes */
74      protected ITCPLateralCacheAttributes lateralCacheAttributes;
75  
76      /** number of clients, we can remove this. */
77      private int clients;
78  
79      /**
80       * Handle to the lateral cache service; or a zombie handle if failed to connect.
81       */
82      private ICacheServiceNonLocal<? extends Serializable, ? extends Serializable> lateralService;
83  
84      /**
85       * Wrapper of the lateral cache watch service; or wrapper of a zombie service if failed to
86       * connect.
87       */
88      private LateralCacheWatchRepairable lateralWatch;
89  
90      /** This is set in the constructor. */
91      private final ICompositeCacheManager cacheMgr;
92  
93      /**
94       * Returns an instance of the LateralCacheManager.
95       * <p>
96       * @param lca
97       * @param cacheMgr this allows the auxiliary to be passed a cache manager.
98       * @param cacheEventLogger
99       * @param elementSerializer
100      * @return LateralTCPCacheManager
101      */
102     public static LateralTCPCacheManager getInstance( ITCPLateralCacheAttributes lca, ICompositeCacheManager cacheMgr,
103                                                       ICacheEventLogger cacheEventLogger,
104                                                       IElementSerializer elementSerializer )
105     {
106         synchronized ( instances )
107         {
108             String key = lca.getTcpServer();
109             LateralTCPCacheManager ins = instances.get( key );
110             if ( ins == null )
111             {
112                 log.info( "Instance for [" + key + "] is null, creating" );
113 
114                 ins = instances.get( lca.getTcpServer() );
115                 if ( ins == null )
116                 {
117                     ins = new LateralTCPCacheManager( lca, cacheMgr, cacheEventLogger, elementSerializer );
118                     instances.put( key, ins );
119                 }
120 
121                 createMonitor( ins );
122             }
123             ins.clients++;
124 
125             return ins;
126         }
127     }
128 
129     /**
130      * The monitor needs reference to one instance, actually just a type.
131      * <p>
132      * TODO refactor this.
133      * <p>
134      * @param instance
135      */
136     private static synchronized void createMonitor( ILateralCacheManager instance )
137     {
138         // only want one monitor per lateral type
139         // Fires up the monitoring daemon.
140         if ( monitor == null )
141         {
142             monitor = new LateralCacheMonitor( instance );
143             Thread t = new Thread( monitor );
144             t.setDaemon( true );
145             t.start();
146         }
147     }
148 
149     /**
150      * Constructor for the LateralCacheManager object.
151      * <p>
152      * @param lcaA
153      * @param cacheMgr
154      * @param cacheEventLogger
155      * @param elementSerializer
156      */
157     private LateralTCPCacheManager( ITCPLateralCacheAttributes lcaA, ICompositeCacheManager cacheMgr,
158                                     ICacheEventLogger cacheEventLogger, IElementSerializer elementSerializer )
159     {
160         this.lateralCacheAttributes = lcaA;
161         this.cacheMgr = cacheMgr;
162         this.cacheEventLogger = cacheEventLogger;
163         this.elementSerializer = elementSerializer;
164 
165         this.lateralWatch = new LateralCacheWatchRepairable();
166         this.lateralWatch.setCacheWatch( new ZombieLateralCacheWatch() );
167 
168         if ( log.isDebugEnabled() )
169         {
170             log.debug( "Creating lateral cache service, lca = " + this.lateralCacheAttributes );
171         }
172 
173         // Create the service
174         try
175         {
176             if ( log.isInfoEnabled() )
177             {
178                 log.info( "Creating TCP service, lca = " + this.lateralCacheAttributes );
179             }
180 
181             this.lateralService = new LateralTCPService<Serializable, Serializable>( this.lateralCacheAttributes );
182         }
183         catch ( Exception ex )
184         {
185             // Failed to connect to the lateral server.
186             // Configure this LateralCacheManager instance to use the
187             // "zombie" services.
188             log.error( "Failure, lateral instance will use zombie service", ex );
189 
190             this.lateralService = new ZombieCacheServiceNonLocal<Serializable, Serializable>( lateralCacheAttributes.getZombieQueueMaxSize() );
191 
192             // Notify the cache monitor about the error, and kick off
193             // the recovery process.
194             createMonitor( this );
195             monitor.notifyError();
196         }
197     }
198 
199     /**
200      * Adds the lateral cache listener to the underlying cache-watch service.
201      * <p>
202      * @param cacheName The feature to be added to the LateralCacheListener attribute
203      * @param listener The feature to be added to the LateralCacheListener attribute
204      * @exception IOException
205      */
206     @Override
207     public <K extends Serializable, V extends Serializable> void addLateralCacheListener( String cacheName, ILateralCacheListener<K, V> listener )
208         throws IOException
209     {
210         synchronized ( this.caches )
211         {
212             lateralWatch.addCacheListener( cacheName, listener );
213         }
214     }
215 
216     /**
217      * Called to access a precreated region or construct one with defaults. Since all aux cache
218      * access goes through the manager, this will never be called.
219      * <p>
220      * After getting the manager instance for a server, the factory gets a cache for the region name
221      * it is constructing.
222      * <p>
223      * There should be one manager per server and one cache per region per manager.
224      * <p>
225      * @return AuxiliaryCache
226      * @param cacheName
227      */
228     @SuppressWarnings("unchecked")
229     @Override
230     public <K extends Serializable, V extends Serializable> LateralCacheNoWait<K, V> getCache( String cacheName )
231     {
232         LateralCacheNoWait<K, V> lateralNoWait = null;
233         synchronized ( caches )
234         {
235             // Need to cast because of common map for all caches
236             lateralNoWait = (LateralCacheNoWait<K, V>) caches.get( cacheName );
237             if ( lateralNoWait == null )
238             {
239                 LateralCacheAttributes attr = (LateralCacheAttributes) lateralCacheAttributes.copy();
240                 attr.setCacheName( cacheName );
241 
242                 // Need to cast to specific type
243                 LateralCache<K, V> cache = new LateralCache<K, V>( attr,
244                         (ICacheServiceNonLocal<K, V>)this.lateralService, monitor );
245                 cache.setCacheEventLogger( cacheEventLogger );
246                 cache.setElementSerializer( elementSerializer );
247 
248                 if ( log.isDebugEnabled() )
249                 {
250                     log.debug( "Created cache for noWait, cache [" + cache + "]" );
251                 }
252 
253                 lateralNoWait = new LateralCacheNoWait<K, V>( cache );
254                 lateralNoWait.setCacheEventLogger( cacheEventLogger );
255                 lateralNoWait.setElementSerializer( elementSerializer );
256 
257                 caches.put( cacheName, lateralNoWait );
258 
259                 if ( log.isInfoEnabled() )
260                 {
261                     log.info( "Created LateralCacheNoWait for [" + lateralCacheAttributes + "] LateralCacheNoWait = [" + lateralNoWait
262                         + "]" );
263                 }
264 
265                 // this used to be called every getCache. i move it in here.
266                 addListenerIfNeeded( cacheName );
267             }
268         }
269 
270         return lateralNoWait;
271     }
272 
273     /**
274      * TODO see if this belongs here or in the factory.
275      * <p>
276      * @param cacheName
277      */
278     private void addListenerIfNeeded( String cacheName )
279     {
280         // don't create a listener if we are not receiving.
281         if ( lateralCacheAttributes.isReceive() )
282         {
283             try
284             {
285                 addLateralCacheListener( cacheName, LateralTCPListener.getInstance( lateralCacheAttributes, cacheMgr ) );
286             }
287             catch ( IOException ioe )
288             {
289                 log.error( "Problem creating lateral listener", ioe );
290             }
291             catch ( Exception e )
292             {
293                 log.error( "Problem creating lateral listener", e );
294             }
295         }
296         else
297         {
298             if ( log.isDebugEnabled() )
299             {
300                 log.debug( "Not creating a listener since we are not receiving." );
301             }
302         }
303     }
304 
305     /**
306      * @return Map
307      */
308     public Map<String, ? extends ILateralCacheManager> getInstances()
309     {
310         return instances;
311     }
312 
313     /**
314      * @return a new service
315      * @throws IOException
316      */
317     public Object fixService()
318         throws IOException
319     {
320         Object service = null;
321         try
322         {
323             service = new LateralTCPService<Serializable, Serializable>( lateralCacheAttributes );
324         }
325         catch ( Exception ex )
326         {
327             log.error( "Can't fix " + ex.getMessage() );
328             throw new IOException( "Can't fix " + ex.getMessage() );
329         }
330         return service;
331     }
332 
333     /**
334      * Shuts down the lateral sender. This does not shutdown the listener. This can be called if the
335      * end point is taken out of service.
336      */
337     public void shutdown()
338     {
339         // TODO revisit this.
340         // the name here doesn't matter.
341         try
342         {
343             lateralService.dispose( "ALL" );
344         }
345         catch ( IOException e )
346         {
347             log.error( "Problem disposing of service", e );
348         }
349 
350         // shut down monitor
351         if (monitor != null)
352         {
353             monitor.notifyShutdown();
354         }
355     }
356 }