View Javadoc
1   package org.apache.commons.jcs.auxiliary.lateral.socket.tcp;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.StringTokenizer;
25  import java.util.concurrent.ConcurrentHashMap;
26  import java.util.concurrent.locks.ReentrantLock;
27  
28  import org.apache.commons.jcs.auxiliary.AbstractAuxiliaryCacheFactory;
29  import org.apache.commons.jcs.auxiliary.AuxiliaryCacheAttributes;
30  import org.apache.commons.jcs.auxiliary.lateral.LateralCache;
31  import org.apache.commons.jcs.auxiliary.lateral.LateralCacheMonitor;
32  import org.apache.commons.jcs.auxiliary.lateral.LateralCacheNoWait;
33  import org.apache.commons.jcs.auxiliary.lateral.LateralCacheNoWaitFacade;
34  import org.apache.commons.jcs.auxiliary.lateral.behavior.ILateralCacheListener;
35  import org.apache.commons.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
36  import org.apache.commons.jcs.engine.CacheWatchRepairable;
37  import org.apache.commons.jcs.engine.ZombieCacheServiceNonLocal;
38  import org.apache.commons.jcs.engine.ZombieCacheWatch;
39  import org.apache.commons.jcs.engine.behavior.ICache;
40  import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal;
41  import org.apache.commons.jcs.engine.behavior.ICompositeCacheManager;
42  import org.apache.commons.jcs.engine.behavior.IElementSerializer;
43  import org.apache.commons.jcs.engine.behavior.IShutdownObserver;
44  import org.apache.commons.jcs.engine.logging.behavior.ICacheEventLogger;
45  import org.apache.commons.jcs.utils.discovery.UDPDiscoveryManager;
46  import org.apache.commons.jcs.utils.discovery.UDPDiscoveryService;
47  import org.apache.commons.logging.Log;
48  import org.apache.commons.logging.LogFactory;
49  
50  /**
51   * Constructs a LateralCacheNoWaitFacade for the given configuration. Each lateral service / local
52   * relationship is managed by one manager. This manager can have multiple caches. The remote
53   * relationships are consolidated and restored via these managers.
54   * <p>
55   * The facade provides a front to the composite cache so the implementation is transparent.
56   */
57  public class LateralTCPCacheFactory
58      extends AbstractAuxiliaryCacheFactory
59  {
60      /** The logger */
61      private static final Log log = LogFactory.getLog( LateralTCPCacheFactory.class );
62  
63      /** Address to service map. */
64      private ConcurrentHashMap<String, ICacheServiceNonLocal<?, ?>> csnlInstances;
65  
66      /** Lock for initialization of address to service map */
67      private ReentrantLock csnlLock;
68  
69      /** Map of available discovery listener instances, keyed by port. */
70      private ConcurrentHashMap<String, LateralTCPDiscoveryListener> lTCPDLInstances;
71  
72      /** Monitor thread */
73      private LateralCacheMonitor monitor;
74  
75      /**
76       * Wrapper of the lateral cache watch service; or wrapper of a zombie
77       * service if failed to connect.
78       */
79      private CacheWatchRepairable lateralWatch;
80  
81      /**
82       * Creates a TCP lateral.
83       * <p>
84       * @param iaca
85       * @param cacheMgr
86       * @param cacheEventLogger
87       * @param elementSerializer
88       * @return LateralCacheNoWaitFacade
89       */
90      @Override
91      public <K, V> LateralCacheNoWaitFacade<K, V> createCache(
92              AuxiliaryCacheAttributes iaca, ICompositeCacheManager cacheMgr,
93             ICacheEventLogger cacheEventLogger, IElementSerializer elementSerializer )
94      {
95          ITCPLateralCacheAttributes lac = (ITCPLateralCacheAttributes) iaca;
96          ArrayList<ICache<K, V>> noWaits = new ArrayList<ICache<K, V>>();
97  
98          // pairs up the tcp servers and set the tcpServer value and
99          // get the manager and then get the cache
100         // no servers are required.
101         if ( lac.getTcpServers() != null )
102         {
103             StringTokenizer it = new StringTokenizer( lac.getTcpServers(), "," );
104             if ( log.isDebugEnabled() )
105             {
106                 log.debug( "Configured for [" + it.countTokens() + "]  servers." );
107             }
108             while ( it.hasMoreElements() )
109             {
110                 String server = (String) it.nextElement();
111                 if ( log.isDebugEnabled() )
112                 {
113                     log.debug( "tcp server = " + server );
114                 }
115                 ITCPLateralCacheAttributes lacC = (ITCPLateralCacheAttributes) lac.clone();
116                 lacC.setTcpServer( server );
117 
118                 LateralCacheNoWait<K, V> lateralNoWait = createCacheNoWait(lacC, cacheEventLogger, elementSerializer);
119 
120                 addListenerIfNeeded( lacC, cacheMgr );
121                 monitor.addCache(lateralNoWait);
122                 noWaits.add( lateralNoWait );
123             }
124         }
125 
126         ILateralCacheListener<K, V> listener = createListener( lac, cacheMgr );
127 
128         // create the no wait facade.
129         @SuppressWarnings("unchecked") // No generic arrays in java
130         LateralCacheNoWait<K, V>[] lcnwArray = noWaits.toArray( new LateralCacheNoWait[0] );
131         LateralCacheNoWaitFacade<K, V> lcnwf =
132             new LateralCacheNoWaitFacade<K, V>(listener, lcnwArray, lac );
133 
134         // create udp discovery if available.
135         createDiscoveryService( lac, lcnwf, cacheMgr, cacheEventLogger, elementSerializer );
136 
137         return lcnwf;
138     }
139 
140     protected <K, V> LateralCacheNoWait<K, V> createCacheNoWait( ITCPLateralCacheAttributes lca,
141             ICacheEventLogger cacheEventLogger, IElementSerializer elementSerializer )
142     {
143         ICacheServiceNonLocal<K, V> lateralService = getCSNLInstance(lca);
144 
145         LateralCache<K, V> cache = new LateralCache<K, V>( lca, lateralService, this.monitor );
146         cache.setCacheEventLogger( cacheEventLogger );
147         cache.setElementSerializer( elementSerializer );
148 
149         if ( log.isDebugEnabled() )
150         {
151             log.debug( "Created cache for noWait, cache [" + cache + "]" );
152         }
153 
154         LateralCacheNoWait<K, V> lateralNoWait = new LateralCacheNoWait<K, V>( cache );
155         lateralNoWait.setCacheEventLogger( cacheEventLogger );
156         lateralNoWait.setElementSerializer( elementSerializer );
157 
158         if ( log.isInfoEnabled() )
159         {
160             log.info( "Created LateralCacheNoWait for [" + lca + "] LateralCacheNoWait = [" + lateralNoWait
161                 + "]" );
162         }
163 
164         return lateralNoWait;
165     }
166 
167     /**
168      * Initialize this factory
169      */
170     @Override
171     public void initialize()
172     {
173         this.csnlInstances = new ConcurrentHashMap<String, ICacheServiceNonLocal<?, ?>>();
174         this.csnlLock = new ReentrantLock();
175         this.lTCPDLInstances = new ConcurrentHashMap<String, LateralTCPDiscoveryListener>();
176 
177         // Create the monitoring daemon thread
178         this.monitor = new LateralCacheMonitor(this);
179         this.monitor.setDaemon( true );
180         this.monitor.start();
181 
182         this.lateralWatch = new CacheWatchRepairable();
183         this.lateralWatch.setCacheWatch( new ZombieCacheWatch() );
184     }
185 
186     /**
187      * Dispose of this factory, clean up shared resources
188      */
189     @Override
190     public void dispose()
191     {
192         for (ICacheServiceNonLocal<?, ?> service : this.csnlInstances.values())
193         {
194             try
195             {
196                 service.dispose("");
197             }
198             catch (IOException e)
199             {
200                 log.error("Could not dispose service " + service, e);
201             }
202         }
203 
204         this.csnlInstances.clear();
205 
206         // TODO: shut down discovery listeners
207         this.lTCPDLInstances.clear();
208 
209         if (this.monitor != null)
210         {
211             this.monitor.notifyShutdown();
212             try
213             {
214                 this.monitor.join(5000);
215             }
216             catch (InterruptedException e)
217             {
218                 // swallow
219             }
220             this.monitor = null;
221         }
222     }
223 
224     /**
225      * Returns an instance of the cache service.
226      * <p>
227      * @param lca configuration for the creation of a new service instance
228      *
229      * @return ICacheServiceNonLocal&lt;K, V&gt;
230      */
231     // Need to cast because of common map for all cache services
232     @SuppressWarnings("unchecked")
233     public <K, V> ICacheServiceNonLocal<K, V> getCSNLInstance( ITCPLateralCacheAttributes lca )
234     {
235         String key = lca.getTcpServer();
236 
237         ICacheServiceNonLocal<K, V> service = (ICacheServiceNonLocal<K, V>)csnlInstances.get( key );
238 
239         if ( service == null || service instanceof ZombieCacheServiceNonLocal )
240         {
241             csnlLock.lock();
242 
243             try
244             {
245                 // double check
246                 service = (ICacheServiceNonLocal<K, V>)csnlInstances.get( key );
247 
248                 // If service creation did not succeed last time, force retry
249                 if ( service instanceof ZombieCacheServiceNonLocal)
250                 {
251                     service = null;
252                     log.info("Disposing of zombie service instance for [" + key + "]");
253                 }
254 
255                 if ( service == null )
256                 {
257                     log.info( "Instance for [" + key + "] is null, creating" );
258 
259                     // Create the service
260                     try
261                     {
262                         if ( log.isInfoEnabled() )
263                         {
264                             log.info( "Creating TCP service, lca = " + lca );
265                         }
266 
267                         service = new LateralTCPService<K, V>( lca );
268                     }
269                     catch ( IOException ex )
270                     {
271                         // Failed to connect to the lateral server.
272                         // Configure this LateralCacheManager instance to use the
273                         // "zombie" services.
274                         log.error( "Failure, lateral instance will use zombie service", ex );
275 
276                         service = new ZombieCacheServiceNonLocal<K, V>( lca.getZombieQueueMaxSize() );
277 
278                         // Notify the cache monitor about the error, and kick off
279                         // the recovery process.
280                         monitor.notifyError();
281                     }
282 
283                     csnlInstances.put( key, service );
284                 }
285             }
286             finally
287             {
288                 csnlLock.unlock();
289             }
290         }
291 
292         return service;
293     }
294 
295     /**
296      * Gets the instance attribute of the LateralCacheTCPListener class.
297      * <p>
298      * @param ilca ITCPLateralCacheAttributes
299      * @param cacheManager a reference to the global cache manager
300      *
301      * @return The instance value
302      */
303     private LateralTCPDiscoveryListener getDiscoveryListener( ITCPLateralCacheAttributes ilca, ICompositeCacheManager cacheManager )
304     {
305         String key = ilca.getUdpDiscoveryAddr() + ":" + ilca.getUdpDiscoveryPort();
306         LateralTCPDiscoveryListener ins = null;
307 
308         LateralTCPDiscoveryListener newListener = new LateralTCPDiscoveryListener( this.getName(),  cacheManager);
309         ins = lTCPDLInstances.putIfAbsent(key, newListener );
310 
311         if ( ins == null )
312         {
313             ins = newListener;
314 
315             if ( log.isInfoEnabled() )
316             {
317                 log.info( "Created new discovery listener for " + key + " cacheName for request " + ilca.getCacheName() );
318             }
319         }
320 
321         return ins;
322     }
323 
324     /**
325      * Add listener for receivers
326      * <p>
327      * @param iaca cache configuration attributes
328      * @param cacheMgr the composite cache manager
329      */
330     private void addListenerIfNeeded( ITCPLateralCacheAttributes iaca, ICompositeCacheManager cacheMgr )
331     {
332         // don't create a listener if we are not receiving.
333         if ( iaca.isReceive() )
334         {
335             try
336             {
337                 addLateralCacheListener( iaca.getCacheName(), LateralTCPListener.getInstance( iaca, cacheMgr ) );
338             }
339             catch ( IOException ioe )
340             {
341                 log.error( "Problem creating lateral listener", ioe );
342             }
343         }
344         else
345         {
346             if ( log.isDebugEnabled() )
347             {
348                 log.debug( "Not creating a listener since we are not receiving." );
349             }
350         }
351     }
352 
353     /**
354      * Adds the lateral cache listener to the underlying cache-watch service.
355      * <p>
356      * @param cacheName The feature to be added to the LateralCacheListener attribute
357      * @param listener The feature to be added to the LateralCacheListener attribute
358      * @throws IOException
359      */
360     private <K, V> void addLateralCacheListener( String cacheName, ILateralCacheListener<K, V> listener )
361         throws IOException
362     {
363         synchronized ( this.lateralWatch )
364         {
365             lateralWatch.addCacheListener( cacheName, listener );
366         }
367     }
368 
369     /**
370      * Makes sure a listener gets created. It will get monitored as soon as it
371      * is used.
372      * <p>
373      * This should be called by create cache.
374      * <p>
375      * @param attr  ITCPLateralCacheAttributes
376      * @param cacheMgr
377      *
378      * @return the listener if created, else null
379      */
380     private <K, V> ILateralCacheListener<K, V> createListener( ITCPLateralCacheAttributes attr,
381             ICompositeCacheManager cacheMgr )
382     {
383         ILateralCacheListener<K, V> listener = null;
384 
385         // don't create a listener if we are not receiving.
386         if ( attr.isReceive() )
387         {
388             if ( log.isInfoEnabled() )
389             {
390                 log.info( "Getting listener for " + attr );
391             }
392 
393             // make a listener. if one doesn't exist
394             listener = LateralTCPListener.getInstance( attr, cacheMgr );
395 
396             // register for shutdown notification
397             cacheMgr.registerShutdownObserver( (IShutdownObserver) listener );
398         }
399         else
400         {
401             if ( log.isDebugEnabled() )
402             {
403                 log.debug( "Not creating a listener since we are not receiving." );
404             }
405         }
406 
407         return listener;
408     }
409 
410     /**
411      * Creates the discovery service. Only creates this for tcp laterals right now.
412      * <p>
413      * @param lac ITCPLateralCacheAttributes
414      * @param lcnwf
415      * @param cacheMgr
416      * @param cacheEventLogger
417      * @param elementSerializer
418      * @return null if none is created.
419      */
420     private synchronized <K, V> UDPDiscoveryService createDiscoveryService(
421             ITCPLateralCacheAttributes lac,
422             LateralCacheNoWaitFacade<K, V> lcnwf,
423             ICompositeCacheManager cacheMgr,
424             ICacheEventLogger cacheEventLogger,
425             IElementSerializer elementSerializer )
426     {
427         UDPDiscoveryService discovery = null;
428 
429         // create the UDP discovery for the TCP lateral
430         if ( lac.isUdpDiscoveryEnabled() )
431         {
432             // One can be used for all regions
433             LateralTCPDiscoveryListener discoveryListener = getDiscoveryListener( lac, cacheMgr );
434             discoveryListener.addNoWaitFacade( lac.getCacheName(), lcnwf );
435 
436             // need a factory for this so it doesn't
437             // get dereferenced, also we don't want one for every region.
438             discovery = UDPDiscoveryManager.getInstance().getService( lac.getUdpDiscoveryAddr(),
439                                                                       lac.getUdpDiscoveryPort(),
440                                                                       lac.getTcpListenerPort(), cacheMgr);
441 
442             discovery.addParticipatingCacheName( lac.getCacheName() );
443             discovery.addDiscoveryListener( discoveryListener );
444 
445             if ( log.isInfoEnabled() )
446             {
447                 log.info( "Registered TCP lateral cache [" + lac.getCacheName() + "] with UDPDiscoveryService." );
448             }
449         }
450         return discovery;
451     }
452 }