001package org.apache.commons.jcs.auxiliary.lateral.socket.tcp;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.EOFException;
023import java.io.IOException;
024import java.io.ObjectInputStream;
025import java.io.ObjectOutputStream;
026import java.io.Serializable;
027import java.net.InetAddress;
028import java.net.ServerSocket;
029import java.net.Socket;
030import java.net.SocketException;
031import java.net.SocketTimeoutException;
032import java.util.HashMap;
033import java.util.Map;
034import java.util.Set;
035import java.util.concurrent.ExecutorService;
036import java.util.concurrent.Executors;
037import java.util.concurrent.atomic.AtomicBoolean;
038
039import org.apache.commons.jcs.access.exception.CacheException;
040import org.apache.commons.jcs.auxiliary.lateral.LateralElementDescriptor;
041import org.apache.commons.jcs.auxiliary.lateral.behavior.ILateralCacheListener;
042import org.apache.commons.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
043import org.apache.commons.jcs.engine.CacheInfo;
044import org.apache.commons.jcs.engine.behavior.ICacheElement;
045import org.apache.commons.jcs.engine.behavior.ICompositeCacheManager;
046import org.apache.commons.jcs.engine.behavior.IShutdownObserver;
047import org.apache.commons.jcs.engine.control.CompositeCache;
048import org.apache.commons.jcs.engine.control.CompositeCacheManager;
049import org.apache.commons.jcs.io.ObjectInputStreamClassLoaderAware;
050import org.apache.commons.jcs.utils.threadpool.DaemonThreadFactory;
051import org.apache.commons.logging.Log;
052import org.apache.commons.logging.LogFactory;
053
054/**
055 * Listens for connections from other TCP lateral caches and handles them. The initialization method
056 * starts a listening thread, which creates a socket server. When messages are received they are
057 * passed to a pooled executor which then calls the appropriate handle method.
058 */
059public class LateralTCPListener<K, V>
060    implements ILateralCacheListener<K, V>, IShutdownObserver
061{
062    /** The logger */
063    private static final Log log = LogFactory.getLog( LateralTCPListener.class );
064
065    /** How long the server will block on an accept(). 0 is infinite. */
066    private static final int acceptTimeOut = 1000;
067
068    /** The CacheHub this listener is associated with */
069    private transient ICompositeCacheManager cacheManager;
070
071    /** Map of available instances, keyed by port */
072    private static final HashMap<String, ILateralCacheListener<?, ?>> instances =
073        new HashMap<String, ILateralCacheListener<?, ?>>();
074
075    /** The socket listener */
076    private ListenerThread receiver;
077
078    /** Configuration attributes */
079    private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
080
081    /** Listening port */
082    private int port;
083
084    /** The processor. We should probably use an event queue here. */
085    private ExecutorService pooledExecutor;
086
087    /** put count */
088    private int putCnt = 0;
089
090    /** remove count */
091    private int removeCnt = 0;
092
093    /** get count */
094    private int getCnt = 0;
095
096    /**
097     * Use the vmid by default. This can be set for testing. If we ever need to run more than one
098     * per vm, then we need a new technique.
099     */
100    private long listenerId = CacheInfo.listenerId;
101
102    /** is this shut down? */
103    private AtomicBoolean shutdown;
104
105    /** is this terminated? */
106    private AtomicBoolean terminated;
107
108    /**
109     * Gets the instance attribute of the LateralCacheTCPListener class.
110     * <p>
111     * @param ilca ITCPLateralCacheAttributes
112     * @param cacheMgr
113     * @return The instance value
114     */
115    public synchronized static <K, V> LateralTCPListener<K, V>
116        getInstance( ITCPLateralCacheAttributes ilca, ICompositeCacheManager cacheMgr )
117    {
118        @SuppressWarnings("unchecked") // Need to cast because of common map for all instances
119        LateralTCPListener<K, V> ins = (LateralTCPListener<K, V>) instances.get( String.valueOf( ilca.getTcpListenerPort() ) );
120
121        if ( ins == null )
122        {
123            ins = new LateralTCPListener<K, V>( ilca );
124
125            ins.init();
126            ins.setCacheManager( cacheMgr );
127
128            instances.put( String.valueOf( ilca.getTcpListenerPort() ), ins );
129
130            if ( log.isInfoEnabled() )
131            {
132                log.info( "Created new listener " + ilca.getTcpListenerPort() );
133            }
134        }
135
136        return ins;
137    }
138
139    /**
140     * Only need one since it does work for all regions, just reference by multiple region names.
141     * <p>
142     * @param ilca
143     */
144    protected LateralTCPListener( ITCPLateralCacheAttributes ilca )
145    {
146        this.setTcpLateralCacheAttributes( ilca );
147    }
148
149    /**
150     * This starts the ListenerThread on the specified port.
151     */
152    @Override
153    public synchronized void init()
154    {
155        try
156        {
157            this.port = getTcpLateralCacheAttributes().getTcpListenerPort();
158
159            pooledExecutor = Executors.newCachedThreadPool(
160                    new DaemonThreadFactory("JCS-LateralTCPListener-"));
161            terminated = new AtomicBoolean(false);
162            shutdown = new AtomicBoolean(false);
163
164            log.info( "Listening on port " + port );
165
166            ServerSocket serverSocket = new ServerSocket( port );
167            serverSocket.setSoTimeout( acceptTimeOut );
168
169            receiver = new ListenerThread(serverSocket);
170            receiver.setDaemon( true );
171            receiver.start();
172        }
173        catch ( IOException ex )
174        {
175            throw new IllegalStateException( ex );
176        }
177    }
178
179    /**
180     * Let the lateral cache set a listener_id. Since there is only one listener for all the
181     * regions and every region gets registered? the id shouldn't be set if it isn't zero. If it is
182     * we assume that it is a reconnect.
183     * <p>
184     * By default, the listener id is the vmid.
185     * <p>
186     * The service should set this value. This value will never be changed by a server we connect
187     * to. It needs to be non static, for unit tests.
188     * <p>
189     * The service will use the value it sets in all send requests to the sender.
190     * <p>
191     * @param id The new listenerId value
192     * @throws IOException
193     */
194    @Override
195    public void setListenerId( long id )
196        throws IOException
197    {
198        this.listenerId = id;
199        if ( log.isDebugEnabled() )
200        {
201            log.debug( "set listenerId = " + id );
202        }
203    }
204
205    /**
206     * Gets the listenerId attribute of the LateralCacheTCPListener object
207     * <p>
208     * @return The listenerId value
209     * @throws IOException
210     */
211    @Override
212    public long getListenerId()
213        throws IOException
214    {
215        return this.listenerId;
216    }
217
218    /**
219     * Increments the put count. Gets the cache that was injected by the lateral factory. Calls put
220     * on the cache.
221     * <p>
222     * @see org.apache.commons.jcs.engine.behavior.ICacheListener#handlePut(org.apache.commons.jcs.engine.behavior.ICacheElement)
223     */
224    @Override
225    public void handlePut( ICacheElement<K, V> element )
226        throws IOException
227    {
228        putCnt++;
229        if ( log.isInfoEnabled() )
230        {
231            if ( getPutCnt() % 100 == 0 )
232            {
233                log.info( "Put Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
234                    + getPutCnt() );
235            }
236        }
237
238        if ( log.isDebugEnabled() )
239        {
240            log.debug( "handlePut> cacheName=" + element.getCacheName() + ", key=" + element.getKey() );
241        }
242
243        getCache( element.getCacheName() ).localUpdate( element );
244    }
245
246    /**
247     * Increments the remove count. Gets the cache that was injected by the lateral factory. Calls
248     * remove on the cache.
249     * <p>
250     * @see org.apache.commons.jcs.engine.behavior.ICacheListener#handleRemove(java.lang.String,
251     *      Object)
252     */
253    @Override
254    public void handleRemove( String cacheName, K key )
255        throws IOException
256    {
257        removeCnt++;
258        if ( log.isInfoEnabled() )
259        {
260            if ( getRemoveCnt() % 100 == 0 )
261            {
262                log.info( "Remove Count = " + getRemoveCnt() );
263            }
264        }
265
266        if ( log.isDebugEnabled() )
267        {
268            log.debug( "handleRemove> cacheName=" + cacheName + ", key=" + key );
269        }
270
271        getCache( cacheName ).localRemove( key );
272    }
273
274    /**
275     * Gets the cache that was injected by the lateral factory. Calls removeAll on the cache.
276     * <p>
277     * @see org.apache.commons.jcs.engine.behavior.ICacheListener#handleRemoveAll(java.lang.String)
278     */
279    @Override
280    public void handleRemoveAll( String cacheName )
281        throws IOException
282    {
283        if ( log.isDebugEnabled() )
284        {
285            log.debug( "handleRemoveAll> cacheName=" + cacheName );
286        }
287
288        getCache( cacheName ).localRemoveAll();
289    }
290
291    /**
292     * Gets the cache that was injected by the lateral factory. Calls get on the cache.
293     * <p>
294     * @param cacheName
295     * @param key
296     * @return a ICacheElement
297     * @throws IOException
298     */
299    public ICacheElement<K, V> handleGet( String cacheName, K key )
300        throws IOException
301    {
302        getCnt++;
303        if ( log.isInfoEnabled() )
304        {
305            if ( getGetCnt() % 100 == 0 )
306            {
307                log.info( "Get Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
308                    + getGetCnt() );
309            }
310        }
311
312        if ( log.isDebugEnabled() )
313        {
314            log.debug( "handleGet> cacheName=" + cacheName + ", key = " + key );
315        }
316
317        return getCache( cacheName ).localGet( key );
318    }
319
320    /**
321     * Gets the cache that was injected by the lateral factory. Calls get on the cache.
322     * <p>
323     * @param cacheName the name of the cache
324     * @param pattern the matching pattern
325     * @return Map
326     * @throws IOException
327     */
328    public Map<K, ICacheElement<K, V>> handleGetMatching( String cacheName, String pattern )
329        throws IOException
330    {
331        getCnt++;
332        if ( log.isInfoEnabled() )
333        {
334            if ( getGetCnt() % 100 == 0 )
335            {
336                log.info( "GetMatching Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
337                    + getGetCnt() );
338            }
339        }
340
341        if ( log.isDebugEnabled() )
342        {
343            log.debug( "handleGetMatching> cacheName=" + cacheName + ", pattern = " + pattern );
344        }
345
346        return getCache( cacheName ).localGetMatching( pattern );
347    }
348
349    /**
350     * Gets the cache that was injected by the lateral factory. Calls getKeySet on the cache.
351     * <p>
352     * @param cacheName the name of the cache
353     * @return a set of keys
354     * @throws IOException
355     */
356    public Set<K> handleGetKeySet( String cacheName ) throws IOException
357    {
358        return getCache( cacheName ).getKeySet(true);
359    }
360
361    /**
362     * This marks this instance as terminated.
363     * <p>
364     * @see org.apache.commons.jcs.engine.behavior.ICacheListener#handleDispose(java.lang.String)
365     */
366    @Override
367    public void handleDispose( String cacheName )
368        throws IOException
369    {
370        if ( log.isInfoEnabled() )
371        {
372            log.info( "handleDispose > cacheName=" + cacheName + " | Ignoring message.  Do not dispose from remote." );
373        }
374
375        // TODO handle active deregistration, rather than passive detection
376        terminated.set(true);
377    }
378
379    @Override
380    public synchronized void dispose()
381    {
382        terminated.set(true);
383        notify();
384
385        pooledExecutor.shutdownNow();
386    }
387
388    /**
389     * Gets the cacheManager attribute of the LateralCacheTCPListener object.
390     * <p>
391     * Normally this is set by the factory. If it wasn't set the listener defaults to the expected
392     * singleton behavior of the cache manager.
393     * <p>
394     * @param name
395     * @return CompositeCache
396     */
397    protected CompositeCache<K, V> getCache( String name )
398    {
399        if ( getCacheManager() == null )
400        {
401            // revert to singleton on failure
402            try
403            {
404                setCacheManager( CompositeCacheManager.getInstance() );
405            }
406            catch (CacheException e)
407            {
408                throw new RuntimeException("Could not retrieve cache manager instance", e);
409            }
410
411            if ( log.isDebugEnabled() )
412            {
413                log.debug( "cacheMgr = " + getCacheManager() );
414            }
415        }
416
417        return getCacheManager().getCache( name );
418    }
419
420    /**
421     * This is roughly the number of updates the lateral has received.
422     * <p>
423     * @return Returns the putCnt.
424     */
425    public int getPutCnt()
426    {
427        return putCnt;
428    }
429
430    /**
431     * @return Returns the getCnt.
432     */
433    public int getGetCnt()
434    {
435        return getCnt;
436    }
437
438    /**
439     * @return Returns the removeCnt.
440     */
441    public int getRemoveCnt()
442    {
443        return removeCnt;
444    }
445
446    /**
447     * @param cacheMgr The cacheMgr to set.
448     */
449    @Override
450    public void setCacheManager( ICompositeCacheManager cacheMgr )
451    {
452        this.cacheManager = cacheMgr;
453    }
454
455    /**
456     * @return Returns the cacheMgr.
457     */
458    @Override
459    public ICompositeCacheManager getCacheManager()
460    {
461        return cacheManager;
462    }
463
464    /**
465     * @param tcpLateralCacheAttributes The tcpLateralCacheAttributes to set.
466     */
467    public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tcpLateralCacheAttributes )
468    {
469        this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
470    }
471
472    /**
473     * @return Returns the tcpLateralCacheAttributes.
474     */
475    public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
476    {
477        return tcpLateralCacheAttributes;
478    }
479
480    /**
481     * Processes commands from the server socket. There should be one listener for each configured
482     * TCP lateral.
483     */
484    public class ListenerThread
485        extends Thread
486    {
487        /** The socket listener */
488        private final ServerSocket serverSocket;
489
490        /**
491         * Constructor
492         *
493         * @param serverSocket
494         */
495        public ListenerThread(ServerSocket serverSocket)
496        {
497            super();
498            this.serverSocket = serverSocket;
499        }
500
501        /** Main processing method for the ListenerThread object */
502        @SuppressWarnings("synthetic-access")
503        @Override
504        public void run()
505        {
506            try
507            {
508                ConnectionHandler handler;
509
510                outer: while ( true )
511                {
512                    if ( log.isDebugEnabled() )
513                    {
514                        log.debug( "Waiting for clients to connect " );
515                    }
516
517                    Socket socket = null;
518                    inner: while (true)
519                    {
520                        // Check to see if we've been asked to exit, and exit
521                        if (terminated.get())
522                        {
523                            if (log.isDebugEnabled())
524                            {
525                                log.debug("Thread terminated, exiting gracefully");
526                            }
527                            break outer;
528                        }
529
530                        try
531                        {
532                            socket = serverSocket.accept();
533                            break inner;
534                        }
535                        catch (SocketTimeoutException e)
536                        {
537                            // No problem! We loop back up!
538                            continue inner;
539                        }
540                    }
541
542                    if ( socket != null && log.isDebugEnabled() )
543                    {
544                        InetAddress inetAddress = socket.getInetAddress();
545                        log.debug( "Connected to client at " + inetAddress );
546                    }
547
548                    handler = new ConnectionHandler( socket );
549                    pooledExecutor.execute( handler );
550                }
551            }
552            catch ( IOException e )
553            {
554                log.error( "Exception caught in TCP listener", e );
555            }
556            finally
557            {
558                if (serverSocket != null)
559                {
560                        try
561                        {
562                                                serverSocket.close();
563                                        }
564                        catch (IOException e)
565                        {
566                        log.error( "Exception caught closing socket", e );
567                                        }
568                }
569            }
570        }
571    }
572
573    /**
574     * A Separate thread that runs when a command comes into the LateralTCPReceiver.
575     */
576    public class ConnectionHandler
577        implements Runnable
578    {
579        /** The socket connection, passed in via constructor */
580        private final Socket socket;
581
582        /**
583         * Construct for a given socket
584         * @param socket
585         */
586        public ConnectionHandler( Socket socket )
587        {
588            this.socket = socket;
589        }
590
591        /**
592         * Main processing method for the LateralTCPReceiverConnection object
593         */
594        @Override
595        @SuppressWarnings({"unchecked", // Need to cast from Object
596            "synthetic-access" })
597        public void run()
598        {
599            ObjectInputStream ois;
600
601            try
602            {
603                ois = new ObjectInputStreamClassLoaderAware( socket.getInputStream(), null );
604            }
605            catch ( Exception e )
606            {
607                log.error( "Could not open ObjectInputStream on " + socket, e );
608
609                return;
610            }
611
612            LateralElementDescriptor<K, V> led;
613
614            try
615            {
616                while ( true )
617                {
618                    led = (LateralElementDescriptor<K, V>) ois.readObject();
619
620                    if ( led == null )
621                    {
622                        log.debug( "LateralElementDescriptor is null" );
623                        continue;
624                    }
625                    if ( led.requesterId == getListenerId() )
626                    {
627                        log.debug( "from self" );
628                    }
629                    else
630                    {
631                        if ( log.isDebugEnabled() )
632                        {
633                            log.debug( "receiving LateralElementDescriptor from another" + "led = " + led
634                                + ", led.command = " + led.command + ", led.ce = " + led.ce );
635                        }
636
637                        handle( led );
638                    }
639                }
640            }
641            catch ( EOFException e )
642            {
643                log.info( "Caught java.io.EOFException closing connection." + e.getMessage() );
644            }
645            catch ( SocketException e )
646            {
647                log.info( "Caught java.net.SocketException closing connection." + e.getMessage() );
648            }
649            catch ( Exception e )
650            {
651                log.error( "Unexpected exception.", e );
652            }
653
654            try
655            {
656                ois.close();
657            }
658            catch ( IOException e )
659            {
660                log.error( "Could not close object input stream.", e );
661            }
662        }
663
664        /**
665         * This calls the appropriate method, based on the command sent in the Lateral element
666         * descriptor.
667         * <p>
668         * @param led
669         * @throws IOException
670         */
671        @SuppressWarnings("synthetic-access")
672        private void handle( LateralElementDescriptor<K, V> led )
673            throws IOException
674        {
675            String cacheName = led.ce.getCacheName();
676            K key = led.ce.getKey();
677            Serializable obj = null;
678
679            switch (led.command)
680            {
681                case UPDATE:
682                    handlePut( led.ce );
683                    break;
684
685                case REMOVE:
686                    // if a hashcode was given and filtering is on
687                    // check to see if they are the same
688                    // if so, then don't remove, otherwise issue a remove
689                    if ( led.valHashCode != -1 )
690                    {
691                        if ( getTcpLateralCacheAttributes().isFilterRemoveByHashCode() )
692                        {
693                            ICacheElement<K, V> test = getCache( cacheName ).localGet( key );
694                            if ( test != null )
695                            {
696                                if ( test.getVal().hashCode() == led.valHashCode )
697                                {
698                                    if ( log.isDebugEnabled() )
699                                    {
700                                        log.debug( "Filtering detected identical hashCode [" + led.valHashCode
701                                            + "], not issuing a remove for led " + led );
702                                    }
703                                    return;
704                                }
705                                else
706                                {
707                                    if ( log.isDebugEnabled() )
708                                    {
709                                        log.debug( "Different hashcodes, in cache [" + test.getVal().hashCode()
710                                            + "] sent [" + led.valHashCode + "]" );
711                                    }
712                                }
713                            }
714                        }
715                    }
716                    handleRemove( cacheName, key );
717                    break;
718
719                case REMOVEALL:
720                    handleRemoveAll( cacheName );
721                    break;
722
723                case GET:
724                    obj = handleGet( cacheName, key );
725                    break;
726
727                case GET_MATCHING:
728                    obj = (Serializable) handleGetMatching( cacheName, (String) key );
729                    break;
730
731                case GET_KEYSET:
732                        obj = (Serializable) handleGetKeySet(cacheName);
733                    break;
734
735                default: break;
736            }
737
738            if (obj != null)
739            {
740                ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
741                oos.writeObject( obj );
742                oos.flush();
743            }
744        }
745    }
746
747    /**
748     * Shuts down the receiver.
749     */
750    @Override
751    public void shutdown()
752    {
753        if ( shutdown.compareAndSet(false, true) )
754        {
755            if ( log.isInfoEnabled() )
756            {
757                log.info( "Shutting down TCP Lateral receiver." );
758            }
759
760            receiver.interrupt();
761        }
762        else
763        {
764            if ( log.isDebugEnabled() )
765            {
766                log.debug( "Shutdown already called." );
767            }
768        }
769    }
770}