001package org.apache.commons.jcs.auxiliary.lateral.socket.tcp;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.StringTokenizer;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.locks.ReentrantLock;
027
028import org.apache.commons.jcs.auxiliary.AbstractAuxiliaryCacheFactory;
029import org.apache.commons.jcs.auxiliary.AuxiliaryCacheAttributes;
030import org.apache.commons.jcs.auxiliary.lateral.LateralCache;
031import org.apache.commons.jcs.auxiliary.lateral.LateralCacheMonitor;
032import org.apache.commons.jcs.auxiliary.lateral.LateralCacheNoWait;
033import org.apache.commons.jcs.auxiliary.lateral.LateralCacheNoWaitFacade;
034import org.apache.commons.jcs.auxiliary.lateral.behavior.ILateralCacheListener;
035import org.apache.commons.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
036import org.apache.commons.jcs.engine.CacheWatchRepairable;
037import org.apache.commons.jcs.engine.ZombieCacheServiceNonLocal;
038import org.apache.commons.jcs.engine.ZombieCacheWatch;
039import org.apache.commons.jcs.engine.behavior.ICache;
040import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal;
041import org.apache.commons.jcs.engine.behavior.ICompositeCacheManager;
042import org.apache.commons.jcs.engine.behavior.IElementSerializer;
043import org.apache.commons.jcs.engine.behavior.IShutdownObserver;
044import org.apache.commons.jcs.engine.logging.behavior.ICacheEventLogger;
045import org.apache.commons.jcs.utils.discovery.UDPDiscoveryManager;
046import org.apache.commons.jcs.utils.discovery.UDPDiscoveryService;
047import org.apache.commons.logging.Log;
048import org.apache.commons.logging.LogFactory;
049
050/**
051 * Constructs a LateralCacheNoWaitFacade for the given configuration. Each lateral service / local
052 * relationship is managed by one manager. This manager can have multiple caches. The remote
053 * relationships are consolidated and restored via these managers.
054 * <p>
055 * The facade provides a front to the composite cache so the implementation is transparent.
056 */
057public class LateralTCPCacheFactory
058    extends AbstractAuxiliaryCacheFactory
059{
060    /** The logger */
061    private static final Log log = LogFactory.getLog( LateralTCPCacheFactory.class );
062
063    /** Address to service map. */
064    private ConcurrentHashMap<String, ICacheServiceNonLocal<?, ?>> csnlInstances;
065
066    /** Lock for initialization of address to service map */
067    private ReentrantLock csnlLock;
068
069    /** Map of available discovery listener instances, keyed by port. */
070    private ConcurrentHashMap<String, LateralTCPDiscoveryListener> lTCPDLInstances;
071
072    /** Monitor thread */
073    private LateralCacheMonitor monitor;
074
075    /**
076     * Wrapper of the lateral cache watch service; or wrapper of a zombie
077     * service if failed to connect.
078     */
079    private CacheWatchRepairable lateralWatch;
080
081    /**
082     * Creates a TCP lateral.
083     * <p>
084     * @param iaca
085     * @param cacheMgr
086     * @param cacheEventLogger
087     * @param elementSerializer
088     * @return LateralCacheNoWaitFacade
089     */
090    @Override
091    public <K, V> LateralCacheNoWaitFacade<K, V> createCache(
092            AuxiliaryCacheAttributes iaca, ICompositeCacheManager cacheMgr,
093           ICacheEventLogger cacheEventLogger, IElementSerializer elementSerializer )
094    {
095        ITCPLateralCacheAttributes lac = (ITCPLateralCacheAttributes) iaca;
096        ArrayList<ICache<K, V>> noWaits = new ArrayList<ICache<K, V>>();
097
098        // pairs up the tcp servers and set the tcpServer value and
099        // get the manager and then get the cache
100        // no servers are required.
101        if ( lac.getTcpServers() != null )
102        {
103            StringTokenizer it = new StringTokenizer( lac.getTcpServers(), "," );
104            if ( log.isDebugEnabled() )
105            {
106                log.debug( "Configured for [" + it.countTokens() + "]  servers." );
107            }
108            while ( it.hasMoreElements() )
109            {
110                String server = (String) it.nextElement();
111                if ( log.isDebugEnabled() )
112                {
113                    log.debug( "tcp server = " + server );
114                }
115                ITCPLateralCacheAttributes lacC = (ITCPLateralCacheAttributes) lac.clone();
116                lacC.setTcpServer( server );
117
118                LateralCacheNoWait<K, V> lateralNoWait = createCacheNoWait(lacC, cacheEventLogger, elementSerializer);
119
120                addListenerIfNeeded( lacC, cacheMgr );
121                monitor.addCache(lateralNoWait);
122                noWaits.add( lateralNoWait );
123            }
124        }
125
126        ILateralCacheListener<K, V> listener = createListener( lac, cacheMgr );
127
128        // create the no wait facade.
129        @SuppressWarnings("unchecked") // No generic arrays in java
130        LateralCacheNoWait<K, V>[] lcnwArray = noWaits.toArray( new LateralCacheNoWait[0] );
131        LateralCacheNoWaitFacade<K, V> lcnwf =
132            new LateralCacheNoWaitFacade<K, V>(listener, lcnwArray, lac );
133
134        // create udp discovery if available.
135        createDiscoveryService( lac, lcnwf, cacheMgr, cacheEventLogger, elementSerializer );
136
137        return lcnwf;
138    }
139
140    protected <K, V> LateralCacheNoWait<K, V> createCacheNoWait( ITCPLateralCacheAttributes lca,
141            ICacheEventLogger cacheEventLogger, IElementSerializer elementSerializer )
142    {
143        ICacheServiceNonLocal<K, V> lateralService = getCSNLInstance(lca);
144
145        LateralCache<K, V> cache = new LateralCache<K, V>( lca, lateralService, this.monitor );
146        cache.setCacheEventLogger( cacheEventLogger );
147        cache.setElementSerializer( elementSerializer );
148
149        if ( log.isDebugEnabled() )
150        {
151            log.debug( "Created cache for noWait, cache [" + cache + "]" );
152        }
153
154        LateralCacheNoWait<K, V> lateralNoWait = new LateralCacheNoWait<K, V>( cache );
155        lateralNoWait.setCacheEventLogger( cacheEventLogger );
156        lateralNoWait.setElementSerializer( elementSerializer );
157
158        if ( log.isInfoEnabled() )
159        {
160            log.info( "Created LateralCacheNoWait for [" + lca + "] LateralCacheNoWait = [" + lateralNoWait
161                + "]" );
162        }
163
164        return lateralNoWait;
165    }
166
167    /**
168     * Initialize this factory
169     */
170    @Override
171    public void initialize()
172    {
173        this.csnlInstances = new ConcurrentHashMap<String, ICacheServiceNonLocal<?, ?>>();
174        this.csnlLock = new ReentrantLock();
175        this.lTCPDLInstances = new ConcurrentHashMap<String, LateralTCPDiscoveryListener>();
176
177        // Create the monitoring daemon thread
178        this.monitor = new LateralCacheMonitor(this);
179        this.monitor.setDaemon( true );
180        this.monitor.start();
181
182        this.lateralWatch = new CacheWatchRepairable();
183        this.lateralWatch.setCacheWatch( new ZombieCacheWatch() );
184    }
185
186    /**
187     * Dispose of this factory, clean up shared resources
188     */
189    @Override
190    public void dispose()
191    {
192        for (ICacheServiceNonLocal<?, ?> service : this.csnlInstances.values())
193        {
194            try
195            {
196                service.dispose("");
197            }
198            catch (IOException e)
199            {
200                log.error("Could not dispose service " + service, e);
201            }
202        }
203
204        this.csnlInstances.clear();
205
206        // TODO: shut down discovery listeners
207        this.lTCPDLInstances.clear();
208
209        if (this.monitor != null)
210        {
211            this.monitor.notifyShutdown();
212            try
213            {
214                this.monitor.join(5000);
215            }
216            catch (InterruptedException e)
217            {
218                // swallow
219            }
220            this.monitor = null;
221        }
222    }
223
224    /**
225     * Returns an instance of the cache service.
226     * <p>
227     * @param lca configuration for the creation of a new service instance
228     *
229     * @return ICacheServiceNonLocal&lt;K, V&gt;
230     */
231    // Need to cast because of common map for all cache services
232    @SuppressWarnings("unchecked")
233    public <K, V> ICacheServiceNonLocal<K, V> getCSNLInstance( ITCPLateralCacheAttributes lca )
234    {
235        String key = lca.getTcpServer();
236
237        ICacheServiceNonLocal<K, V> service = (ICacheServiceNonLocal<K, V>)csnlInstances.get( key );
238
239        if ( service == null || service instanceof ZombieCacheServiceNonLocal )
240        {
241            csnlLock.lock();
242
243            try
244            {
245                // double check
246                service = (ICacheServiceNonLocal<K, V>)csnlInstances.get( key );
247
248                // If service creation did not succeed last time, force retry
249                if ( service instanceof ZombieCacheServiceNonLocal)
250                {
251                    service = null;
252                    log.info("Disposing of zombie service instance for [" + key + "]");
253                }
254
255                if ( service == null )
256                {
257                    log.info( "Instance for [" + key + "] is null, creating" );
258
259                    // Create the service
260                    try
261                    {
262                        if ( log.isInfoEnabled() )
263                        {
264                            log.info( "Creating TCP service, lca = " + lca );
265                        }
266
267                        service = new LateralTCPService<K, V>( lca );
268                    }
269                    catch ( IOException ex )
270                    {
271                        // Failed to connect to the lateral server.
272                        // Configure this LateralCacheManager instance to use the
273                        // "zombie" services.
274                        log.error( "Failure, lateral instance will use zombie service", ex );
275
276                        service = new ZombieCacheServiceNonLocal<K, V>( lca.getZombieQueueMaxSize() );
277
278                        // Notify the cache monitor about the error, and kick off
279                        // the recovery process.
280                        monitor.notifyError();
281                    }
282
283                    csnlInstances.put( key, service );
284                }
285            }
286            finally
287            {
288                csnlLock.unlock();
289            }
290        }
291
292        return service;
293    }
294
295    /**
296     * Gets the instance attribute of the LateralCacheTCPListener class.
297     * <p>
298     * @param ilca ITCPLateralCacheAttributes
299     * @param cacheManager a reference to the global cache manager
300     *
301     * @return The instance value
302     */
303    private LateralTCPDiscoveryListener getDiscoveryListener( ITCPLateralCacheAttributes ilca, ICompositeCacheManager cacheManager )
304    {
305        String key = ilca.getUdpDiscoveryAddr() + ":" + ilca.getUdpDiscoveryPort();
306        LateralTCPDiscoveryListener ins = null;
307
308        LateralTCPDiscoveryListener newListener = new LateralTCPDiscoveryListener( this.getName(),  cacheManager);
309        ins = lTCPDLInstances.putIfAbsent(key, newListener );
310
311        if ( ins == null )
312        {
313            ins = newListener;
314
315            if ( log.isInfoEnabled() )
316            {
317                log.info( "Created new discovery listener for " + key + " cacheName for request " + ilca.getCacheName() );
318            }
319        }
320
321        return ins;
322    }
323
324    /**
325     * Add listener for receivers
326     * <p>
327     * @param iaca cache configuration attributes
328     * @param cacheMgr the composite cache manager
329     */
330    private void addListenerIfNeeded( ITCPLateralCacheAttributes iaca, ICompositeCacheManager cacheMgr )
331    {
332        // don't create a listener if we are not receiving.
333        if ( iaca.isReceive() )
334        {
335            try
336            {
337                addLateralCacheListener( iaca.getCacheName(), LateralTCPListener.getInstance( iaca, cacheMgr ) );
338            }
339            catch ( IOException ioe )
340            {
341                log.error( "Problem creating lateral listener", ioe );
342            }
343        }
344        else
345        {
346            if ( log.isDebugEnabled() )
347            {
348                log.debug( "Not creating a listener since we are not receiving." );
349            }
350        }
351    }
352
353    /**
354     * Adds the lateral cache listener to the underlying cache-watch service.
355     * <p>
356     * @param cacheName The feature to be added to the LateralCacheListener attribute
357     * @param listener The feature to be added to the LateralCacheListener attribute
358     * @throws IOException
359     */
360    private <K, V> void addLateralCacheListener( String cacheName, ILateralCacheListener<K, V> listener )
361        throws IOException
362    {
363        synchronized ( this.lateralWatch )
364        {
365            lateralWatch.addCacheListener( cacheName, listener );
366        }
367    }
368
369    /**
370     * Makes sure a listener gets created. It will get monitored as soon as it
371     * is used.
372     * <p>
373     * This should be called by create cache.
374     * <p>
375     * @param attr  ITCPLateralCacheAttributes
376     * @param cacheMgr
377     *
378     * @return the listener if created, else null
379     */
380    private <K, V> ILateralCacheListener<K, V> createListener( ITCPLateralCacheAttributes attr,
381            ICompositeCacheManager cacheMgr )
382    {
383        ILateralCacheListener<K, V> listener = null;
384
385        // don't create a listener if we are not receiving.
386        if ( attr.isReceive() )
387        {
388            if ( log.isInfoEnabled() )
389            {
390                log.info( "Getting listener for " + attr );
391            }
392
393            // make a listener. if one doesn't exist
394            listener = LateralTCPListener.getInstance( attr, cacheMgr );
395
396            // register for shutdown notification
397            cacheMgr.registerShutdownObserver( (IShutdownObserver) listener );
398        }
399        else
400        {
401            if ( log.isDebugEnabled() )
402            {
403                log.debug( "Not creating a listener since we are not receiving." );
404            }
405        }
406
407        return listener;
408    }
409
410    /**
411     * Creates the discovery service. Only creates this for tcp laterals right now.
412     * <p>
413     * @param lac ITCPLateralCacheAttributes
414     * @param lcnwf
415     * @param cacheMgr
416     * @param cacheEventLogger
417     * @param elementSerializer
418     * @return null if none is created.
419     */
420    private synchronized <K, V> UDPDiscoveryService createDiscoveryService(
421            ITCPLateralCacheAttributes lac,
422            LateralCacheNoWaitFacade<K, V> lcnwf,
423            ICompositeCacheManager cacheMgr,
424            ICacheEventLogger cacheEventLogger,
425            IElementSerializer elementSerializer )
426    {
427        UDPDiscoveryService discovery = null;
428
429        // create the UDP discovery for the TCP lateral
430        if ( lac.isUdpDiscoveryEnabled() )
431        {
432            // One can be used for all regions
433            LateralTCPDiscoveryListener discoveryListener = getDiscoveryListener( lac, cacheMgr );
434            discoveryListener.addNoWaitFacade( lac.getCacheName(), lcnwf );
435
436            // need a factory for this so it doesn't
437            // get dereferenced, also we don't want one for every region.
438            discovery = UDPDiscoveryManager.getInstance().getService( lac.getUdpDiscoveryAddr(),
439                                                                      lac.getUdpDiscoveryPort(),
440                                                                      lac.getTcpListenerPort(), cacheMgr);
441
442            discovery.addParticipatingCacheName( lac.getCacheName() );
443            discovery.addDiscoveryListener( discoveryListener );
444
445            if ( log.isInfoEnabled() )
446            {
447                log.info( "Registered TCP lateral cache [" + lac.getCacheName() + "] with UDPDiscoveryService." );
448            }
449        }
450        return discovery;
451    }
452}