001package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025import java.net.InetSocketAddress;
026import java.net.ServerSocket;
027import java.net.Socket;
028import java.net.SocketAddress;
029import java.nio.channels.SelectionKey;
030import java.nio.channels.Selector;
031import java.nio.channels.ServerSocketChannel;
032import java.nio.channels.SocketChannel;
033import java.util.Iterator;
034import java.util.Map;
035import java.util.Set;
036import java.util.concurrent.ConcurrentHashMap;
037import java.util.concurrent.atomic.AtomicBoolean;
038
039import org.apache.commons.jcs3.auxiliary.lateral.LateralElementDescriptor;
040import org.apache.commons.jcs3.auxiliary.lateral.behavior.ILateralCacheListener;
041import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
042import org.apache.commons.jcs3.engine.CacheInfo;
043import org.apache.commons.jcs3.engine.behavior.ICacheElement;
044import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
045import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
046import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
047import org.apache.commons.jcs3.engine.control.CompositeCache;
048import org.apache.commons.jcs3.log.Log;
049import org.apache.commons.jcs3.log.LogManager;
050import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
051
052/**
053 * Listens for connections from other TCP lateral caches and handles them. The initialization method
054 * starts a listening thread, which creates a socket server. When messages are received they are
055 * passed to a pooled executor which then calls the appropriate handle method.
056 */
057public class LateralTCPListener<K, V>
058    implements ILateralCacheListener<K, V>, IShutdownObserver
059{
060    /** The logger */
061    private static final Log log = LogManager.getLog( LateralTCPListener.class );
062
063    /** How long the server will block on an accept(). 0 is infinite. */
064    private static final int acceptTimeOut = 1000;
065
066    /** The CacheHub this listener is associated with */
067    private transient ICompositeCacheManager cacheManager;
068
069    /** Map of available instances, keyed by port */
070    private static final ConcurrentHashMap<String, ILateralCacheListener<?, ?>> instances =
071        new ConcurrentHashMap<>();
072
073    /** Configuration attributes */
074    private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
075
076    /** The listener thread */
077    private Thread listenerThread;
078
079    /**
080     * Serializer for reading and writing
081     */
082    private IElementSerializer serializer;
083
084    /** put count */
085    private int putCnt;
086
087    /** remove count */
088    private int removeCnt;
089
090    /** get count */
091    private int getCnt;
092
093    /**
094     * Use the vmid by default. This can be set for testing. If we ever need to run more than one
095     * per vm, then we need a new technique.
096     */
097    private long listenerId = CacheInfo.listenerId;
098
099    /** is this shut down? */
100    private final AtomicBoolean shutdown = new AtomicBoolean();
101
102    /** is this terminated? */
103    private final AtomicBoolean terminated = new AtomicBoolean();
104
105    /**
106     * Gets the instance attribute of the LateralCacheTCPListener class.
107     * <p>
108     * @param ilca ITCPLateralCacheAttributes
109     * @param cacheMgr
110     * @return The instance value
111     * @deprecated Specify serializer
112     */
113    @Deprecated
114    @SuppressWarnings("unchecked") // Need to cast because of common map for all instances
115    public static <K, V> LateralTCPListener<K, V>
116        getInstance( final ITCPLateralCacheAttributes ilca, final ICompositeCacheManager cacheMgr)
117    {
118        return (LateralTCPListener<K, V>) instances.computeIfAbsent(
119                String.valueOf( ilca.getTcpListenerPort() ),
120                k -> {
121                    final LateralTCPListener<K, V> newIns = new LateralTCPListener<>( ilca, new StandardSerializer() );
122
123                    newIns.init();
124                    newIns.setCacheManager( cacheMgr );
125
126                    log.info("Created new listener {0}", ilca::getTcpListenerPort);
127
128                    return newIns;
129                });
130    }
131
132    /**
133     * Gets the instance attribute of the LateralCacheTCPListener class.
134     * <p>
135     * @param ilca ITCPLateralCacheAttributes
136     * @param cacheMgr
137     * @param serializer the serializer to use when receiving
138     * @return The instance value
139     */
140    @SuppressWarnings("unchecked") // Need to cast because of common map for all instances
141    public static <K, V> LateralTCPListener<K, V>
142        getInstance( final ITCPLateralCacheAttributes ilca, final ICompositeCacheManager cacheMgr, final IElementSerializer serializer )
143    {
144        return (LateralTCPListener<K, V>) instances.computeIfAbsent(
145                String.valueOf( ilca.getTcpListenerPort() ),
146                k -> {
147                    final LateralTCPListener<K, V> newIns = new LateralTCPListener<>( ilca, serializer );
148
149                    newIns.init();
150                    newIns.setCacheManager( cacheMgr );
151
152                    log.info("Created new listener {0}", ilca::getTcpListenerPort);
153
154                    return newIns;
155                });
156    }
157
158    /**
159     * Only need one since it does work for all regions, just reference by multiple region names.
160     * <p>
161     * @param ilca ITCPLateralCacheAttributes
162     * @deprecated Specify serializer
163     */
164    @Deprecated
165    protected LateralTCPListener( final ITCPLateralCacheAttributes ilca )
166    {
167        this(ilca, new StandardSerializer());
168    }
169
170    /**
171     * Only need one since it does work for all regions, just reference by multiple region names.
172     * <p>
173     * @param ilca ITCPLateralCacheAttributes
174     * @param serializer the serializer to use when receiving
175     */
176    protected LateralTCPListener( final ITCPLateralCacheAttributes ilca, final IElementSerializer serializer )
177    {
178        this.setTcpLateralCacheAttributes( ilca );
179        this.serializer = serializer;
180    }
181
182    /**
183     * This starts the ListenerThread on the specified port.
184     */
185    @Override
186    public synchronized void init()
187    {
188        try
189        {
190            final int port = getTcpLateralCacheAttributes().getTcpListenerPort();
191            final String host = getTcpLateralCacheAttributes().getTcpListenerHost();
192
193            terminated.set(false);
194            shutdown.set(false);
195
196            final ServerSocketChannel serverSocket = ServerSocketChannel.open();
197
198            SocketAddress endPoint;
199
200            if (host != null && !host.isEmpty())
201            {
202                log.info( "Listening on {0}:{1}", host, port );
203                //Bind the SocketAddress with host and port
204                endPoint = new InetSocketAddress(host, port);
205            }
206            else
207            {
208                log.info( "Listening on port {0}", port );
209                endPoint = new InetSocketAddress(port);
210            }
211
212            serverSocket.bind(endPoint);
213            serverSocket.configureBlocking(false);
214
215            listenerThread = new Thread(() -> runListener(serverSocket),
216                    "JCS-LateralTCPListener-" + host + ":" + port);
217            listenerThread.setDaemon(true);
218            listenerThread.start();
219        }
220        catch ( final IOException ex )
221        {
222            throw new IllegalStateException( ex );
223        }
224    }
225
226    /**
227     * Let the lateral cache set a listener_id. Since there is only one listener for all the
228     * regions and every region gets registered? the id shouldn't be set if it isn't zero. If it is
229     * we assume that it is a reconnect.
230     * <p>
231     * By default, the listener id is the vmid.
232     * <p>
233     * The service should set this value. This value will never be changed by a server we connect
234     * to. It needs to be non static, for unit tests.
235     * <p>
236     * The service will use the value it sets in all send requests to the sender.
237     * <p>
238     * @param id The new listenerId value
239     * @throws IOException
240     */
241    @Override
242    public void setListenerId( final long id )
243        throws IOException
244    {
245        this.listenerId = id;
246        log.debug( "set listenerId = {0}", id );
247    }
248
249    /**
250     * Gets the listenerId attribute of the LateralCacheTCPListener object
251     * <p>
252     * @return The listenerId value
253     * @throws IOException
254     */
255    @Override
256    public long getListenerId()
257        throws IOException
258    {
259        return this.listenerId;
260    }
261
262    /**
263     * Increments the put count. Gets the cache that was injected by the lateral factory. Calls put
264     * on the cache.
265     * <p>
266     * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handlePut(org.apache.commons.jcs3.engine.behavior.ICacheElement)
267     */
268    @Override
269    public void handlePut( final ICacheElement<K, V> element )
270        throws IOException
271    {
272        putCnt++;
273        if ( log.isInfoEnabled() && getPutCnt() % 100 == 0 )
274        {
275            log.info( "Put Count (port {0}) = {1}",
276                    () -> getTcpLateralCacheAttributes().getTcpListenerPort(),
277                    this::getPutCnt);
278        }
279
280        log.debug( "handlePut> cacheName={0}, key={1}",
281                element::getCacheName, element::getKey);
282
283        getCache( element.getCacheName() ).localUpdate( element );
284    }
285
286    /**
287     * Increments the remove count. Gets the cache that was injected by the lateral factory. Calls
288     * remove on the cache.
289     * <p>
290     * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleRemove(String,
291     *      Object)
292     */
293    @Override
294    public void handleRemove( final String cacheName, final K key )
295        throws IOException
296    {
297        removeCnt++;
298        if ( log.isInfoEnabled() && getRemoveCnt() % 100 == 0 )
299        {
300            log.info( "Remove Count = {0}", this::getRemoveCnt);
301        }
302
303        log.debug( "handleRemove> cacheName={0}, key={1}", cacheName, key );
304
305        getCache( cacheName ).localRemove( key );
306    }
307
308    /**
309     * Gets the cache that was injected by the lateral factory. Calls removeAll on the cache.
310     * <p>
311     * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleRemoveAll(String)
312     */
313    @Override
314    public void handleRemoveAll( final String cacheName )
315        throws IOException
316    {
317        log.debug( "handleRemoveAll> cacheName={0}", cacheName );
318
319        getCache( cacheName ).localRemoveAll();
320    }
321
322    /**
323     * Gets the cache that was injected by the lateral factory. Calls get on the cache.
324     * <p>
325     * @param cacheName
326     * @param key
327     * @return a ICacheElement
328     * @throws IOException
329     */
330    public ICacheElement<K, V> handleGet( final String cacheName, final K key )
331        throws IOException
332    {
333        getCnt++;
334        if ( log.isInfoEnabled() && getGetCnt() % 100 == 0 )
335        {
336            log.info( "Get Count (port {0}) = {1}",
337                    () -> getTcpLateralCacheAttributes().getTcpListenerPort(),
338                    this::getGetCnt);
339        }
340
341        log.debug( "handleGet> cacheName={0}, key={1}", cacheName, key );
342
343        return getCache( cacheName ).localGet( key );
344    }
345
346    /**
347     * Gets the cache that was injected by the lateral factory. Calls get on the cache.
348     * <p>
349     * @param cacheName the name of the cache
350     * @param pattern the matching pattern
351     * @return Map
352     * @throws IOException
353     */
354    public Map<K, ICacheElement<K, V>> handleGetMatching( final String cacheName, final String pattern )
355        throws IOException
356    {
357        getCnt++;
358        if ( log.isInfoEnabled() && getGetCnt() % 100 == 0 )
359        {
360            log.info( "GetMatching Count (port {0}) = {1}",
361                    () -> getTcpLateralCacheAttributes().getTcpListenerPort(),
362                    this::getGetCnt);
363        }
364
365        log.debug( "handleGetMatching> cacheName={0}, pattern={1}", cacheName, pattern );
366
367        return getCache( cacheName ).localGetMatching( pattern );
368    }
369
370    /**
371     * Gets the cache that was injected by the lateral factory. Calls getKeySet on the cache.
372     * <p>
373     * @param cacheName the name of the cache
374     * @return a set of keys
375     * @throws IOException
376     */
377    public Set<K> handleGetKeySet( final String cacheName ) throws IOException
378    {
379        return getCache( cacheName ).getKeySet(true);
380    }
381
382    /**
383     * This marks this instance as terminated.
384     * <p>
385     * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleDispose(String)
386     */
387    @Override
388    public void handleDispose( final String cacheName )
389        throws IOException
390    {
391        log.info( "handleDispose > cacheName={0} | Ignoring message. "
392                + "Do not dispose from remote.", cacheName );
393
394        // TODO handle active deregistration, rather than passive detection
395        dispose();
396    }
397
398    @Override
399    public synchronized void dispose()
400    {
401        if (terminated.compareAndSet(false, true))
402        {
403            notify();
404            listenerThread.interrupt();
405        }
406    }
407
408    /**
409     * Gets the cacheManager attribute of the LateralCacheTCPListener object.
410     * <p>
411     * Normally this is set by the factory. If it wasn't set the listener defaults to the expected
412     * singleton behavior of the cache manager.
413     * <p>
414     * @param name
415     * @return CompositeCache
416     */
417    protected CompositeCache<K, V> getCache( final String name )
418    {
419        return getCacheManager().getCache( name );
420    }
421
422    /**
423     * This is roughly the number of updates the lateral has received.
424     * <p>
425     * @return Returns the putCnt.
426     */
427    public int getPutCnt()
428    {
429        return putCnt;
430    }
431
432    /**
433     * @return Returns the getCnt.
434     */
435    public int getGetCnt()
436    {
437        return getCnt;
438    }
439
440    /**
441     * @return Returns the removeCnt.
442     */
443    public int getRemoveCnt()
444    {
445        return removeCnt;
446    }
447
448    /**
449     * @param cacheMgr The cacheMgr to set.
450     */
451    @Override
452    public void setCacheManager( final ICompositeCacheManager cacheMgr )
453    {
454        this.cacheManager = cacheMgr;
455    }
456
457    /**
458     * @return Returns the cacheMgr.
459     */
460    @Override
461    public ICompositeCacheManager getCacheManager()
462    {
463        return cacheManager;
464    }
465
466    /**
467     * @param tcpLateralCacheAttributes The tcpLateralCacheAttributes to set.
468     */
469    public void setTcpLateralCacheAttributes( final ITCPLateralCacheAttributes tcpLateralCacheAttributes )
470    {
471        this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
472    }
473
474    /**
475     * @return Returns the tcpLateralCacheAttributes.
476     */
477    public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
478    {
479        return tcpLateralCacheAttributes;
480    }
481
482    /**
483     * Processes commands from the server socket. There should be one listener for each configured
484     * TCP lateral.
485     * @deprecated No longer used
486     */
487    @Deprecated
488    public class ListenerThread
489        extends Thread
490    {
491        /** The socket listener */
492        private final ServerSocket serverSocket;
493
494        /**
495         * Constructor
496         *
497         * @param serverSocket
498         */
499        public ListenerThread(final ServerSocket serverSocket)
500        {
501            this.serverSocket = serverSocket;
502        }
503
504        /** Main processing method for the ListenerThread object */
505        @Override
506        public void run()
507        {
508            runListener(serverSocket.getChannel());
509        }
510    }
511
512    /**
513     * Processes commands from the server socket. There should be one listener for each configured
514     * TCP lateral.
515     */
516    private void runListener(final ServerSocketChannel serverSocket)
517    {
518        try (Selector selector = Selector.open())
519        {
520            serverSocket.register(selector, SelectionKey.OP_ACCEPT);
521            log.debug("Waiting for clients to connect");
522
523            // Check to see if we've been asked to exit, and exit
524            while (!terminated.get())
525            {
526                int activeKeys = selector.select(acceptTimeOut);
527                if (activeKeys == 0)
528                {
529                    continue;
530                }
531
532                for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();)
533                {
534                    if (terminated.get())
535                    {
536                        break;
537                    }
538
539                    SelectionKey key = i.next();
540
541                    if (!key.isValid())
542                    {
543                        continue;
544                    }
545
546                    if (key.isAcceptable())
547                    {
548                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
549                        SocketChannel client = server.accept();
550                        if (client == null)
551                        {
552                            //may happen in non-blocking mode
553                            continue;
554                        }
555
556                        log.info("Connected to client at {0}", client.getRemoteAddress());
557
558                        client.configureBlocking(false);
559                        client.register(selector, SelectionKey.OP_READ);
560                    }
561
562                    if (key.isReadable())
563                    {
564                        handleClient(key);
565                    }
566
567                    i.remove();
568                }
569            }
570
571            log.debug("Thread terminated, exiting gracefully");
572
573            //close all registered channels
574            selector.keys().forEach(key -> {
575                try
576                {
577                    key.channel().close();
578                }
579                catch (IOException e)
580                {
581                    log.warn("Problem closing channel", e);
582                }
583            });
584        }
585        catch (final IOException e)
586        {
587            log.error( "Exception caught in TCP listener", e );
588        }
589        finally
590        {
591            try
592            {
593                serverSocket.close();
594            }
595            catch (IOException e)
596            {
597                log.error( "Exception closing TCP listener", e );
598            }
599        }
600    }
601
602    /**
603     * A Separate thread that runs when a command comes into the LateralTCPReceiver.
604     * @deprecated No longer used
605     */
606    @Deprecated
607    public class ConnectionHandler
608        implements Runnable
609    {
610        /** The socket connection, passed in via constructor */
611        private final Socket socket;
612
613        /**
614         * Construct for a given socket
615         * @param socket
616         */
617        public ConnectionHandler( final Socket socket )
618        {
619            this.socket = socket;
620        }
621
622        /**
623         * Main processing method for the LateralTCPReceiverConnection object
624         */
625        @Override
626        public void run()
627        {
628            try (InputStream is = socket.getInputStream())
629            {
630                while ( true )
631                {
632                    final LateralElementDescriptor<K, V> led =
633                            serializer.deSerializeFrom(is, null);
634
635                    if ( led == null )
636                    {
637                        log.debug( "LateralElementDescriptor is null" );
638                        continue;
639                    }
640                    if ( led.getRequesterId() == getListenerId() )
641                    {
642                        log.debug( "from self" );
643                    }
644                    else
645                    {
646                        log.debug( "receiving LateralElementDescriptor from another led = {0}",
647                                led );
648
649                        Object obj = handleElement(led);
650                        if (obj != null)
651                        {
652                            OutputStream os = socket.getOutputStream();
653                            serializer.serializeTo(obj, os);
654                            os.flush();
655                        }
656                    }
657                }
658            }
659            catch (final IOException e)
660            {
661                log.info("Caught {0}, closing connection.", e.getClass().getSimpleName(), e);
662            }
663            catch (final ClassNotFoundException e)
664            {
665                log.error( "Deserialization failed reading from socket", e );
666            }
667        }
668    }
669
670    /**
671     * A Separate thread that runs when a command comes into the LateralTCPReceiver.
672     */
673    private void handleClient(final SelectionKey key)
674    {
675        final SocketChannel socketChannel = (SocketChannel) key.channel();
676
677        try
678        {
679            final LateralElementDescriptor<K, V> led =
680                    serializer.deSerializeFrom(socketChannel, null);
681
682            if ( led == null )
683            {
684                log.debug("LateralElementDescriptor is null");
685                return;
686            }
687
688            if ( led.getRequesterId() == getListenerId() )
689            {
690                log.debug( "from self" );
691            }
692            else
693            {
694                log.debug( "receiving LateralElementDescriptor from another led = {0}",
695                        led );
696
697                Object obj = handleElement(led);
698                if (obj != null)
699                {
700                    serializer.serializeTo(obj, socketChannel);
701                }
702            }
703        }
704        catch (final IOException e)
705        {
706            log.info("Caught {0}, closing connection.", e.getClass().getSimpleName(), e);
707            try
708            {
709                socketChannel.close();
710            }
711            catch (IOException e1)
712            {
713                log.error("Error while closing connection", e );
714            }
715        }
716        catch (final ClassNotFoundException e)
717        {
718            log.error( "Deserialization failed reading from socket", e );
719        }
720    }
721
722    /**
723     * This calls the appropriate method, based on the command sent in the Lateral element
724     * descriptor.
725     * <p>
726     * @param led the lateral element
727     * @return a possible response
728     * @throws IOException
729     */
730    private Object handleElement(final LateralElementDescriptor<K, V> led) throws IOException
731    {
732        final String cacheName = led.getPayload().getCacheName();
733        final K key = led.getPayload().getKey();
734        Object obj = null;
735
736        switch (led.getCommand())
737        {
738            case UPDATE:
739                handlePut(led.getPayload());
740                break;
741
742            case REMOVE:
743                // if a hash code was given and filtering is on
744                // check to see if they are the same
745                // if so, then don't remove, otherwise issue a remove
746                if (led.getValHashCode() != -1 &&
747                    getTcpLateralCacheAttributes().isFilterRemoveByHashCode())
748                {
749                    final ICacheElement<K, V> test = getCache( cacheName ).localGet( key );
750                    if ( test != null )
751                    {
752                        if ( test.getVal().hashCode() == led.getValHashCode() )
753                        {
754                            log.debug( "Filtering detected identical hashCode [{0}], "
755                                    + "not issuing a remove for led {1}",
756                                    led.getValHashCode(), led );
757                            return null;
758                        }
759                        log.debug( "Different hash codes, in cache [{0}] sent [{1}]",
760                                test.getVal()::hashCode, led::getValHashCode );
761                    }
762                }
763                handleRemove( cacheName, key );
764                break;
765
766            case REMOVEALL:
767                handleRemoveAll( cacheName );
768                break;
769
770            case GET:
771                obj = handleGet( cacheName, key );
772                break;
773
774            case GET_MATCHING:
775                obj = handleGetMatching( cacheName, (String) key );
776                break;
777
778            case GET_KEYSET:
779                obj = handleGetKeySet(cacheName);
780                break;
781
782            default: break;
783        }
784
785        return obj;
786    }
787
788    /**
789     * Shuts down the receiver.
790     */
791    @Override
792    public void shutdown()
793    {
794        if ( shutdown.compareAndSet(false, true) )
795        {
796            log.info( "Shutting down TCP Lateral receiver." );
797            dispose();
798        }
799        else
800        {
801            log.debug( "Shutdown already called." );
802        }
803    }
804}