View Javadoc
1   package org.apache.commons.jcs.auxiliary.lateral.socket.tcp;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.EOFException;
23  import java.io.IOException;
24  import java.io.ObjectInputStream;
25  import java.io.ObjectOutputStream;
26  import java.io.Serializable;
27  import java.net.InetAddress;
28  import java.net.ServerSocket;
29  import java.net.Socket;
30  import java.net.SocketException;
31  import java.net.SocketTimeoutException;
32  import java.util.HashMap;
33  import java.util.Map;
34  import java.util.Set;
35  import java.util.concurrent.ExecutorService;
36  import java.util.concurrent.Executors;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  
39  import org.apache.commons.jcs.access.exception.CacheException;
40  import org.apache.commons.jcs.auxiliary.lateral.LateralElementDescriptor;
41  import org.apache.commons.jcs.auxiliary.lateral.behavior.ILateralCacheListener;
42  import org.apache.commons.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
43  import org.apache.commons.jcs.engine.CacheInfo;
44  import org.apache.commons.jcs.engine.behavior.ICacheElement;
45  import org.apache.commons.jcs.engine.behavior.ICompositeCacheManager;
46  import org.apache.commons.jcs.engine.behavior.IShutdownObserver;
47  import org.apache.commons.jcs.engine.control.CompositeCache;
48  import org.apache.commons.jcs.engine.control.CompositeCacheManager;
49  import org.apache.commons.jcs.io.ObjectInputStreamClassLoaderAware;
50  import org.apache.commons.jcs.utils.threadpool.DaemonThreadFactory;
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  
54  /**
55   * Listens for connections from other TCP lateral caches and handles them. The initialization method
56   * starts a listening thread, which creates a socket server. When messages are received they are
57   * passed to a pooled executor which then calls the appropriate handle method.
58   */
59  public class LateralTCPListener<K, V>
60      implements ILateralCacheListener<K, V>, IShutdownObserver
61  {
62      /** The logger */
63      private static final Log log = LogFactory.getLog( LateralTCPListener.class );
64  
65      /** How long the server will block on an accept(). 0 is infinite. */
66      private static final int acceptTimeOut = 1000;
67  
68      /** The CacheHub this listener is associated with */
69      private transient ICompositeCacheManager cacheManager;
70  
71      /** Map of available instances, keyed by port */
72      private static final HashMap<String, ILateralCacheListener<?, ?>> instances =
73          new HashMap<String, ILateralCacheListener<?, ?>>();
74  
75      /** The socket listener */
76      private ListenerThread receiver;
77  
78      /** Configuration attributes */
79      private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
80  
81      /** Listening port */
82      private int port;
83  
84      /** The processor. We should probably use an event queue here. */
85      private ExecutorService pooledExecutor;
86  
87      /** put count */
88      private int putCnt = 0;
89  
90      /** remove count */
91      private int removeCnt = 0;
92  
93      /** get count */
94      private int getCnt = 0;
95  
96      /**
97       * Use the vmid by default. This can be set for testing. If we ever need to run more than one
98       * per vm, then we need a new technique.
99       */
100     private long listenerId = CacheInfo.listenerId;
101 
102     /** is this shut down? */
103     private AtomicBoolean shutdown;
104 
105     /** is this terminated? */
106     private AtomicBoolean terminated;
107 
108     /**
109      * Gets the instance attribute of the LateralCacheTCPListener class.
110      * <p>
111      * @param ilca ITCPLateralCacheAttributes
112      * @param cacheMgr
113      * @return The instance value
114      */
115     public synchronized static <K, V> LateralTCPListener<K, V>
116         getInstance( ITCPLateralCacheAttributes ilca, ICompositeCacheManager cacheMgr )
117     {
118         @SuppressWarnings("unchecked") // Need to cast because of common map for all instances
119         LateralTCPListener<K, V> ins = (LateralTCPListener<K, V>) instances.get( String.valueOf( ilca.getTcpListenerPort() ) );
120 
121         if ( ins == null )
122         {
123             ins = new LateralTCPListener<K, V>( ilca );
124 
125             ins.init();
126             ins.setCacheManager( cacheMgr );
127 
128             instances.put( String.valueOf( ilca.getTcpListenerPort() ), ins );
129 
130             if ( log.isInfoEnabled() )
131             {
132                 log.info( "Created new listener " + ilca.getTcpListenerPort() );
133             }
134         }
135 
136         return ins;
137     }
138 
139     /**
140      * Only need one since it does work for all regions, just reference by multiple region names.
141      * <p>
142      * @param ilca
143      */
144     protected LateralTCPListener( ITCPLateralCacheAttributes ilca )
145     {
146         this.setTcpLateralCacheAttributes( ilca );
147     }
148 
149     /**
150      * This starts the ListenerThread on the specified port.
151      */
152     @Override
153     public synchronized void init()
154     {
155         try
156         {
157             this.port = getTcpLateralCacheAttributes().getTcpListenerPort();
158 
159             pooledExecutor = Executors.newCachedThreadPool(
160                     new DaemonThreadFactory("JCS-LateralTCPListener-"));
161             terminated = new AtomicBoolean(false);
162             shutdown = new AtomicBoolean(false);
163 
164             log.info( "Listening on port " + port );
165 
166             ServerSocket serverSocket = new ServerSocket( port );
167             serverSocket.setSoTimeout( acceptTimeOut );
168 
169             receiver = new ListenerThread(serverSocket);
170             receiver.setDaemon( true );
171             receiver.start();
172         }
173         catch ( IOException ex )
174         {
175             throw new IllegalStateException( ex );
176         }
177     }
178 
179     /**
180      * Let the lateral cache set a listener_id. Since there is only one listener for all the
181      * regions and every region gets registered? the id shouldn't be set if it isn't zero. If it is
182      * we assume that it is a reconnect.
183      * <p>
184      * By default, the listener id is the vmid.
185      * <p>
186      * The service should set this value. This value will never be changed by a server we connect
187      * to. It needs to be non static, for unit tests.
188      * <p>
189      * The service will use the value it sets in all send requests to the sender.
190      * <p>
191      * @param id The new listenerId value
192      * @throws IOException
193      */
194     @Override
195     public void setListenerId( long id )
196         throws IOException
197     {
198         this.listenerId = id;
199         if ( log.isDebugEnabled() )
200         {
201             log.debug( "set listenerId = " + id );
202         }
203     }
204 
205     /**
206      * Gets the listenerId attribute of the LateralCacheTCPListener object
207      * <p>
208      * @return The listenerId value
209      * @throws IOException
210      */
211     @Override
212     public long getListenerId()
213         throws IOException
214     {
215         return this.listenerId;
216     }
217 
218     /**
219      * Increments the put count. Gets the cache that was injected by the lateral factory. Calls put
220      * on the cache.
221      * <p>
222      * @see org.apache.commons.jcs.engine.behavior.ICacheListener#handlePut(org.apache.commons.jcs.engine.behavior.ICacheElement)
223      */
224     @Override
225     public void handlePut( ICacheElement<K, V> element )
226         throws IOException
227     {
228         putCnt++;
229         if ( log.isInfoEnabled() )
230         {
231             if ( getPutCnt() % 100 == 0 )
232             {
233                 log.info( "Put Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
234                     + getPutCnt() );
235             }
236         }
237 
238         if ( log.isDebugEnabled() )
239         {
240             log.debug( "handlePut> cacheName=" + element.getCacheName() + ", key=" + element.getKey() );
241         }
242 
243         getCache( element.getCacheName() ).localUpdate( element );
244     }
245 
246     /**
247      * Increments the remove count. Gets the cache that was injected by the lateral factory. Calls
248      * remove on the cache.
249      * <p>
250      * @see org.apache.commons.jcs.engine.behavior.ICacheListener#handleRemove(java.lang.String,
251      *      Object)
252      */
253     @Override
254     public void handleRemove( String cacheName, K key )
255         throws IOException
256     {
257         removeCnt++;
258         if ( log.isInfoEnabled() )
259         {
260             if ( getRemoveCnt() % 100 == 0 )
261             {
262                 log.info( "Remove Count = " + getRemoveCnt() );
263             }
264         }
265 
266         if ( log.isDebugEnabled() )
267         {
268             log.debug( "handleRemove> cacheName=" + cacheName + ", key=" + key );
269         }
270 
271         getCache( cacheName ).localRemove( key );
272     }
273 
274     /**
275      * Gets the cache that was injected by the lateral factory. Calls removeAll on the cache.
276      * <p>
277      * @see org.apache.commons.jcs.engine.behavior.ICacheListener#handleRemoveAll(java.lang.String)
278      */
279     @Override
280     public void handleRemoveAll( String cacheName )
281         throws IOException
282     {
283         if ( log.isDebugEnabled() )
284         {
285             log.debug( "handleRemoveAll> cacheName=" + cacheName );
286         }
287 
288         getCache( cacheName ).localRemoveAll();
289     }
290 
291     /**
292      * Gets the cache that was injected by the lateral factory. Calls get on the cache.
293      * <p>
294      * @param cacheName
295      * @param key
296      * @return a ICacheElement
297      * @throws IOException
298      */
299     public ICacheElement<K, V> handleGet( String cacheName, K key )
300         throws IOException
301     {
302         getCnt++;
303         if ( log.isInfoEnabled() )
304         {
305             if ( getGetCnt() % 100 == 0 )
306             {
307                 log.info( "Get Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
308                     + getGetCnt() );
309             }
310         }
311 
312         if ( log.isDebugEnabled() )
313         {
314             log.debug( "handleGet> cacheName=" + cacheName + ", key = " + key );
315         }
316 
317         return getCache( cacheName ).localGet( key );
318     }
319 
320     /**
321      * Gets the cache that was injected by the lateral factory. Calls get on the cache.
322      * <p>
323      * @param cacheName the name of the cache
324      * @param pattern the matching pattern
325      * @return Map
326      * @throws IOException
327      */
328     public Map<K, ICacheElement<K, V>> handleGetMatching( String cacheName, String pattern )
329         throws IOException
330     {
331         getCnt++;
332         if ( log.isInfoEnabled() )
333         {
334             if ( getGetCnt() % 100 == 0 )
335             {
336                 log.info( "GetMatching Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
337                     + getGetCnt() );
338             }
339         }
340 
341         if ( log.isDebugEnabled() )
342         {
343             log.debug( "handleGetMatching> cacheName=" + cacheName + ", pattern = " + pattern );
344         }
345 
346         return getCache( cacheName ).localGetMatching( pattern );
347     }
348 
349     /**
350      * Gets the cache that was injected by the lateral factory. Calls getKeySet on the cache.
351      * <p>
352      * @param cacheName the name of the cache
353      * @return a set of keys
354      * @throws IOException
355      */
356     public Set<K> handleGetKeySet( String cacheName ) throws IOException
357     {
358     	return getCache( cacheName ).getKeySet(true);
359     }
360 
361     /**
362      * This marks this instance as terminated.
363      * <p>
364      * @see org.apache.commons.jcs.engine.behavior.ICacheListener#handleDispose(java.lang.String)
365      */
366     @Override
367     public void handleDispose( String cacheName )
368         throws IOException
369     {
370         if ( log.isInfoEnabled() )
371         {
372             log.info( "handleDispose > cacheName=" + cacheName + " | Ignoring message.  Do not dispose from remote." );
373         }
374 
375         // TODO handle active deregistration, rather than passive detection
376         terminated.set(true);
377     }
378 
379     @Override
380     public synchronized void dispose()
381     {
382         terminated.set(true);
383         notify();
384 
385         pooledExecutor.shutdownNow();
386     }
387 
388     /**
389      * Gets the cacheManager attribute of the LateralCacheTCPListener object.
390      * <p>
391      * Normally this is set by the factory. If it wasn't set the listener defaults to the expected
392      * singleton behavior of the cache manager.
393      * <p>
394      * @param name
395      * @return CompositeCache
396      */
397     protected CompositeCache<K, V> getCache( String name )
398     {
399         if ( getCacheManager() == null )
400         {
401             // revert to singleton on failure
402             try
403             {
404                 setCacheManager( CompositeCacheManager.getInstance() );
405             }
406             catch (CacheException e)
407             {
408                 throw new RuntimeException("Could not retrieve cache manager instance", e);
409             }
410 
411             if ( log.isDebugEnabled() )
412             {
413                 log.debug( "cacheMgr = " + getCacheManager() );
414             }
415         }
416 
417         return getCacheManager().getCache( name );
418     }
419 
420     /**
421      * This is roughly the number of updates the lateral has received.
422      * <p>
423      * @return Returns the putCnt.
424      */
425     public int getPutCnt()
426     {
427         return putCnt;
428     }
429 
430     /**
431      * @return Returns the getCnt.
432      */
433     public int getGetCnt()
434     {
435         return getCnt;
436     }
437 
438     /**
439      * @return Returns the removeCnt.
440      */
441     public int getRemoveCnt()
442     {
443         return removeCnt;
444     }
445 
446     /**
447      * @param cacheMgr The cacheMgr to set.
448      */
449     @Override
450     public void setCacheManager( ICompositeCacheManager cacheMgr )
451     {
452         this.cacheManager = cacheMgr;
453     }
454 
455     /**
456      * @return Returns the cacheMgr.
457      */
458     @Override
459     public ICompositeCacheManager getCacheManager()
460     {
461         return cacheManager;
462     }
463 
464     /**
465      * @param tcpLateralCacheAttributes The tcpLateralCacheAttributes to set.
466      */
467     public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tcpLateralCacheAttributes )
468     {
469         this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
470     }
471 
472     /**
473      * @return Returns the tcpLateralCacheAttributes.
474      */
475     public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
476     {
477         return tcpLateralCacheAttributes;
478     }
479 
480     /**
481      * Processes commands from the server socket. There should be one listener for each configured
482      * TCP lateral.
483      */
484     public class ListenerThread
485         extends Thread
486     {
487         /** The socket listener */
488         private final ServerSocket serverSocket;
489 
490         /**
491          * Constructor
492          *
493          * @param serverSocket
494          */
495         public ListenerThread(ServerSocket serverSocket)
496         {
497             super();
498             this.serverSocket = serverSocket;
499         }
500 
501         /** Main processing method for the ListenerThread object */
502         @SuppressWarnings("synthetic-access")
503         @Override
504         public void run()
505         {
506             try
507             {
508                 ConnectionHandler handler;
509 
510                 outer: while ( true )
511                 {
512                     if ( log.isDebugEnabled() )
513                     {
514                         log.debug( "Waiting for clients to connect " );
515                     }
516 
517                     Socket socket = null;
518                     inner: while (true)
519                     {
520                         // Check to see if we've been asked to exit, and exit
521                         if (terminated.get())
522                         {
523                             if (log.isDebugEnabled())
524                             {
525                                 log.debug("Thread terminated, exiting gracefully");
526                             }
527                             break outer;
528                         }
529 
530                         try
531                         {
532                             socket = serverSocket.accept();
533                             break inner;
534                         }
535                         catch (SocketTimeoutException e)
536                         {
537                             // No problem! We loop back up!
538                             continue inner;
539                         }
540                     }
541 
542                     if ( socket != null && log.isDebugEnabled() )
543                     {
544                         InetAddress inetAddress = socket.getInetAddress();
545                         log.debug( "Connected to client at " + inetAddress );
546                     }
547 
548                     handler = new ConnectionHandler( socket );
549                     pooledExecutor.execute( handler );
550                 }
551             }
552             catch ( IOException e )
553             {
554                 log.error( "Exception caught in TCP listener", e );
555             }
556             finally
557             {
558             	if (serverSocket != null)
559             	{
560             		try
561             		{
562 						serverSocket.close();
563 					}
564             		catch (IOException e)
565             		{
566                         log.error( "Exception caught closing socket", e );
567 					}
568             	}
569             }
570         }
571     }
572 
573     /**
574      * A Separate thread that runs when a command comes into the LateralTCPReceiver.
575      */
576     public class ConnectionHandler
577         implements Runnable
578     {
579         /** The socket connection, passed in via constructor */
580         private final Socket socket;
581 
582         /**
583          * Construct for a given socket
584          * @param socket
585          */
586         public ConnectionHandler( Socket socket )
587         {
588             this.socket = socket;
589         }
590 
591         /**
592          * Main processing method for the LateralTCPReceiverConnection object
593          */
594         @Override
595         @SuppressWarnings({"unchecked", // Need to cast from Object
596             "synthetic-access" })
597         public void run()
598         {
599             ObjectInputStream ois;
600 
601             try
602             {
603                 ois = new ObjectInputStreamClassLoaderAware( socket.getInputStream(), null );
604             }
605             catch ( Exception e )
606             {
607                 log.error( "Could not open ObjectInputStream on " + socket, e );
608 
609                 return;
610             }
611 
612             LateralElementDescriptor<K, V> led;
613 
614             try
615             {
616                 while ( true )
617                 {
618                     led = (LateralElementDescriptor<K, V>) ois.readObject();
619 
620                     if ( led == null )
621                     {
622                         log.debug( "LateralElementDescriptor is null" );
623                         continue;
624                     }
625                     if ( led.requesterId == getListenerId() )
626                     {
627                         log.debug( "from self" );
628                     }
629                     else
630                     {
631                         if ( log.isDebugEnabled() )
632                         {
633                             log.debug( "receiving LateralElementDescriptor from another" + "led = " + led
634                                 + ", led.command = " + led.command + ", led.ce = " + led.ce );
635                         }
636 
637                         handle( led );
638                     }
639                 }
640             }
641             catch ( EOFException e )
642             {
643                 log.info( "Caught java.io.EOFException closing connection." + e.getMessage() );
644             }
645             catch ( SocketException e )
646             {
647                 log.info( "Caught java.net.SocketException closing connection." + e.getMessage() );
648             }
649             catch ( Exception e )
650             {
651                 log.error( "Unexpected exception.", e );
652             }
653 
654             try
655             {
656                 ois.close();
657             }
658             catch ( IOException e )
659             {
660                 log.error( "Could not close object input stream.", e );
661             }
662         }
663 
664         /**
665          * This calls the appropriate method, based on the command sent in the Lateral element
666          * descriptor.
667          * <p>
668          * @param led
669          * @throws IOException
670          */
671         @SuppressWarnings("synthetic-access")
672         private void handle( LateralElementDescriptor<K, V> led )
673             throws IOException
674         {
675             String cacheName = led.ce.getCacheName();
676             K key = led.ce.getKey();
677             Serializable obj = null;
678 
679             switch (led.command)
680             {
681                 case UPDATE:
682                     handlePut( led.ce );
683                     break;
684 
685                 case REMOVE:
686                     // if a hashcode was given and filtering is on
687                     // check to see if they are the same
688                     // if so, then don't remove, otherwise issue a remove
689                     if ( led.valHashCode != -1 )
690                     {
691                         if ( getTcpLateralCacheAttributes().isFilterRemoveByHashCode() )
692                         {
693                             ICacheElement<K, V> test = getCache( cacheName ).localGet( key );
694                             if ( test != null )
695                             {
696                                 if ( test.getVal().hashCode() == led.valHashCode )
697                                 {
698                                     if ( log.isDebugEnabled() )
699                                     {
700                                         log.debug( "Filtering detected identical hashCode [" + led.valHashCode
701                                             + "], not issuing a remove for led " + led );
702                                     }
703                                     return;
704                                 }
705                                 else
706                                 {
707                                     if ( log.isDebugEnabled() )
708                                     {
709                                         log.debug( "Different hashcodes, in cache [" + test.getVal().hashCode()
710                                             + "] sent [" + led.valHashCode + "]" );
711                                     }
712                                 }
713                             }
714                         }
715                     }
716                     handleRemove( cacheName, key );
717                     break;
718 
719                 case REMOVEALL:
720                     handleRemoveAll( cacheName );
721                     break;
722 
723                 case GET:
724                     obj = handleGet( cacheName, key );
725                     break;
726 
727                 case GET_MATCHING:
728                     obj = (Serializable) handleGetMatching( cacheName, (String) key );
729                     break;
730 
731                 case GET_KEYSET:
732                 	obj = (Serializable) handleGetKeySet(cacheName);
733                     break;
734 
735                 default: break;
736             }
737 
738             if (obj != null)
739             {
740                 ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
741                 oos.writeObject( obj );
742                 oos.flush();
743             }
744         }
745     }
746 
747     /**
748      * Shuts down the receiver.
749      */
750     @Override
751     public void shutdown()
752     {
753         if ( shutdown.compareAndSet(false, true) )
754         {
755             if ( log.isInfoEnabled() )
756             {
757                 log.info( "Shutting down TCP Lateral receiver." );
758             }
759 
760             receiver.interrupt();
761         }
762         else
763         {
764             if ( log.isDebugEnabled() )
765             {
766                 log.debug( "Shutdown already called." );
767             }
768         }
769     }
770 }