View Javadoc
1   package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.concurrent.ConcurrentHashMap;
25  
26  import org.apache.commons.jcs3.auxiliary.AbstractAuxiliaryCacheFactory;
27  import org.apache.commons.jcs3.auxiliary.AuxiliaryCacheAttributes;
28  import org.apache.commons.jcs3.auxiliary.lateral.LateralCache;
29  import org.apache.commons.jcs3.auxiliary.lateral.LateralCacheMonitor;
30  import org.apache.commons.jcs3.auxiliary.lateral.LateralCacheNoWait;
31  import org.apache.commons.jcs3.auxiliary.lateral.LateralCacheNoWaitFacade;
32  import org.apache.commons.jcs3.auxiliary.lateral.behavior.ILateralCacheListener;
33  import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
34  import org.apache.commons.jcs3.engine.CacheWatchRepairable;
35  import org.apache.commons.jcs3.engine.ZombieCacheServiceNonLocal;
36  import org.apache.commons.jcs3.engine.ZombieCacheWatch;
37  import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal;
38  import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
39  import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
40  import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
41  import org.apache.commons.jcs3.engine.control.CompositeCacheManager;
42  import org.apache.commons.jcs3.engine.logging.behavior.ICacheEventLogger;
43  import org.apache.commons.jcs3.log.Log;
44  import org.apache.commons.jcs3.log.LogManager;
45  import org.apache.commons.jcs3.utils.discovery.UDPDiscoveryManager;
46  import org.apache.commons.jcs3.utils.discovery.UDPDiscoveryService;
47  import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
48  
49  /**
50   * Constructs a LateralCacheNoWaitFacade for the given configuration. Each lateral service / local
51   * relationship is managed by one manager. This manager can have multiple caches. The remote
52   * relationships are consolidated and restored via these managers.
53   * <p>
54   * The facade provides a front to the composite cache so the implementation is transparent.
55   */
56  public class LateralTCPCacheFactory
57      extends AbstractAuxiliaryCacheFactory
58  {
59      /** The logger */
60      private static final Log log = LogManager.getLog( LateralTCPCacheFactory.class );
61  
62      /** Address to service map. */
63      private ConcurrentHashMap<String, ICacheServiceNonLocal<?, ?>> csnlInstances;
64  
65      /** Map of available discovery listener instances, keyed by port. */
66      private ConcurrentHashMap<String, LateralTCPDiscoveryListener> lTCPDLInstances;
67  
68      /** Monitor thread */
69      private LateralCacheMonitor monitor;
70  
71      /**
72       * Wrapper of the lateral cache watch service; or wrapper of a zombie
73       * service if failed to connect.
74       */
75      private CacheWatchRepairable lateralWatch;
76  
77      /**
78       * Creates a TCP lateral.
79       * <p>
80       * @param <K> cache key type
81       * @param <V> cache value type
82       * @param iaca the cache configuration object
83       * @param cacheMgr the cache manager
84       * @param cacheEventLogger the event logger
85       * @param elementSerializer the serializer to use when sending or receiving
86       * @return a LateralCacheNoWaitFacade
87       */
88      @Override
89      public <K, V> LateralCacheNoWaitFacade<K, V> createCache(
90              final AuxiliaryCacheAttributes iaca, final ICompositeCacheManager cacheMgr,
91             final ICacheEventLogger cacheEventLogger, final IElementSerializer elementSerializer )
92      {
93          final ITCPLateralCacheAttributes lac = (ITCPLateralCacheAttributes) iaca;
94          final ArrayList<LateralCacheNoWait<K, V>> noWaits = new ArrayList<>();
95  
96          // pairs up the tcp servers and set the tcpServer value and
97          // get the manager and then get the cache
98          // no servers are required.
99          if (lac.getTcpServers() != null && !lac.getTcpServers().isEmpty())
100         {
101             final String servers[] = lac.getTcpServers().split("\\s*,\\s*");
102             log.debug( "Configured for [{0}] servers.", servers.length );
103 
104             for (final String server : servers)
105             {
106                 log.debug( "tcp server = {0}", server );
107                 final ITCPLateralCacheAttributes lacClone = (ITCPLateralCacheAttributes) lac.clone();
108                 lacClone.setTcpServer( server );
109 
110                 final LateralCacheNoWait<K, V> lateralNoWait = createCacheNoWait(lacClone, cacheEventLogger, elementSerializer);
111 
112                 addListenerIfNeeded( lacClone, cacheMgr, elementSerializer );
113                 monitorCache(lateralNoWait);
114                 noWaits.add( lateralNoWait );
115             }
116         }
117 
118         final ILateralCacheListener<K, V> listener = createListener( lac, cacheMgr, elementSerializer );
119 
120         // create the no wait facade.
121         final LateralCacheNoWaitFacade<K, V> lcnwf =
122             new LateralCacheNoWaitFacade<>(listener, noWaits, lac);
123 
124         // create udp discovery if available.
125         createDiscoveryService( lac, lcnwf, cacheMgr, cacheEventLogger, elementSerializer );
126 
127         return lcnwf;
128     }
129 
130     /**
131      * Create a LateralCacheNoWait for the server configured in lca
132      *
133      * @param <K> cache key type
134      * @param <V> cache value type
135      * @param lca the cache configuration object
136      * @param cacheEventLogger the event logger
137      * @param elementSerializer the serializer to use when sending or receiving
138      * @return a LateralCacheNoWait
139      */
140     public <K, V> LateralCacheNoWait<K, V> createCacheNoWait( final ITCPLateralCacheAttributes lca,
141             final ICacheEventLogger cacheEventLogger, final IElementSerializer elementSerializer )
142     {
143         final ICacheServiceNonLocal<K, V> lateralService = getCSNLInstance(lca, elementSerializer);
144 
145         final LateralCache<K, V> cache = new LateralCache<>( lca, lateralService, this.monitor );
146         cache.setCacheEventLogger( cacheEventLogger );
147         cache.setElementSerializer( elementSerializer );
148 
149         log.debug( "Created cache for noWait, cache [{0}]", cache );
150 
151         final LateralCacheNoWait<K, V> lateralNoWait = new LateralCacheNoWait<>( cache );
152         lateralNoWait.setIdentityKey(lca.getTcpServer());
153 
154         log.info( "Created LateralCacheNoWait for [{0}] LateralCacheNoWait = [{1}]",
155                 lca, lateralNoWait );
156 
157         return lateralNoWait;
158     }
159 
160     /**
161      * Initialize this factory
162      */
163     @Override
164     public void initialize()
165     {
166         this.csnlInstances = new ConcurrentHashMap<>();
167         this.lTCPDLInstances = new ConcurrentHashMap<>();
168 
169         // Create the monitoring daemon thread
170         this.monitor = new LateralCacheMonitor(this);
171         this.monitor.setDaemon( true );
172         this.monitor.start();
173 
174         this.lateralWatch = new CacheWatchRepairable();
175         this.lateralWatch.setCacheWatch( new ZombieCacheWatch() );
176     }
177 
178     /**
179      * Dispose of this factory, clean up shared resources
180      */
181     @Override
182     public void dispose()
183     {
184         for (final ICacheServiceNonLocal<?, ?> service : this.csnlInstances.values())
185         {
186             try
187             {
188                 service.dispose("");
189             }
190             catch (final IOException e)
191             {
192                 log.error("Could not dispose service " + service, e);
193             }
194         }
195 
196         this.csnlInstances.clear();
197 
198         // TODO: shut down discovery listeners
199         this.lTCPDLInstances.clear();
200 
201         if (this.monitor != null)
202         {
203             this.monitor.notifyShutdown();
204             try
205             {
206                 this.monitor.join(5000);
207             }
208             catch (final InterruptedException e)
209             {
210                 // swallow
211             }
212             this.monitor = null;
213         }
214     }
215 
216     /**
217      * Returns an instance of the cache service.
218      * <p>
219      * @param <K> cache key type
220      * @param <V> cache value type
221      * @param lca configuration for the creation of a new service instance
222      *
223      * @return ICacheServiceNonLocal&lt;K, V&gt;
224      *
225      * @deprecated Specify serializer
226      */
227     @Deprecated
228     public <K, V> ICacheServiceNonLocal<K, V> getCSNLInstance( final ITCPLateralCacheAttributes lca )
229     {
230         return getCSNLInstance(lca, new StandardSerializer());
231     }
232 
233     /**
234      * Returns an instance of the cache service.
235      * <p>
236      * @param <K> cache key type
237      * @param <V> cache value type
238      * @param lca configuration for the creation of a new service instance
239      * @param elementSerializer the serializer to use when sending or receiving
240      *
241      * @return ICacheServiceNonLocal&lt;K, V&gt;
242      * @since 3.1
243      */
244     // Need to cast because of common map for all cache services
245     @SuppressWarnings("unchecked")
246     public <K, V> ICacheServiceNonLocal<K, V> getCSNLInstance(final ITCPLateralCacheAttributes lca,
247             final IElementSerializer elementSerializer)
248     {
249         final String key = lca.getTcpServer();
250 
251         return (ICacheServiceNonLocal<K, V>) csnlInstances.compute(key, (name, service) -> {
252 
253             ICacheServiceNonLocal<?, ?> newService = service;
254 
255             // If service creation did not succeed last time, force retry
256             if (service instanceof ZombieCacheServiceNonLocal)
257             {
258                 log.info("Disposing of zombie service instance for [{0}]", name);
259                 newService = null;
260             }
261 
262             if (newService == null)
263             {
264                 log.info( "Instance for [{0}] is null, creating", name );
265 
266                 // Create the service
267                 try
268                 {
269                     log.info( "Creating TCP service, lca = {0}", lca );
270 
271                     newService = new LateralTCPService<>(lca, elementSerializer);
272                 }
273                 catch ( final IOException ex )
274                 {
275                     // Failed to connect to the lateral server.
276                     // Configure this LateralCacheManager instance to use the
277                     // "zombie" services.
278                     log.error( "Failure, lateral instance will use zombie service", ex );
279 
280                     newService = new ZombieCacheServiceNonLocal<>(lca.getZombieQueueMaxSize());
281 
282                     // Notify the cache monitor about the error, and kick off
283                     // the recovery process.
284                     monitor.notifyError();
285                 }
286             }
287 
288             return newService;
289         });
290     }
291 
292     /**
293      * Add cache instance to monitor
294      *
295      * @param cache the cache instance
296      * @since 3.1
297      */
298     public void monitorCache(final LateralCacheNoWait<?, ?> cache)
299     {
300         monitor.addCache(cache);
301     }
302 
303     /**
304      * Gets the instance attribute of the LateralCacheTCPListener class.
305      * <p>
306      * @param ilca ITCPLateralCacheAttributes
307      * @param cacheManager a reference to the global cache manager
308      * @param cacheEventLogger Reference to the cache event logger for auxiliary cache creation
309      * @param elementSerializer Reference to the cache element serializer for auxiliary cache
310      *
311      * @return The instance value
312      */
313     private LateralTCPDiscoveryListener getDiscoveryListener(final ITCPLateralCacheAttributes ilca,
314             final ICompositeCacheManager cacheManager, final ICacheEventLogger cacheEventLogger,
315             final IElementSerializer elementSerializer)
316     {
317         final String key = ilca.getUdpDiscoveryAddr() + ":" + ilca.getUdpDiscoveryPort();
318 
319         return lTCPDLInstances.computeIfAbsent(key, key1 -> {
320             log.info("Created new discovery listener for cacheName {0} and request {1}",
321                     ilca.getCacheName(), key1);
322             return new LateralTCPDiscoveryListener( this.getName(),
323                     (CompositeCacheManager) cacheManager,
324                     cacheEventLogger, elementSerializer);
325         });
326     }
327 
328     /**
329      * Add listener for receivers
330      * <p>
331      * @param iaca cache configuration attributes
332      * @param cacheMgr the composite cache manager
333      * @param serializer the serializer to use when receiving
334      */
335     private void addListenerIfNeeded( final ITCPLateralCacheAttributes iaca, final ICompositeCacheManager cacheMgr, final IElementSerializer elementSerializer )
336     {
337         // don't create a listener if we are not receiving.
338         if ( iaca.isReceive() )
339         {
340             try
341             {
342                 addLateralCacheListener(iaca.getCacheName(), createListener(iaca, cacheMgr, elementSerializer));
343             }
344             catch ( final IOException ioe )
345             {
346                 log.error("Problem creating lateral listener", ioe);
347             }
348         }
349         else
350         {
351             log.debug( "Not creating a listener since we are not receiving." );
352         }
353     }
354 
355     /**
356      * Adds the lateral cache listener to the underlying cache-watch service.
357      * <p>
358      * @param cacheName The feature to be added to the LateralCacheListener attribute
359      * @param listener The feature to be added to the LateralCacheListener attribute
360      * @throws IOException
361      */
362     private <K, V> void addLateralCacheListener( final String cacheName, final ILateralCacheListener<K, V> listener )
363         throws IOException
364     {
365         synchronized ( this.lateralWatch )
366         {
367             lateralWatch.addCacheListener( cacheName, listener );
368         }
369     }
370 
371     /**
372      * Makes sure a listener gets created. It will get monitored as soon as it
373      * is used.
374      * <p>
375      * This should be called by create cache.
376      * <p>
377      * @param attr  ITCPLateralCacheAttributes
378      * @param cacheMgr the composite cache manager
379      * @param serializer the serializer to use when receiving
380      *
381      * @return the listener if created, else null
382      */
383     private static <K, V> ILateralCacheListener<K, V> createListener( final ITCPLateralCacheAttributes attr,
384             final ICompositeCacheManager cacheMgr, final IElementSerializer elementSerializer )
385     {
386         ILateralCacheListener<K, V> listener = null;
387 
388         // don't create a listener if we are not receiving.
389         if ( attr.isReceive() )
390         {
391             log.info( "Getting listener for {0}", attr );
392 
393             // make a listener. if one doesn't exist
394             listener = LateralTCPListener.getInstance( attr, cacheMgr, elementSerializer );
395 
396             // register for shutdown notification
397             cacheMgr.registerShutdownObserver( (IShutdownObserver) listener );
398         }
399         else
400         {
401             log.debug( "Not creating a listener since we are not receiving." );
402         }
403 
404         return listener;
405     }
406 
407     /**
408      * Creates the discovery service. Only creates this for tcp laterals right now.
409      * <p>
410      * @param lac ITCPLateralCacheAttributes
411      * @param lcnwf the lateral facade
412      * @param cacheMgr a reference to the global cache manager
413      * @param cacheEventLogger Reference to the cache event logger for auxiliary cache creation
414      * @param elementSerializer Reference to the cache element serializer for auxiliary cache
415      */
416     private synchronized <K, V> void createDiscoveryService(
417             final ITCPLateralCacheAttributes lac,
418             final LateralCacheNoWaitFacade<K, V> lcnwf,
419             final ICompositeCacheManager cacheMgr,
420             final ICacheEventLogger cacheEventLogger,
421             final IElementSerializer elementSerializer )
422     {
423         UDPDiscoveryService discovery = null;
424 
425         // create the UDP discovery for the TCP lateral
426         if ( lac.isUdpDiscoveryEnabled() )
427         {
428             // One can be used for all regions
429             final LateralTCPDiscoveryListener discoveryListener =
430                     getDiscoveryListener(lac, cacheMgr, cacheEventLogger, elementSerializer);
431             discoveryListener.addNoWaitFacade( lac.getCacheName(), lcnwf );
432 
433             // need a factory for this so it doesn't
434             // get dereferenced, also we don't want one for every region.
435             discovery = UDPDiscoveryManager.getInstance().getService(
436                     lac.getUdpDiscoveryAddr(), lac.getUdpDiscoveryPort(),
437                     lac.getTcpListenerHost(), lac.getTcpListenerPort(), lac.getUdpTTL(),
438                     cacheMgr, elementSerializer);
439 
440             discovery.addParticipatingCacheName( lac.getCacheName() );
441             discovery.addDiscoveryListener( discoveryListener );
442 
443             log.info( "Registered TCP lateral cache [{0}] with UDPDiscoveryService.",
444                     lac::getCacheName);
445         }
446     }
447 }