001package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.concurrent.ConcurrentHashMap;
025
026import org.apache.commons.jcs3.auxiliary.AbstractAuxiliaryCacheFactory;
027import org.apache.commons.jcs3.auxiliary.AuxiliaryCacheAttributes;
028import org.apache.commons.jcs3.auxiliary.lateral.LateralCache;
029import org.apache.commons.jcs3.auxiliary.lateral.LateralCacheMonitor;
030import org.apache.commons.jcs3.auxiliary.lateral.LateralCacheNoWait;
031import org.apache.commons.jcs3.auxiliary.lateral.LateralCacheNoWaitFacade;
032import org.apache.commons.jcs3.auxiliary.lateral.behavior.ILateralCacheListener;
033import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
034import org.apache.commons.jcs3.engine.CacheWatchRepairable;
035import org.apache.commons.jcs3.engine.ZombieCacheServiceNonLocal;
036import org.apache.commons.jcs3.engine.ZombieCacheWatch;
037import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal;
038import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
039import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
040import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
041import org.apache.commons.jcs3.engine.control.CompositeCacheManager;
042import org.apache.commons.jcs3.engine.logging.behavior.ICacheEventLogger;
043import org.apache.commons.jcs3.log.Log;
044import org.apache.commons.jcs3.log.LogManager;
045import org.apache.commons.jcs3.utils.discovery.UDPDiscoveryManager;
046import org.apache.commons.jcs3.utils.discovery.UDPDiscoveryService;
047import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
048
049/**
050 * Constructs a LateralCacheNoWaitFacade for the given configuration. Each lateral service / local
051 * relationship is managed by one manager. This manager can have multiple caches. The remote
052 * relationships are consolidated and restored via these managers.
053 * <p>
054 * The facade provides a front to the composite cache so the implementation is transparent.
055 */
056public class LateralTCPCacheFactory
057    extends AbstractAuxiliaryCacheFactory
058{
059    /** The logger */
060    private static final Log log = LogManager.getLog( LateralTCPCacheFactory.class );
061
062    /** Address to service map. */
063    private ConcurrentHashMap<String, ICacheServiceNonLocal<?, ?>> csnlInstances;
064
065    /** Map of available discovery listener instances, keyed by port. */
066    private ConcurrentHashMap<String, LateralTCPDiscoveryListener> lTCPDLInstances;
067
068    /** Monitor thread */
069    private LateralCacheMonitor monitor;
070
071    /**
072     * Wrapper of the lateral cache watch service; or wrapper of a zombie
073     * service if failed to connect.
074     */
075    private CacheWatchRepairable lateralWatch;
076
077    /**
078     * Creates a TCP lateral.
079     * <p>
080     * @param <K> cache key type
081     * @param <V> cache value type
082     * @param iaca the cache configuration object
083     * @param cacheMgr the cache manager
084     * @param cacheEventLogger the event logger
085     * @param elementSerializer the serializer to use when sending or receiving
086     * @return a LateralCacheNoWaitFacade
087     */
088    @Override
089    public <K, V> LateralCacheNoWaitFacade<K, V> createCache(
090            final AuxiliaryCacheAttributes iaca, final ICompositeCacheManager cacheMgr,
091           final ICacheEventLogger cacheEventLogger, final IElementSerializer elementSerializer )
092    {
093        final ITCPLateralCacheAttributes lac = (ITCPLateralCacheAttributes) iaca;
094        final ArrayList<LateralCacheNoWait<K, V>> noWaits = new ArrayList<>();
095
096        // pairs up the tcp servers and set the tcpServer value and
097        // get the manager and then get the cache
098        // no servers are required.
099        if (lac.getTcpServers() != null && !lac.getTcpServers().isEmpty())
100        {
101            final String servers[] = lac.getTcpServers().split("\\s*,\\s*");
102            log.debug( "Configured for [{0}] servers.", servers.length );
103
104            for (final String server : servers)
105            {
106                log.debug( "tcp server = {0}", server );
107                final ITCPLateralCacheAttributes lacClone = (ITCPLateralCacheAttributes) lac.clone();
108                lacClone.setTcpServer( server );
109
110                final LateralCacheNoWait<K, V> lateralNoWait = createCacheNoWait(lacClone, cacheEventLogger, elementSerializer);
111
112                addListenerIfNeeded( lacClone, cacheMgr, elementSerializer );
113                monitorCache(lateralNoWait);
114                noWaits.add( lateralNoWait );
115            }
116        }
117
118        final ILateralCacheListener<K, V> listener = createListener( lac, cacheMgr, elementSerializer );
119
120        // create the no wait facade.
121        final LateralCacheNoWaitFacade<K, V> lcnwf =
122            new LateralCacheNoWaitFacade<>(listener, noWaits, lac);
123
124        // create udp discovery if available.
125        createDiscoveryService( lac, lcnwf, cacheMgr, cacheEventLogger, elementSerializer );
126
127        return lcnwf;
128    }
129
130    /**
131     * Create a LateralCacheNoWait for the server configured in lca
132     *
133     * @param <K> cache key type
134     * @param <V> cache value type
135     * @param lca the cache configuration object
136     * @param cacheEventLogger the event logger
137     * @param elementSerializer the serializer to use when sending or receiving
138     * @return a LateralCacheNoWait
139     */
140    public <K, V> LateralCacheNoWait<K, V> createCacheNoWait( final ITCPLateralCacheAttributes lca,
141            final ICacheEventLogger cacheEventLogger, final IElementSerializer elementSerializer )
142    {
143        final ICacheServiceNonLocal<K, V> lateralService = getCSNLInstance(lca, elementSerializer);
144
145        final LateralCache<K, V> cache = new LateralCache<>( lca, lateralService, this.monitor );
146        cache.setCacheEventLogger( cacheEventLogger );
147        cache.setElementSerializer( elementSerializer );
148
149        log.debug( "Created cache for noWait, cache [{0}]", cache );
150
151        final LateralCacheNoWait<K, V> lateralNoWait = new LateralCacheNoWait<>( cache );
152        lateralNoWait.setIdentityKey(lca.getTcpServer());
153
154        log.info( "Created LateralCacheNoWait for [{0}] LateralCacheNoWait = [{1}]",
155                lca, lateralNoWait );
156
157        return lateralNoWait;
158    }
159
160    /**
161     * Initialize this factory
162     */
163    @Override
164    public void initialize()
165    {
166        this.csnlInstances = new ConcurrentHashMap<>();
167        this.lTCPDLInstances = new ConcurrentHashMap<>();
168
169        // Create the monitoring daemon thread
170        this.monitor = new LateralCacheMonitor(this);
171        this.monitor.setDaemon( true );
172        this.monitor.start();
173
174        this.lateralWatch = new CacheWatchRepairable();
175        this.lateralWatch.setCacheWatch( new ZombieCacheWatch() );
176    }
177
178    /**
179     * Dispose of this factory, clean up shared resources
180     */
181    @Override
182    public void dispose()
183    {
184        for (final ICacheServiceNonLocal<?, ?> service : this.csnlInstances.values())
185        {
186            try
187            {
188                service.dispose("");
189            }
190            catch (final IOException e)
191            {
192                log.error("Could not dispose service " + service, e);
193            }
194        }
195
196        this.csnlInstances.clear();
197
198        // TODO: shut down discovery listeners
199        this.lTCPDLInstances.clear();
200
201        if (this.monitor != null)
202        {
203            this.monitor.notifyShutdown();
204            try
205            {
206                this.monitor.join(5000);
207            }
208            catch (final InterruptedException e)
209            {
210                // swallow
211            }
212            this.monitor = null;
213        }
214    }
215
216    /**
217     * Returns an instance of the cache service.
218     * <p>
219     * @param <K> cache key type
220     * @param <V> cache value type
221     * @param lca configuration for the creation of a new service instance
222     *
223     * @return ICacheServiceNonLocal&lt;K, V&gt;
224     *
225     * @deprecated Specify serializer
226     */
227    @Deprecated
228    public <K, V> ICacheServiceNonLocal<K, V> getCSNLInstance( final ITCPLateralCacheAttributes lca )
229    {
230        return getCSNLInstance(lca, new StandardSerializer());
231    }
232
233    /**
234     * Returns an instance of the cache service.
235     * <p>
236     * @param <K> cache key type
237     * @param <V> cache value type
238     * @param lca configuration for the creation of a new service instance
239     * @param elementSerializer the serializer to use when sending or receiving
240     *
241     * @return ICacheServiceNonLocal&lt;K, V&gt;
242     * @since 3.1
243     */
244    // Need to cast because of common map for all cache services
245    @SuppressWarnings("unchecked")
246    public <K, V> ICacheServiceNonLocal<K, V> getCSNLInstance(final ITCPLateralCacheAttributes lca,
247            final IElementSerializer elementSerializer)
248    {
249        final String key = lca.getTcpServer();
250
251        return (ICacheServiceNonLocal<K, V>) csnlInstances.compute(key, (name, service) -> {
252
253            ICacheServiceNonLocal<?, ?> newService = service;
254
255            // If service creation did not succeed last time, force retry
256            if (service instanceof ZombieCacheServiceNonLocal)
257            {
258                log.info("Disposing of zombie service instance for [{0}]", name);
259                newService = null;
260            }
261
262            if (newService == null)
263            {
264                log.info( "Instance for [{0}] is null, creating", name );
265
266                // Create the service
267                try
268                {
269                    log.info( "Creating TCP service, lca = {0}", lca );
270
271                    newService = new LateralTCPService<>(lca, elementSerializer);
272                }
273                catch ( final IOException ex )
274                {
275                    // Failed to connect to the lateral server.
276                    // Configure this LateralCacheManager instance to use the
277                    // "zombie" services.
278                    log.error( "Failure, lateral instance will use zombie service", ex );
279
280                    newService = new ZombieCacheServiceNonLocal<>(lca.getZombieQueueMaxSize());
281
282                    // Notify the cache monitor about the error, and kick off
283                    // the recovery process.
284                    monitor.notifyError();
285                }
286            }
287
288            return newService;
289        });
290    }
291
292    /**
293     * Add cache instance to monitor
294     *
295     * @param cache the cache instance
296     * @since 3.1
297     */
298    public void monitorCache(final LateralCacheNoWait<?, ?> cache)
299    {
300        monitor.addCache(cache);
301    }
302
303    /**
304     * Gets the instance attribute of the LateralCacheTCPListener class.
305     * <p>
306     * @param ilca ITCPLateralCacheAttributes
307     * @param cacheManager a reference to the global cache manager
308     * @param cacheEventLogger Reference to the cache event logger for auxiliary cache creation
309     * @param elementSerializer Reference to the cache element serializer for auxiliary cache
310     *
311     * @return The instance value
312     */
313    private LateralTCPDiscoveryListener getDiscoveryListener(final ITCPLateralCacheAttributes ilca,
314            final ICompositeCacheManager cacheManager, final ICacheEventLogger cacheEventLogger,
315            final IElementSerializer elementSerializer)
316    {
317        final String key = ilca.getUdpDiscoveryAddr() + ":" + ilca.getUdpDiscoveryPort();
318
319        return lTCPDLInstances.computeIfAbsent(key, key1 -> {
320            log.info("Created new discovery listener for cacheName {0} and request {1}",
321                    ilca.getCacheName(), key1);
322            return new LateralTCPDiscoveryListener( this.getName(),
323                    (CompositeCacheManager) cacheManager,
324                    cacheEventLogger, elementSerializer);
325        });
326    }
327
328    /**
329     * Add listener for receivers
330     * <p>
331     * @param iaca cache configuration attributes
332     * @param cacheMgr the composite cache manager
333     * @param serializer the serializer to use when receiving
334     */
335    private void addListenerIfNeeded( final ITCPLateralCacheAttributes iaca, final ICompositeCacheManager cacheMgr, final IElementSerializer elementSerializer )
336    {
337        // don't create a listener if we are not receiving.
338        if ( iaca.isReceive() )
339        {
340            try
341            {
342                addLateralCacheListener(iaca.getCacheName(), createListener(iaca, cacheMgr, elementSerializer));
343            }
344            catch ( final IOException ioe )
345            {
346                log.error("Problem creating lateral listener", ioe);
347            }
348        }
349        else
350        {
351            log.debug( "Not creating a listener since we are not receiving." );
352        }
353    }
354
355    /**
356     * Adds the lateral cache listener to the underlying cache-watch service.
357     * <p>
358     * @param cacheName The feature to be added to the LateralCacheListener attribute
359     * @param listener The feature to be added to the LateralCacheListener attribute
360     * @throws IOException
361     */
362    private <K, V> void addLateralCacheListener( final String cacheName, final ILateralCacheListener<K, V> listener )
363        throws IOException
364    {
365        synchronized ( this.lateralWatch )
366        {
367            lateralWatch.addCacheListener( cacheName, listener );
368        }
369    }
370
371    /**
372     * Makes sure a listener gets created. It will get monitored as soon as it
373     * is used.
374     * <p>
375     * This should be called by create cache.
376     * <p>
377     * @param attr  ITCPLateralCacheAttributes
378     * @param cacheMgr the composite cache manager
379     * @param serializer the serializer to use when receiving
380     *
381     * @return the listener if created, else null
382     */
383    private static <K, V> ILateralCacheListener<K, V> createListener( final ITCPLateralCacheAttributes attr,
384            final ICompositeCacheManager cacheMgr, final IElementSerializer elementSerializer )
385    {
386        ILateralCacheListener<K, V> listener = null;
387
388        // don't create a listener if we are not receiving.
389        if ( attr.isReceive() )
390        {
391            log.info( "Getting listener for {0}", attr );
392
393            // make a listener. if one doesn't exist
394            listener = LateralTCPListener.getInstance( attr, cacheMgr, elementSerializer );
395
396            // register for shutdown notification
397            cacheMgr.registerShutdownObserver( (IShutdownObserver) listener );
398        }
399        else
400        {
401            log.debug( "Not creating a listener since we are not receiving." );
402        }
403
404        return listener;
405    }
406
407    /**
408     * Creates the discovery service. Only creates this for tcp laterals right now.
409     * <p>
410     * @param lac ITCPLateralCacheAttributes
411     * @param lcnwf the lateral facade
412     * @param cacheMgr a reference to the global cache manager
413     * @param cacheEventLogger Reference to the cache event logger for auxiliary cache creation
414     * @param elementSerializer Reference to the cache element serializer for auxiliary cache
415     */
416    private synchronized <K, V> void createDiscoveryService(
417            final ITCPLateralCacheAttributes lac,
418            final LateralCacheNoWaitFacade<K, V> lcnwf,
419            final ICompositeCacheManager cacheMgr,
420            final ICacheEventLogger cacheEventLogger,
421            final IElementSerializer elementSerializer )
422    {
423        UDPDiscoveryService discovery = null;
424
425        // create the UDP discovery for the TCP lateral
426        if ( lac.isUdpDiscoveryEnabled() )
427        {
428            // One can be used for all regions
429            final LateralTCPDiscoveryListener discoveryListener =
430                    getDiscoveryListener(lac, cacheMgr, cacheEventLogger, elementSerializer);
431            discoveryListener.addNoWaitFacade( lac.getCacheName(), lcnwf );
432
433            // need a factory for this so it doesn't
434            // get dereferenced, also we don't want one for every region.
435            discovery = UDPDiscoveryManager.getInstance().getService(
436                    lac.getUdpDiscoveryAddr(), lac.getUdpDiscoveryPort(),
437                    lac.getTcpListenerHost(), lac.getTcpListenerPort(), lac.getUdpTTL(),
438                    cacheMgr, elementSerializer);
439
440            discovery.addParticipatingCacheName( lac.getCacheName() );
441            discovery.addDiscoveryListener( discoveryListener );
442
443            log.info( "Registered TCP lateral cache [{0}] with UDPDiscoveryService.",
444                    lac::getCacheName);
445        }
446    }
447}