View Javadoc
1   package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  import java.net.InetSocketAddress;
26  import java.net.ServerSocket;
27  import java.net.Socket;
28  import java.net.SocketAddress;
29  import java.nio.channels.SelectionKey;
30  import java.nio.channels.Selector;
31  import java.nio.channels.ServerSocketChannel;
32  import java.nio.channels.SocketChannel;
33  import java.util.Iterator;
34  import java.util.Map;
35  import java.util.Set;
36  import java.util.concurrent.ConcurrentHashMap;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  
39  import org.apache.commons.jcs3.auxiliary.lateral.LateralElementDescriptor;
40  import org.apache.commons.jcs3.auxiliary.lateral.behavior.ILateralCacheListener;
41  import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
42  import org.apache.commons.jcs3.engine.CacheInfo;
43  import org.apache.commons.jcs3.engine.behavior.ICacheElement;
44  import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
45  import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
46  import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
47  import org.apache.commons.jcs3.engine.control.CompositeCache;
48  import org.apache.commons.jcs3.log.Log;
49  import org.apache.commons.jcs3.log.LogManager;
50  import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
51  
/**
 * Listens for connections from other TCP lateral caches and handles them. The initialization method
 * starts a daemon listener thread that serves a non-blocking server socket channel through a
 * selector. Messages received from connected clients are deserialized and passed to the
 * appropriate handle method on that same listener thread.
 */
57  public class LateralTCPListener<K, V>
58      implements ILateralCacheListener<K, V>, IShutdownObserver
59  {
    /** The logger */
    private static final Log log = LogManager.getLog( LateralTCPListener.class );

    /**
     * Timeout in milliseconds handed to {@code Selector.select(long)} in the listener loop, so the
     * loop wakes up regularly to re-check the terminated flag. 0 would block indefinitely.
     */
    private static final int acceptTimeOut = 1000;

    /**
     * The cache manager this listener resolves cache regions from.
     * NOTE(review): transient only matters if this class is serialized; the supertypes are not
     * visible here - confirm.
     */
    private transient ICompositeCacheManager cacheManager;

    /**
     * Map of available instances, keyed by the listener port rendered as a String. Entries are
     * added by the getInstance methods; nothing in this class removes them.
     */
    private static final ConcurrentHashMap<String, ILateralCacheListener<?, ?>> instances =
        new ConcurrentHashMap<>();

    /** Configuration attributes (listener host, port and the remove filter flag are read from it) */
    private ITCPLateralCacheAttributes tcpLateralCacheAttributes;

    /** The listener thread; created and started by init(), interrupted by dispose() */
    private Thread listenerThread;

    /**
     * Serializer for reading requests from and writing responses to client connections
     */
    private IElementSerializer serializer;

    /** put count; a plain int incremented without synchronization in handlePut */
    private int putCnt;

    /** remove count; a plain int incremented without synchronization in handleRemove */
    private int removeCnt;

    /** get count; a plain int incremented by both handleGet and handleGetMatching */
    private int getCnt;

    /**
     * Use the vmid by default. This can be set for testing. If we ever need to run more than one
     * per vm, then we need a new technique.
     */
    private long listenerId = CacheInfo.listenerId;

    /** is this shut down? Set by shutdown(), reset by init() */
    private final AtomicBoolean shutdown = new AtomicBoolean();

    /** is this terminated? Set by dispose(), reset by init(), polled by the listener loop */
    private final AtomicBoolean terminated = new AtomicBoolean();
104 
105     /**
106      * Gets the instance attribute of the LateralCacheTCPListener class.
107      * <p>
108      * @param ilca ITCPLateralCacheAttributes
109      * @param cacheMgr
110      * @return The instance value
111      * @deprecated Specify serializer
112      */
113     @Deprecated
114     @SuppressWarnings("unchecked") // Need to cast because of common map for all instances
115     public static <K, V> LateralTCPListener<K, V>
116         getInstance( final ITCPLateralCacheAttributes ilca, final ICompositeCacheManager cacheMgr)
117     {
118         return (LateralTCPListener<K, V>) instances.computeIfAbsent(
119                 String.valueOf( ilca.getTcpListenerPort() ),
120                 k -> {
121                     final LateralTCPListener<K, V> newIns = new LateralTCPListener<>( ilca, new StandardSerializer() );
122 
123                     newIns.init();
124                     newIns.setCacheManager( cacheMgr );
125 
126                     log.info("Created new listener {0}", ilca::getTcpListenerPort);
127 
128                     return newIns;
129                 });
130     }
131 
132     /**
133      * Gets the instance attribute of the LateralCacheTCPListener class.
134      * <p>
135      * @param ilca ITCPLateralCacheAttributes
136      * @param cacheMgr
137      * @param serializer the serializer to use when receiving
138      * @return The instance value
139      */
140     @SuppressWarnings("unchecked") // Need to cast because of common map for all instances
141     public static <K, V> LateralTCPListener<K, V>
142         getInstance( final ITCPLateralCacheAttributes ilca, final ICompositeCacheManager cacheMgr, final IElementSerializer serializer )
143     {
144         return (LateralTCPListener<K, V>) instances.computeIfAbsent(
145                 String.valueOf( ilca.getTcpListenerPort() ),
146                 k -> {
147                     final LateralTCPListener<K, V> newIns = new LateralTCPListener<>( ilca, serializer );
148 
149                     newIns.init();
150                     newIns.setCacheManager( cacheMgr );
151 
152                     log.info("Created new listener {0}", ilca::getTcpListenerPort);
153 
154                     return newIns;
155                 });
156     }
157 
158     /**
159      * Only need one since it does work for all regions, just reference by multiple region names.
160      * <p>
161      * @param ilca ITCPLateralCacheAttributes
162      * @deprecated Specify serializer
163      */
164     @Deprecated
165     protected LateralTCPListener( final ITCPLateralCacheAttributes ilca )
166     {
167         this(ilca, new StandardSerializer());
168     }
169 
170     /**
171      * Only need one since it does work for all regions, just reference by multiple region names.
172      * <p>
173      * @param ilca ITCPLateralCacheAttributes
174      * @param serializer the serializer to use when receiving
175      */
176     protected LateralTCPListener( final ITCPLateralCacheAttributes ilca, final IElementSerializer serializer )
177     {
178         this.setTcpLateralCacheAttributes( ilca );
179         this.serializer = serializer;
180     }
181 
182     /**
183      * This starts the ListenerThread on the specified port.
184      */
185     @Override
186     public synchronized void init()
187     {
188         try
189         {
190             final int port = getTcpLateralCacheAttributes().getTcpListenerPort();
191             final String host = getTcpLateralCacheAttributes().getTcpListenerHost();
192 
193             terminated.set(false);
194             shutdown.set(false);
195 
196             final ServerSocketChannel serverSocket = ServerSocketChannel.open();
197 
198             SocketAddress endPoint;
199 
200             if (host != null && !host.isEmpty())
201             {
202                 log.info( "Listening on {0}:{1}", host, port );
203                 //Bind the SocketAddress with host and port
204                 endPoint = new InetSocketAddress(host, port);
205             }
206             else
207             {
208                 log.info( "Listening on port {0}", port );
209                 endPoint = new InetSocketAddress(port);
210             }
211 
212             serverSocket.bind(endPoint);
213             serverSocket.configureBlocking(false);
214 
215             listenerThread = new Thread(() -> runListener(serverSocket),
216                     "JCS-LateralTCPListener-" + host + ":" + port);
217             listenerThread.setDaemon(true);
218             listenerThread.start();
219         }
220         catch ( final IOException ex )
221         {
222             throw new IllegalStateException( ex );
223         }
224     }
225 
226     /**
227      * Let the lateral cache set a listener_id. Since there is only one listener for all the
228      * regions and every region gets registered? the id shouldn't be set if it isn't zero. If it is
229      * we assume that it is a reconnect.
230      * <p>
231      * By default, the listener id is the vmid.
232      * <p>
233      * The service should set this value. This value will never be changed by a server we connect
234      * to. It needs to be non static, for unit tests.
235      * <p>
236      * The service will use the value it sets in all send requests to the sender.
237      * <p>
238      * @param id The new listenerId value
239      * @throws IOException
240      */
241     @Override
242     public void setListenerId( final long id )
243         throws IOException
244     {
245         this.listenerId = id;
246         log.debug( "set listenerId = {0}", id );
247     }
248 
249     /**
250      * Gets the listenerId attribute of the LateralCacheTCPListener object
251      * <p>
252      * @return The listenerId value
253      * @throws IOException
254      */
255     @Override
256     public long getListenerId()
257         throws IOException
258     {
259         return this.listenerId;
260     }
261 
262     /**
263      * Increments the put count. Gets the cache that was injected by the lateral factory. Calls put
264      * on the cache.
265      * <p>
266      * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handlePut(org.apache.commons.jcs3.engine.behavior.ICacheElement)
267      */
268     @Override
269     public void handlePut( final ICacheElement<K, V> element )
270         throws IOException
271     {
272         putCnt++;
273         if ( log.isInfoEnabled() && getPutCnt() % 100 == 0 )
274         {
275             log.info( "Put Count (port {0}) = {1}",
276                     () -> getTcpLateralCacheAttributes().getTcpListenerPort(),
277                     this::getPutCnt);
278         }
279 
280         log.debug( "handlePut> cacheName={0}, key={1}",
281                 element::getCacheName, element::getKey);
282 
283         getCache( element.getCacheName() ).localUpdate( element );
284     }
285 
286     /**
287      * Increments the remove count. Gets the cache that was injected by the lateral factory. Calls
288      * remove on the cache.
289      * <p>
290      * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleRemove(String,
291      *      Object)
292      */
293     @Override
294     public void handleRemove( final String cacheName, final K key )
295         throws IOException
296     {
297         removeCnt++;
298         if ( log.isInfoEnabled() && getRemoveCnt() % 100 == 0 )
299         {
300             log.info( "Remove Count = {0}", this::getRemoveCnt);
301         }
302 
303         log.debug( "handleRemove> cacheName={0}, key={1}", cacheName, key );
304 
305         getCache( cacheName ).localRemove( key );
306     }
307 
308     /**
309      * Gets the cache that was injected by the lateral factory. Calls removeAll on the cache.
310      * <p>
311      * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleRemoveAll(String)
312      */
313     @Override
314     public void handleRemoveAll( final String cacheName )
315         throws IOException
316     {
317         log.debug( "handleRemoveAll> cacheName={0}", cacheName );
318 
319         getCache( cacheName ).localRemoveAll();
320     }
321 
322     /**
323      * Gets the cache that was injected by the lateral factory. Calls get on the cache.
324      * <p>
325      * @param cacheName
326      * @param key
327      * @return a ICacheElement
328      * @throws IOException
329      */
330     public ICacheElement<K, V> handleGet( final String cacheName, final K key )
331         throws IOException
332     {
333         getCnt++;
334         if ( log.isInfoEnabled() && getGetCnt() % 100 == 0 )
335         {
336             log.info( "Get Count (port {0}) = {1}",
337                     () -> getTcpLateralCacheAttributes().getTcpListenerPort(),
338                     this::getGetCnt);
339         }
340 
341         log.debug( "handleGet> cacheName={0}, key={1}", cacheName, key );
342 
343         return getCache( cacheName ).localGet( key );
344     }
345 
346     /**
347      * Gets the cache that was injected by the lateral factory. Calls get on the cache.
348      * <p>
349      * @param cacheName the name of the cache
350      * @param pattern the matching pattern
351      * @return Map
352      * @throws IOException
353      */
354     public Map<K, ICacheElement<K, V>> handleGetMatching( final String cacheName, final String pattern )
355         throws IOException
356     {
357         getCnt++;
358         if ( log.isInfoEnabled() && getGetCnt() % 100 == 0 )
359         {
360             log.info( "GetMatching Count (port {0}) = {1}",
361                     () -> getTcpLateralCacheAttributes().getTcpListenerPort(),
362                     this::getGetCnt);
363         }
364 
365         log.debug( "handleGetMatching> cacheName={0}, pattern={1}", cacheName, pattern );
366 
367         return getCache( cacheName ).localGetMatching( pattern );
368     }
369 
370     /**
371      * Gets the cache that was injected by the lateral factory. Calls getKeySet on the cache.
372      * <p>
373      * @param cacheName the name of the cache
374      * @return a set of keys
375      * @throws IOException
376      */
377     public Set<K> handleGetKeySet( final String cacheName ) throws IOException
378     {
379     	return getCache( cacheName ).getKeySet(true);
380     }
381 
382     /**
383      * This marks this instance as terminated.
384      * <p>
385      * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleDispose(String)
386      */
387     @Override
388     public void handleDispose( final String cacheName )
389         throws IOException
390     {
391         log.info( "handleDispose > cacheName={0} | Ignoring message. "
392                 + "Do not dispose from remote.", cacheName );
393 
394         // TODO handle active deregistration, rather than passive detection
395         dispose();
396     }
397 
398     @Override
399     public synchronized void dispose()
400     {
401         if (terminated.compareAndSet(false, true))
402         {
403             notify();
404             listenerThread.interrupt();
405         }
406     }
407 
408     /**
409      * Gets the cacheManager attribute of the LateralCacheTCPListener object.
410      * <p>
411      * Normally this is set by the factory. If it wasn't set the listener defaults to the expected
412      * singleton behavior of the cache manager.
413      * <p>
414      * @param name
415      * @return CompositeCache
416      */
417     protected CompositeCache<K, V> getCache( final String name )
418     {
419         return getCacheManager().getCache( name );
420     }
421 
422     /**
423      * This is roughly the number of updates the lateral has received.
424      * <p>
425      * @return Returns the putCnt.
426      */
427     public int getPutCnt()
428     {
429         return putCnt;
430     }
431 
432     /**
433      * @return Returns the getCnt.
434      */
435     public int getGetCnt()
436     {
437         return getCnt;
438     }
439 
440     /**
441      * @return Returns the removeCnt.
442      */
443     public int getRemoveCnt()
444     {
445         return removeCnt;
446     }
447 
448     /**
449      * @param cacheMgr The cacheMgr to set.
450      */
451     @Override
452     public void setCacheManager( final ICompositeCacheManager cacheMgr )
453     {
454         this.cacheManager = cacheMgr;
455     }
456 
457     /**
458      * @return Returns the cacheMgr.
459      */
460     @Override
461     public ICompositeCacheManager getCacheManager()
462     {
463         return cacheManager;
464     }
465 
466     /**
467      * @param tcpLateralCacheAttributes The tcpLateralCacheAttributes to set.
468      */
469     public void setTcpLateralCacheAttributes( final ITCPLateralCacheAttributes tcpLateralCacheAttributes )
470     {
471         this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
472     }
473 
474     /**
475      * @return Returns the tcpLateralCacheAttributes.
476      */
477     public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
478     {
479         return tcpLateralCacheAttributes;
480     }
481 
482     /**
483      * Processes commands from the server socket. There should be one listener for each configured
484      * TCP lateral.
485      * @deprecated No longer used
486      */
487     @Deprecated
488     public class ListenerThread
489         extends Thread
490     {
491         /** The socket listener */
492         private final ServerSocket serverSocket;
493 
494         /**
495          * Constructor
496          *
497          * @param serverSocket
498          */
499         public ListenerThread(final ServerSocket serverSocket)
500         {
501             this.serverSocket = serverSocket;
502         }
503 
504         /** Main processing method for the ListenerThread object */
505         @Override
506         public void run()
507         {
508             runListener(serverSocket.getChannel());
509         }
510     }
511 
512     /**
513      * Processes commands from the server socket. There should be one listener for each configured
514      * TCP lateral.
515      */
516     private void runListener(final ServerSocketChannel serverSocket)
517     {
518         try (Selector selector = Selector.open())
519         {
520             serverSocket.register(selector, SelectionKey.OP_ACCEPT);
521             log.debug("Waiting for clients to connect");
522 
523             // Check to see if we've been asked to exit, and exit
524             while (!terminated.get())
525             {
526                 int activeKeys = selector.select(acceptTimeOut);
527                 if (activeKeys == 0)
528                 {
529                     continue;
530                 }
531 
532                 for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();)
533                 {
534                     if (terminated.get())
535                     {
536                         break;
537                     }
538 
539                     SelectionKey key = i.next();
540 
541                     if (!key.isValid())
542                     {
543                         continue;
544                     }
545 
546                     if (key.isAcceptable())
547                     {
548                         ServerSocketChannel server = (ServerSocketChannel) key.channel();
549                         SocketChannel client = server.accept();
550                         if (client == null)
551                         {
552                             //may happen in non-blocking mode
553                             continue;
554                         }
555 
556                         log.info("Connected to client at {0}", client.getRemoteAddress());
557 
558                         client.configureBlocking(false);
559                         client.register(selector, SelectionKey.OP_READ);
560                     }
561 
562                     if (key.isReadable())
563                     {
564                         handleClient(key);
565                     }
566 
567                     i.remove();
568                 }
569             }
570 
571             log.debug("Thread terminated, exiting gracefully");
572 
573             //close all registered channels
574             selector.keys().forEach(key -> {
575                 try
576                 {
577                     key.channel().close();
578                 }
579                 catch (IOException e)
580                 {
581                     log.warn("Problem closing channel", e);
582                 }
583             });
584         }
585         catch (final IOException e)
586         {
587             log.error( "Exception caught in TCP listener", e );
588         }
589         finally
590         {
591             try
592             {
593                 serverSocket.close();
594             }
595             catch (IOException e)
596             {
597                 log.error( "Exception closing TCP listener", e );
598             }
599         }
600     }
601 
602     /**
603      * A Separate thread that runs when a command comes into the LateralTCPReceiver.
604      * @deprecated No longer used
605      */
606     @Deprecated
607     public class ConnectionHandler
608         implements Runnable
609     {
610         /** The socket connection, passed in via constructor */
611         private final Socket socket;
612 
613         /**
614          * Construct for a given socket
615          * @param socket
616          */
617         public ConnectionHandler( final Socket socket )
618         {
619             this.socket = socket;
620         }
621 
622         /**
623          * Main processing method for the LateralTCPReceiverConnection object
624          */
625         @Override
626         public void run()
627         {
628             try (InputStream is = socket.getInputStream())
629             {
630                 while ( true )
631                 {
632                     final LateralElementDescriptor<K, V> led =
633                             serializer.deSerializeFrom(is, null);
634 
635                     if ( led == null )
636                     {
637                         log.debug( "LateralElementDescriptor is null" );
638                         continue;
639                     }
640                     if ( led.getRequesterId() == getListenerId() )
641                     {
642                         log.debug( "from self" );
643                     }
644                     else
645                     {
646                         log.debug( "receiving LateralElementDescriptor from another led = {0}",
647                                 led );
648 
649                         Object obj = handleElement(led);
650                         if (obj != null)
651                         {
652                             OutputStream os = socket.getOutputStream();
653                             serializer.serializeTo(obj, os);
654                             os.flush();
655                         }
656                     }
657                 }
658             }
659             catch (final IOException e)
660             {
661                 log.info("Caught {0}, closing connection.", e.getClass().getSimpleName(), e);
662             }
663             catch (final ClassNotFoundException e)
664             {
665                 log.error( "Deserialization failed reading from socket", e );
666             }
667         }
668     }
669 
670     /**
671      * A Separate thread that runs when a command comes into the LateralTCPReceiver.
672      */
673     private void handleClient(final SelectionKey key)
674     {
675         final SocketChannel socketChannel = (SocketChannel) key.channel();
676 
677         try
678         {
679             final LateralElementDescriptor<K, V> led =
680                     serializer.deSerializeFrom(socketChannel, null);
681 
682             if ( led == null )
683             {
684                 log.debug("LateralElementDescriptor is null");
685                 return;
686             }
687 
688             if ( led.getRequesterId() == getListenerId() )
689             {
690                 log.debug( "from self" );
691             }
692             else
693             {
694                 log.debug( "receiving LateralElementDescriptor from another led = {0}",
695                         led );
696 
697                 Object obj = handleElement(led);
698                 if (obj != null)
699                 {
700                     serializer.serializeTo(obj, socketChannel);
701                 }
702             }
703         }
704         catch (final IOException e)
705         {
706             log.info("Caught {0}, closing connection.", e.getClass().getSimpleName(), e);
707             try
708             {
709                 socketChannel.close();
710             }
711             catch (IOException e1)
712             {
713                 log.error("Error while closing connection", e );
714             }
715         }
716         catch (final ClassNotFoundException e)
717         {
718             log.error( "Deserialization failed reading from socket", e );
719         }
720     }
721 
722     /**
723      * This calls the appropriate method, based on the command sent in the Lateral element
724      * descriptor.
725      * <p>
726      * @param led the lateral element
727      * @return a possible response
728      * @throws IOException
729      */
730     private Object handleElement(final LateralElementDescriptor<K, V> led) throws IOException
731     {
732         final String cacheName = led.getPayload().getCacheName();
733         final K key = led.getPayload().getKey();
734         Object obj = null;
735 
736         switch (led.getCommand())
737         {
738             case UPDATE:
739                 handlePut(led.getPayload());
740                 break;
741 
742             case REMOVE:
743                 // if a hash code was given and filtering is on
744                 // check to see if they are the same
745                 // if so, then don't remove, otherwise issue a remove
746                 if (led.getValHashCode() != -1 &&
747                     getTcpLateralCacheAttributes().isFilterRemoveByHashCode())
748                 {
749                     final ICacheElement<K, V> test = getCache( cacheName ).localGet( key );
750                     if ( test != null )
751                     {
752                         if ( test.getVal().hashCode() == led.getValHashCode() )
753                         {
754                             log.debug( "Filtering detected identical hashCode [{0}], "
755                                     + "not issuing a remove for led {1}",
756                                     led.getValHashCode(), led );
757                             return null;
758                         }
759                         log.debug( "Different hash codes, in cache [{0}] sent [{1}]",
760                                 test.getVal()::hashCode, led::getValHashCode );
761                     }
762                 }
763                 handleRemove( cacheName, key );
764                 break;
765 
766             case REMOVEALL:
767                 handleRemoveAll( cacheName );
768                 break;
769 
770             case GET:
771                 obj = handleGet( cacheName, key );
772                 break;
773 
774             case GET_MATCHING:
775                 obj = handleGetMatching( cacheName, (String) key );
776                 break;
777 
778             case GET_KEYSET:
779                 obj = handleGetKeySet(cacheName);
780                 break;
781 
782             default: break;
783         }
784 
785         return obj;
786     }
787 
788     /**
789      * Shuts down the receiver.
790      */
791     @Override
792     public void shutdown()
793     {
794         if ( shutdown.compareAndSet(false, true) )
795         {
796             log.info( "Shutting down TCP Lateral receiver." );
797             dispose();
798         }
799         else
800         {
801             log.debug( "Shutdown already called." );
802         }
803     }
804 }