001    package org.apache.jcs.auxiliary.lateral.socket.tcp;
002    
003    /*
004     * Licensed to the Apache Software Foundation (ASF) under one
005     * or more contributor license agreements.  See the NOTICE file
006     * distributed with this work for additional information
007     * regarding copyright ownership.  The ASF licenses this file
008     * to you under the Apache License, Version 2.0 (the
009     * "License"); you may not use this file except in compliance
010     * with the License.  You may obtain a copy of the License at
011     *
012     *   http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing,
015     * software distributed under the License is distributed on an
016     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017     * KIND, either express or implied.  See the License for the
018     * specific language governing permissions and limitations
019     * under the License.
020     */
021    
022    import java.io.IOException;
023    import java.io.ObjectInputStream;
024    import java.io.ObjectOutputStream;
025    import java.io.Serializable;
026    import java.net.InetAddress;
027    import java.net.ServerSocket;
028    import java.net.Socket;
029    import java.net.SocketTimeoutException;
030    import java.util.HashMap;
031    import java.util.Map;
032    import java.util.Set;
033    import java.util.concurrent.ExecutorService;
034    import java.util.concurrent.Executors;
035    import java.util.concurrent.ThreadFactory;
036    
037    import org.apache.commons.logging.Log;
038    import org.apache.commons.logging.LogFactory;
039    import org.apache.jcs.access.exception.CacheException;
040    import org.apache.jcs.auxiliary.lateral.LateralCacheInfo;
041    import org.apache.jcs.auxiliary.lateral.LateralCommand;
042    import org.apache.jcs.auxiliary.lateral.LateralElementDescriptor;
043    import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheListener;
044    import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
045    import org.apache.jcs.engine.behavior.ICacheElement;
046    import org.apache.jcs.engine.behavior.ICompositeCacheManager;
047    import org.apache.jcs.engine.behavior.IShutdownObserver;
048    import org.apache.jcs.engine.control.CompositeCache;
049    import org.apache.jcs.engine.control.CompositeCacheManager;
050    
051    /**
052     * Listens for connections from other TCP lateral caches and handles them. The initialization method
053     * starts a listening thread, which creates a socket server. When messages are received they are
054     * passed to a pooled executor which then calls the appropriate handle method.
055     */
056    public class LateralTCPListener<K extends Serializable, V extends Serializable>
057        implements ILateralCacheListener<K, V>, Serializable, IShutdownObserver
058    {
059        /** Don't change. */
060        private static final long serialVersionUID = -9107062664967131738L;
061    
062        /** The logger */
063        protected final static Log log = LogFactory.getLog( LateralTCPListener.class );
064    
065        /** How long the server will block on an accept(). 0 is infinite. */
066        private final static int acceptTimeOut = 1000;
067    
068        /** The CacheHub this listener is associated with */
069        private transient ICompositeCacheManager cacheManager;
070    
071        /** Map of available instances, keyed by port */
072        protected final static HashMap<String, ILateralCacheListener<?, ?>> instances =
073            new HashMap<String, ILateralCacheListener<?, ?>>();
074    
075        /** The socket listener */
076        private ListenerThread receiver;
077    
078        /** Configuration attributes */
079        private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
080    
081        /** Listening port */
082        protected int port;
083    
084        /** The processor. We should probably use an event queue here. */
085        protected ExecutorService pooledExecutor;
086    
087        /** put count */
088        private int putCnt = 0;
089    
090        /** remove count */
091        private int removeCnt = 0;
092    
093        /** get count */
094        private int getCnt = 0;
095    
096        /**
097         * Use the vmid by default. This can be set for testing. If we ever need to run more than one
098         * per vm, then we need a new technique.
099         */
100        private long listenerId = LateralCacheInfo.listenerId;
101    
102        /** is this shut down? */
103        protected boolean shutdown = false;
104    
105        /** is this terminated? */
106        protected boolean terminated = false;
107    
108        /**
109         * Gets the instance attribute of the LateralCacheTCPListener class.
110         * <p>
111         * @param ilca ITCPLateralCacheAttributes
112         * @param cacheMgr
113         * @return The instance value
114         */
115        public synchronized static <K extends Serializable, V extends Serializable> LateralTCPListener<K, V>
116            getInstance( ITCPLateralCacheAttributes ilca, ICompositeCacheManager cacheMgr )
117        {
118            @SuppressWarnings("unchecked") // Need to cast because of common map for all instances
119            LateralTCPListener<K, V> ins = (LateralTCPListener<K, V>) instances.get( String.valueOf( ilca.getTcpListenerPort() ) );
120    
121            if ( ins == null )
122            {
123                ins = new LateralTCPListener<K, V>( ilca );
124    
125                ins.init();
126    
127                ins.setCacheManager( cacheMgr );
128    
129                instances.put( String.valueOf( ilca.getTcpListenerPort() ), ins );
130    
131                if ( log.isInfoEnabled() )
132                {
133                    log.info( "Created new listener " + ilca.getTcpListenerPort() );
134                }
135            }
136    
137            return ins;
138        }
139    
140        /**
141         * Only need one since it does work for all regions, just reference by multiple region names.
142         * <p>
143         * @param ilca
144         */
145        protected LateralTCPListener( ITCPLateralCacheAttributes ilca )
146        {
147            this.setTcpLateralCacheAttributes( ilca );
148        }
149    
150        /**
151         * This starts the ListenerThread on the specified port.
152         */
153        public void init()
154        {
155            try
156            {
157                this.port = getTcpLateralCacheAttributes().getTcpListenerPort();
158    
159                receiver = new ListenerThread();
160                receiver.setDaemon( true );
161                receiver.start();
162    
163                pooledExecutor = Executors.newCachedThreadPool(new MyThreadFactory());
164                terminated = false;
165                shutdown = false;
166            }
167            catch ( Exception ex )
168            {
169                log.error( ex );
170                throw new IllegalStateException( ex.getMessage() );
171            }
172        }
173    
174        /**
175         * Let the lateral cache set a listener_id. Since there is only one listener for all the
176         * regions and every region gets registered? the id shouldn't be set if it isn't zero. If it is
177         * we assume that it is a reconnect.
178         * <p>
179         * By default, the listener id is the vmid.
180         * <p>
181         * The service should set this value. This value will never be changed by a server we connect
182         * to. It needs to be non static, for unit tests.
183         * <p>
184         * The service will use the value it sets in all send requests to the sender.
185         * <p>
186         * @param id The new listenerId value
187         * @throws IOException
188         */
189        public void setListenerId( long id )
190            throws IOException
191        {
192            this.listenerId = id;
193            if ( log.isDebugEnabled() )
194            {
195                log.debug( "set listenerId = " + id );
196            }
197        }
198    
199        /**
200         * Gets the listenerId attribute of the LateralCacheTCPListener object
201         * <p>
202         * @return The listenerId value
203         * @throws IOException
204         */
205        public long getListenerId()
206            throws IOException
207        {
208            return this.listenerId;
209        }
210    
211        /**
212         * Increments the put count. Gets the cache that was injected by the lateral factory. Calls put
213         * on the cache.
214         * <p>
215         * @see org.apache.jcs.engine.behavior.ICacheListener#handlePut(org.apache.jcs.engine.behavior.ICacheElement)
216         */
217        public void handlePut( ICacheElement<K, V> element )
218            throws IOException
219        {
220            putCnt++;
221            if ( log.isInfoEnabled() )
222            {
223                if ( getPutCnt() % 100 == 0 )
224                {
225                    log.info( "Put Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
226                        + getPutCnt() );
227                }
228            }
229    
230            if ( log.isDebugEnabled() )
231            {
232                log.debug( "handlePut> cacheName=" + element.getCacheName() + ", key=" + element.getKey() );
233            }
234    
235            getCache( element.getCacheName() ).localUpdate( element );
236        }
237    
238        /**
239         * Increments the remove count. Gets the cache that was injected by the lateral factory. Calls
240         * remove on the cache.
241         * <p>
242         * @see org.apache.jcs.engine.behavior.ICacheListener#handleRemove(java.lang.String,
243         *      java.io.Serializable)
244         */
245        public void handleRemove( String cacheName, K key )
246            throws IOException
247        {
248            removeCnt++;
249            if ( log.isInfoEnabled() )
250            {
251                if ( getRemoveCnt() % 100 == 0 )
252                {
253                    log.info( "Remove Count = " + getRemoveCnt() );
254                }
255            }
256    
257            if ( log.isDebugEnabled() )
258            {
259                log.debug( "handleRemove> cacheName=" + cacheName + ", key=" + key );
260            }
261    
262            getCache( cacheName ).localRemove( key );
263        }
264    
265        /**
266         * Gets the cache that was injected by the lateral factory. Calls removeAll on the cache.
267         * <p>
268         * @see org.apache.jcs.engine.behavior.ICacheListener#handleRemoveAll(java.lang.String)
269         */
270        public void handleRemoveAll( String cacheName )
271            throws IOException
272        {
273            if ( log.isDebugEnabled() )
274            {
275                log.debug( "handleRemoveAll> cacheName=" + cacheName );
276            }
277    
278            getCache( cacheName ).localRemoveAll();
279        }
280    
281        /**
282         * Gets the cache that was injected by the lateral factory. Calls get on the cache.
283         * <p>
284         * @param cacheName
285         * @param key
286         * @return a ICacheElement
287         * @throws IOException
288         */
289        public ICacheElement<K, V> handleGet( String cacheName, K key )
290            throws IOException
291        {
292            getCnt++;
293            if ( log.isInfoEnabled() )
294            {
295                if ( getGetCnt() % 100 == 0 )
296                {
297                    log.info( "Get Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
298                        + getGetCnt() );
299                }
300            }
301    
302            if ( log.isDebugEnabled() )
303            {
304                log.debug( "handleGet> cacheName=" + cacheName + ", key = " + key );
305            }
306    
307            return getCache( cacheName ).localGet( key );
308        }
309    
310        /**
311         * Gets the cache that was injected by the lateral factory. Calls get on the cache.
312         * <p>
313         * @param cacheName the name of the cache
314         * @param pattern the matching pattern
315         * @return Map
316         * @throws IOException
317         */
318        public Map<K, ICacheElement<K, V>> handleGetMatching( String cacheName, String pattern )
319            throws IOException
320        {
321            getCnt++;
322            if ( log.isInfoEnabled() )
323            {
324                if ( getGetCnt() % 100 == 0 )
325                {
326                    log.info( "GetMatching Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
327                        + getGetCnt() );
328                }
329            }
330    
331            if ( log.isDebugEnabled() )
332            {
333                log.debug( "handleGetMatching> cacheName=" + cacheName + ", pattern = " + pattern );
334            }
335    
336            return getCache( cacheName ).localGetMatching( pattern );
337        }
338    
339        /**
340         * Gets the cache that was injected by the lateral factory. Calls getGroupKeys on the cache.
341         * <p>
342         * @param cacheName the name of the cache
343         * @param group the group name
344         * @return a set of keys
345         * @throws IOException
346         */
347        public Set<K> handleGetGroupKeys( String cacheName, String group ) throws IOException
348        {
349            return getCache( cacheName ).getGroupKeys(group, true);
350        }
351    
352        /**
353         * Gets the cache that was injected by the lateral factory. Calls getGroupNames on the cache.
354         * <p>
355         * @param cacheName the name of the cache
356         * @return a set of group names
357         * @throws IOException
358         */
359        public Set<String> handleGetGroupNames( String cacheName ) throws IOException
360        {
361            return getCache( cacheName ).getGroupNames(true);
362        }
363    
364        /**
365         * This marks this instance as terminated.
366         * <p>
367         * @see org.apache.jcs.engine.behavior.ICacheListener#handleDispose(java.lang.String)
368         */
369        public void handleDispose( String cacheName )
370            throws IOException
371        {
372            if ( log.isInfoEnabled() )
373            {
374                log.info( "handleDispose > cacheName=" + cacheName + " | Ignoring message.  Do not dispose from remote." );
375            }
376    
377            // TODO handle active deregistration, rather than passive detection
378            synchronized (this)
379            {
380                terminated = true;
381            }
382        }
383    
384        public synchronized void dispose()
385        {
386            terminated = true;
387            notify();
388        }
389    
390        /**
391         * Gets the cacheManager attribute of the LateralCacheTCPListener object.
392         * <p>
393         * Normally this is set by the factory. If it wasn't set the listener defaults to the expected
394         * singleton behavior of the cache manager.
395         * <p>
396         * @param name
397         * @return CompositeCache
398         */
399        protected CompositeCache<K, V> getCache( String name )
400        {
401            if ( getCacheManager() == null )
402            {
403                // revert to singleton on failure
404                try
405                {
406                    setCacheManager( CompositeCacheManager.getInstance() );
407                }
408                catch (CacheException e)
409                {
410                    throw new RuntimeException("Could not retrieve cache manager instance", e);
411                }
412    
413                if ( log.isDebugEnabled() )
414                {
415                    log.debug( "cacheMgr = " + getCacheManager() );
416                }
417            }
418    
419            return getCacheManager().getCache( name );
420        }
421    
422        /**
423         * This is roughly the number of updates the lateral has received.
424         * <p>
425         * @return Returns the putCnt.
426         */
427        public int getPutCnt()
428        {
429            return putCnt;
430        }
431    
432        /**
433         * @return Returns the getCnt.
434         */
435        public int getGetCnt()
436        {
437            return getCnt;
438        }
439    
440        /**
441         * @return Returns the removeCnt.
442         */
443        public int getRemoveCnt()
444        {
445            return removeCnt;
446        }
447    
448        /**
449         * @param cacheMgr The cacheMgr to set.
450         */
451        public void setCacheManager( ICompositeCacheManager cacheMgr )
452        {
453            this.cacheManager = cacheMgr;
454        }
455    
456        /**
457         * @return Returns the cacheMgr.
458         */
459        public ICompositeCacheManager getCacheManager()
460        {
461            return cacheManager;
462        }
463    
464        /**
465         * @param tcpLateralCacheAttributes The tcpLateralCacheAttributes to set.
466         */
467        public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tcpLateralCacheAttributes )
468        {
469            this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
470        }
471    
472        /**
473         * @return Returns the tcpLateralCacheAttributes.
474         */
475        public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
476        {
477            return tcpLateralCacheAttributes;
478        }
479    
480        /**
481         * Processes commands from the server socket. There should be one listener for each configured
482         * TCP lateral.
483         */
484        public class ListenerThread
485            extends Thread
486        {
487            /** Main processing method for the ListenerThread object */
488            @Override
489            public void run()
490            {
491                try
492                {
493                    log.info( "Listening on port " + port );
494    
495                    ServerSocket serverSocket = new ServerSocket( port );
496                    serverSocket.setSoTimeout( acceptTimeOut );
497    
498                    ConnectionHandler handler;
499    
500                    outer: while ( true )
501                    {
502                        if ( log.isDebugEnabled() )
503                        {
504                            log.debug( "Waiting for clients to connect " );
505                        }
506    
507                        Socket socket = null;
508                        inner: while (true)
509                        {
510                            // Check to see if we've been asked to exit, and exit
511                            synchronized (this)
512                            {
513                                if (terminated)
514                                {
515                                    if (log.isDebugEnabled())
516                                    {
517                                        log.debug("Thread terminated, exiting gracefully");
518                                    }
519                                    break outer;
520                                }
521                            }
522                            try
523                            {
524                                socket = serverSocket.accept();
525                                break inner;
526                            }
527                            catch (SocketTimeoutException e)
528                            {
529                                // No problem! We loop back up!
530                                continue inner;
531                            }
532                        }
533    
534                        if ( socket != null && log.isDebugEnabled() )
535                        {
536                            InetAddress inetAddress = socket.getInetAddress();
537                            log.debug( "Connected to client at " + inetAddress );
538                        }
539    
540                        handler = new ConnectionHandler( socket );
541                        pooledExecutor.execute( handler );
542                    }
543                }
544                catch ( Exception e )
545                {
546                    log.error( "Exception caught in TCP listener", e );
547                }
548            }
549        }
550    
551        /**
552         * A Separate thread that runs when a command comes into the LateralTCPReceiver.
553         */
554        public class ConnectionHandler
555            implements Runnable
556        {
557            /** The socket connection, passed in via constructor */
558            private final Socket socket;
559    
560            /**
561             * Construct for a given socket
562             * @param socket
563             */
564            public ConnectionHandler( Socket socket )
565            {
566                this.socket = socket;
567            }
568    
569            /**
570             * Main processing method for the LateralTCPReceiverConnection object
571             */
572            @SuppressWarnings("unchecked") // Nee to cast from Object
573            public void run()
574            {
575                ObjectInputStream ois;
576    
577                try
578                {
579                    ois = new ObjectInputStream( socket.getInputStream() );
580                }
581                catch ( Exception e )
582                {
583                    log.error( "Could not open ObjectInputStream on " + socket, e );
584    
585                    return;
586                }
587    
588                LateralElementDescriptor<K, V> led;
589    
590                try
591                {
592                    while ( true )
593                    {
594                        led = (LateralElementDescriptor<K, V>) ois.readObject();
595    
596                        if ( led == null )
597                        {
598                            log.debug( "LateralElementDescriptor is null" );
599                            continue;
600                        }
601                        if ( led.requesterId == getListenerId() )
602                        {
603                            log.debug( "from self" );
604                        }
605                        else
606                        {
607                            if ( log.isDebugEnabled() )
608                            {
609                                log.debug( "receiving LateralElementDescriptor from another" + "led = " + led
610                                    + ", led.command = " + led.command + ", led.ce = " + led.ce );
611                            }
612    
613                            handle( led );
614                        }
615                    }
616                }
617                catch ( java.io.EOFException e )
618                {
619                    log.info( "Caught java.io.EOFException closing connection." + e.getMessage() );
620                }
621                catch ( java.net.SocketException e )
622                {
623                    log.info( "Caught java.net.SocketException closing connection." + e.getMessage() );
624                }
625                catch ( Exception e )
626                {
627                    log.error( "Unexpected exception.", e );
628                }
629    
630                try
631                {
632                    ois.close();
633                }
634                catch ( IOException e )
635                {
636                    log.error( "Could not close object input stream.", e );
637                }
638            }
639    
640            /**
641             * This calls the appropriate method, based on the command sent in the Lateral element
642             * descriptor.
643             * <p>
644             * @param led
645             * @throws IOException
646             */
647            private void handle( LateralElementDescriptor<K, V> led )
648                throws IOException
649            {
650                String cacheName = led.ce.getCacheName();
651                K key = led.ce.getKey();
652    
653                if ( led.command == LateralCommand.UPDATE )
654                {
655                    handlePut( led.ce );
656                }
657                else if ( led.command == LateralCommand.REMOVE )
658                {
659                    // if a hashcode was given and filtering is on
660                    // check to see if they are the same
661                    // if so, then don't remvoe, otherwise issue a remove
662                    if ( led.valHashCode != -1 )
663                    {
664                        if ( getTcpLateralCacheAttributes().isFilterRemoveByHashCode() )
665                        {
666                            ICacheElement<K, V> test = getCache( cacheName ).localGet( key );
667                            if ( test != null )
668                            {
669                                if ( test.getVal().hashCode() == led.valHashCode )
670                                {
671                                    if ( log.isDebugEnabled() )
672                                    {
673                                        log.debug( "Filtering detected identical hashCode [" + led.valHashCode
674                                            + "], not issuing a remove for led " + led );
675                                    }
676                                    return;
677                                }
678                                else
679                                {
680                                    if ( log.isDebugEnabled() )
681                                    {
682                                        log.debug( "Different hashcodes, in cache [" + test.getVal().hashCode()
683                                            + "] sent [" + led.valHashCode + "]" );
684                                    }
685                                }
686                            }
687                        }
688                    }
689                    handleRemove( cacheName, key );
690                }
691                else if ( led.command == LateralCommand.REMOVEALL )
692                {
693                    handleRemoveAll( cacheName );
694                }
695                else if ( led.command == LateralCommand.GET )
696                {
697                    Serializable obj = handleGet( cacheName, key );
698    
699                    ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
700                    oos.writeObject( obj );
701                    oos.flush();
702                }
703                else if ( led.command == LateralCommand.GET_MATCHING )
704                {
705                    Map<K, ICacheElement<K, V>> obj = handleGetMatching( cacheName, (String) key );
706    
707                    ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
708                    oos.writeObject( obj );
709                    oos.flush();
710                }
711                else if ( led.command == LateralCommand.GET_GROUP_KEYS )
712                {
713                    String groupName = (String) key;
714                    Set<K> obj = handleGetGroupKeys(cacheName, groupName);
715    
716                    ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
717                    oos.writeObject( obj );
718                    oos.flush();
719                }
720                else if ( led.command == LateralCommand.GET_GROUP_NAMES )
721                {
722                    Set<String> obj = handleGetGroupNames(cacheName);
723    
724                    ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
725                    oos.writeObject( obj );
726                    oos.flush();
727                }
728            }
729        }
730    
731        /**
732         * Allows us to set the daemon status on the executor threads
733         * <p>
734         * @author Aaron Smuts
735         */
736        protected static class MyThreadFactory
737            implements ThreadFactory
738        {
739            /**
740             * @param runner
741             * @return daemon thread
742             */
743            public Thread newThread( Runnable runner )
744            {
745                Thread t = new Thread( runner );
746                t.setDaemon( true );
747                t.setPriority( Thread.MIN_PRIORITY );
748                return t;
749            }
750        }
751    
752        /**
753         * Shuts down the receiver.
754         */
755        public void shutdown()
756        {
757            if ( !shutdown )
758            {
759                shutdown = true;
760    
761                if ( log.isInfoEnabled() )
762                {
763                    log.info( "Shutting down TCP Lateral receiver." );
764                }
765                receiver.interrupt();
766            }
767            else
768            {
769                if ( log.isDebugEnabled() )
770                {
771                    log.debug( "Shutdown already called." );
772                }
773            }
774        }
775    }