View Javadoc

1   package org.apache.jcs.auxiliary.lateral.socket.tcp;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.io.ObjectInputStream;
24  import java.io.ObjectOutputStream;
25  import java.io.Serializable;
26  import java.net.InetAddress;
27  import java.net.ServerSocket;
28  import java.net.Socket;
29  import java.net.SocketTimeoutException;
30  import java.util.HashMap;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.Executors;
35  import java.util.concurrent.ThreadFactory;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.jcs.access.exception.CacheException;
40  import org.apache.jcs.auxiliary.lateral.LateralCacheInfo;
41  import org.apache.jcs.auxiliary.lateral.LateralCommand;
42  import org.apache.jcs.auxiliary.lateral.LateralElementDescriptor;
43  import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheListener;
44  import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
45  import org.apache.jcs.engine.behavior.ICacheElement;
46  import org.apache.jcs.engine.behavior.ICompositeCacheManager;
47  import org.apache.jcs.engine.behavior.IShutdownObserver;
48  import org.apache.jcs.engine.control.CompositeCache;
49  import org.apache.jcs.engine.control.CompositeCacheManager;
50  
51  /**
52   * Listens for connections from other TCP lateral caches and handles them. The initialization method
53   * starts a listening thread, which creates a socket server. When messages are received they are
54   * passed to a pooled executor which then calls the appropriate handle method.
55   */
56  public class LateralTCPListener<K extends Serializable, V extends Serializable>
57      implements ILateralCacheListener<K, V>, Serializable, IShutdownObserver
58  {
    /** Serialization version id. Don't change. */
    private static final long serialVersionUID = -9107062664967131738L;

    /** The logger */
    protected final static Log log = LogFactory.getLog( LateralTCPListener.class );

    /**
     * How long the server will block on an accept(). 0 is infinite. The value is handed to
     * ServerSocket.setSoTimeout(), which interprets it as milliseconds.
     */
    private final static int acceptTimeOut = 1000;

    /** The CacheHub this listener is associated with. Transient, so it is not serialized. */
    private transient ICompositeCacheManager cacheManager;

    /** Map of available instances, keyed by the listener port rendered as a String. */
    protected final static HashMap<String, ILateralCacheListener<?, ?>> instances =
        new HashMap<String, ILateralCacheListener<?, ?>>();

    /** The socket listener thread. Created and started by init(). */
    private ListenerThread receiver;

    /** Configuration attributes */
    private ITCPLateralCacheAttributes tcpLateralCacheAttributes;

    /** Listening port. Copied from the configuration attributes by init(). */
    protected int port;

    /**
     * The processor; a cached thread pool created by init() that runs one ConnectionHandler per
     * accepted connection. We should probably use an event queue here.
     */
    protected ExecutorService pooledExecutor;

    /**
     * put count.
     * <p>
     * NOTE(review): plain int that handlePut() increments from connection handler threads, so the
     * value may be approximate under concurrent traffic -- confirm whether exact counts matter.
     */
    private int putCnt = 0;

    /** remove count. Incremented by handleRemove(); same caveat as the put count. */
    private int removeCnt = 0;

    /** get count. Incremented by both handleGet() and handleGetMatching(); same caveat as the put count. */
    private int getCnt = 0;

    /**
     * Use the vmid by default. This can be set for testing. If we ever need to run more than one
     * per vm, then we need a new technique.
     */
    private long listenerId = LateralCacheInfo.listenerId;

    /** is this shut down? Set by shutdown() and cleared again by init(). */
    protected boolean shutdown = false;

    /**
     * is this terminated? Set by dispose() and handleDispose(), cleared by init(), and polled by
     * the listener thread before each accept().
     */
    protected boolean terminated = false;
107 
108     /**
109      * Gets the instance attribute of the LateralCacheTCPListener class.
110      * <p>
111      * @param ilca ITCPLateralCacheAttributes
112      * @param cacheMgr
113      * @return The instance value
114      */
115     public synchronized static <K extends Serializable, V extends Serializable> LateralTCPListener<K, V>
116         getInstance( ITCPLateralCacheAttributes ilca, ICompositeCacheManager cacheMgr )
117     {
118         @SuppressWarnings("unchecked") // Need to cast because of common map for all instances
119         LateralTCPListener<K, V> ins = (LateralTCPListener<K, V>) instances.get( String.valueOf( ilca.getTcpListenerPort() ) );
120 
121         if ( ins == null )
122         {
123             ins = new LateralTCPListener<K, V>( ilca );
124 
125             ins.init();
126 
127             ins.setCacheManager( cacheMgr );
128 
129             instances.put( String.valueOf( ilca.getTcpListenerPort() ), ins );
130 
131             if ( log.isInfoEnabled() )
132             {
133                 log.info( "Created new listener " + ilca.getTcpListenerPort() );
134             }
135         }
136 
137         return ins;
138     }
139 
140     /**
141      * Only need one since it does work for all regions, just reference by multiple region names.
142      * <p>
143      * @param ilca
144      */
145     protected LateralTCPListener( ITCPLateralCacheAttributes ilca )
146     {
147         this.setTcpLateralCacheAttributes( ilca );
148     }
149 
150     /**
151      * This starts the ListenerThread on the specified port.
152      */
153     public void init()
154     {
155         try
156         {
157             this.port = getTcpLateralCacheAttributes().getTcpListenerPort();
158 
159             receiver = new ListenerThread();
160             receiver.setDaemon( true );
161             receiver.start();
162 
163             pooledExecutor = Executors.newCachedThreadPool(new MyThreadFactory());
164             terminated = false;
165             shutdown = false;
166         }
167         catch ( Exception ex )
168         {
169             log.error( ex );
170             throw new IllegalStateException( ex.getMessage() );
171         }
172     }
173 
174     /**
175      * Let the lateral cache set a listener_id. Since there is only one listener for all the
176      * regions and every region gets registered? the id shouldn't be set if it isn't zero. If it is
177      * we assume that it is a reconnect.
178      * <p>
179      * By default, the listener id is the vmid.
180      * <p>
181      * The service should set this value. This value will never be changed by a server we connect
182      * to. It needs to be non static, for unit tests.
183      * <p>
184      * The service will use the value it sets in all send requests to the sender.
185      * <p>
186      * @param id The new listenerId value
187      * @throws IOException
188      */
189     public void setListenerId( long id )
190         throws IOException
191     {
192         this.listenerId = id;
193         if ( log.isDebugEnabled() )
194         {
195             log.debug( "set listenerId = " + id );
196         }
197     }
198 
199     /**
200      * Gets the listenerId attribute of the LateralCacheTCPListener object
201      * <p>
202      * @return The listenerId value
203      * @throws IOException
204      */
205     public long getListenerId()
206         throws IOException
207     {
208         return this.listenerId;
209     }
210 
211     /**
212      * Increments the put count. Gets the cache that was injected by the lateral factory. Calls put
213      * on the cache.
214      * <p>
215      * @see org.apache.jcs.engine.behavior.ICacheListener#handlePut(org.apache.jcs.engine.behavior.ICacheElement)
216      */
217     public void handlePut( ICacheElement<K, V> element )
218         throws IOException
219     {
220         putCnt++;
221         if ( log.isInfoEnabled() )
222         {
223             if ( getPutCnt() % 100 == 0 )
224             {
225                 log.info( "Put Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
226                     + getPutCnt() );
227             }
228         }
229 
230         if ( log.isDebugEnabled() )
231         {
232             log.debug( "handlePut> cacheName=" + element.getCacheName() + ", key=" + element.getKey() );
233         }
234 
235         getCache( element.getCacheName() ).localUpdate( element );
236     }
237 
238     /**
239      * Increments the remove count. Gets the cache that was injected by the lateral factory. Calls
240      * remove on the cache.
241      * <p>
242      * @see org.apache.jcs.engine.behavior.ICacheListener#handleRemove(java.lang.String,
243      *      java.io.Serializable)
244      */
245     public void handleRemove( String cacheName, K key )
246         throws IOException
247     {
248         removeCnt++;
249         if ( log.isInfoEnabled() )
250         {
251             if ( getRemoveCnt() % 100 == 0 )
252             {
253                 log.info( "Remove Count = " + getRemoveCnt() );
254             }
255         }
256 
257         if ( log.isDebugEnabled() )
258         {
259             log.debug( "handleRemove> cacheName=" + cacheName + ", key=" + key );
260         }
261 
262         getCache( cacheName ).localRemove( key );
263     }
264 
265     /**
266      * Gets the cache that was injected by the lateral factory. Calls removeAll on the cache.
267      * <p>
268      * @see org.apache.jcs.engine.behavior.ICacheListener#handleRemoveAll(java.lang.String)
269      */
270     public void handleRemoveAll( String cacheName )
271         throws IOException
272     {
273         if ( log.isDebugEnabled() )
274         {
275             log.debug( "handleRemoveAll> cacheName=" + cacheName );
276         }
277 
278         getCache( cacheName ).localRemoveAll();
279     }
280 
281     /**
282      * Gets the cache that was injected by the lateral factory. Calls get on the cache.
283      * <p>
284      * @param cacheName
285      * @param key
286      * @return a ICacheElement
287      * @throws IOException
288      */
289     public ICacheElement<K, V> handleGet( String cacheName, K key )
290         throws IOException
291     {
292         getCnt++;
293         if ( log.isInfoEnabled() )
294         {
295             if ( getGetCnt() % 100 == 0 )
296             {
297                 log.info( "Get Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
298                     + getGetCnt() );
299             }
300         }
301 
302         if ( log.isDebugEnabled() )
303         {
304             log.debug( "handleGet> cacheName=" + cacheName + ", key = " + key );
305         }
306 
307         return getCache( cacheName ).localGet( key );
308     }
309 
310     /**
311      * Gets the cache that was injected by the lateral factory. Calls get on the cache.
312      * <p>
313      * @param cacheName the name of the cache
314      * @param pattern the matching pattern
315      * @return Map
316      * @throws IOException
317      */
318     public Map<K, ICacheElement<K, V>> handleGetMatching( String cacheName, String pattern )
319         throws IOException
320     {
321         getCnt++;
322         if ( log.isInfoEnabled() )
323         {
324             if ( getGetCnt() % 100 == 0 )
325             {
326                 log.info( "GetMatching Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
327                     + getGetCnt() );
328             }
329         }
330 
331         if ( log.isDebugEnabled() )
332         {
333             log.debug( "handleGetMatching> cacheName=" + cacheName + ", pattern = " + pattern );
334         }
335 
336         return getCache( cacheName ).localGetMatching( pattern );
337     }
338 
339     /**
340      * Gets the cache that was injected by the lateral factory. Calls getGroupKeys on the cache.
341      * <p>
342      * @param cacheName the name of the cache
343      * @param group the group name
344      * @return a set of keys
345      * @throws IOException
346      */
347     public Set<K> handleGetGroupKeys( String cacheName, String group ) throws IOException
348     {
349     	return getCache( cacheName ).getGroupKeys(group, true);
350     }
351 
352     /**
353      * Gets the cache that was injected by the lateral factory. Calls getGroupNames on the cache.
354      * <p>
355      * @param cacheName the name of the cache
356      * @return a set of group names
357      * @throws IOException
358      */
359     public Set<String> handleGetGroupNames( String cacheName ) throws IOException
360     {
361     	return getCache( cacheName ).getGroupNames(true);
362     }
363 
364     /**
365      * This marks this instance as terminated.
366      * <p>
367      * @see org.apache.jcs.engine.behavior.ICacheListener#handleDispose(java.lang.String)
368      */
369     public void handleDispose( String cacheName )
370         throws IOException
371     {
372         if ( log.isInfoEnabled() )
373         {
374             log.info( "handleDispose > cacheName=" + cacheName + " | Ignoring message.  Do not dispose from remote." );
375         }
376 
377         // TODO handle active deregistration, rather than passive detection
378         synchronized (this)
379         {
380             terminated = true;
381         }
382     }
383 
384     public synchronized void dispose()
385     {
386         terminated = true;
387         notify();
388     }
389 
390     /**
391      * Gets the cacheManager attribute of the LateralCacheTCPListener object.
392      * <p>
393      * Normally this is set by the factory. If it wasn't set the listener defaults to the expected
394      * singleton behavior of the cache manager.
395      * <p>
396      * @param name
397      * @return CompositeCache
398      */
399     protected CompositeCache<K, V> getCache( String name )
400     {
401         if ( getCacheManager() == null )
402         {
403             // revert to singleton on failure
404             try
405             {
406                 setCacheManager( CompositeCacheManager.getInstance() );
407             }
408             catch (CacheException e)
409             {
410                 throw new RuntimeException("Could not retrieve cache manager instance", e);
411             }
412 
413             if ( log.isDebugEnabled() )
414             {
415                 log.debug( "cacheMgr = " + getCacheManager() );
416             }
417         }
418 
419         return getCacheManager().getCache( name );
420     }
421 
422     /**
423      * This is roughly the number of updates the lateral has received.
424      * <p>
425      * @return Returns the putCnt.
426      */
427     public int getPutCnt()
428     {
429         return putCnt;
430     }
431 
432     /**
433      * @return Returns the getCnt.
434      */
435     public int getGetCnt()
436     {
437         return getCnt;
438     }
439 
440     /**
441      * @return Returns the removeCnt.
442      */
443     public int getRemoveCnt()
444     {
445         return removeCnt;
446     }
447 
448     /**
449      * @param cacheMgr The cacheMgr to set.
450      */
451     public void setCacheManager( ICompositeCacheManager cacheMgr )
452     {
453         this.cacheManager = cacheMgr;
454     }
455 
456     /**
457      * @return Returns the cacheMgr.
458      */
459     public ICompositeCacheManager getCacheManager()
460     {
461         return cacheManager;
462     }
463 
464     /**
465      * @param tcpLateralCacheAttributes The tcpLateralCacheAttributes to set.
466      */
467     public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tcpLateralCacheAttributes )
468     {
469         this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
470     }
471 
472     /**
473      * @return Returns the tcpLateralCacheAttributes.
474      */
475     public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
476     {
477         return tcpLateralCacheAttributes;
478     }
479 
480     /**
481      * Processes commands from the server socket. There should be one listener for each configured
482      * TCP lateral.
483      */
484     public class ListenerThread
485         extends Thread
486     {
487         /** Main processing method for the ListenerThread object */
488         @Override
489         public void run()
490         {
491             try
492             {
493                 log.info( "Listening on port " + port );
494 
495                 ServerSocket serverSocket = new ServerSocket( port );
496                 serverSocket.setSoTimeout( acceptTimeOut );
497 
498                 ConnectionHandler handler;
499 
500                 outer: while ( true )
501                 {
502                     if ( log.isDebugEnabled() )
503                     {
504                         log.debug( "Waiting for clients to connect " );
505                     }
506 
507                     Socket socket = null;
508                     inner: while (true)
509                     {
510                         // Check to see if we've been asked to exit, and exit
511                         synchronized (this)
512                         {
513                             if (terminated)
514                             {
515                                 if (log.isDebugEnabled())
516                                 {
517                                     log.debug("Thread terminated, exiting gracefully");
518                                 }
519                                 break outer;
520                             }
521                         }
522                         try
523                         {
524                             socket = serverSocket.accept();
525                             break inner;
526                         }
527                         catch (SocketTimeoutException e)
528                         {
529                             // No problem! We loop back up!
530                             continue inner;
531                         }
532                     }
533 
534                     if ( socket != null && log.isDebugEnabled() )
535                     {
536                         InetAddress inetAddress = socket.getInetAddress();
537                         log.debug( "Connected to client at " + inetAddress );
538                     }
539 
540                     handler = new ConnectionHandler( socket );
541                     pooledExecutor.execute( handler );
542                 }
543             }
544             catch ( Exception e )
545             {
546                 log.error( "Exception caught in TCP listener", e );
547             }
548         }
549     }
550 
551     /**
552      * A Separate thread that runs when a command comes into the LateralTCPReceiver.
553      */
554     public class ConnectionHandler
555         implements Runnable
556     {
557         /** The socket connection, passed in via constructor */
558         private final Socket socket;
559 
560         /**
561          * Construct for a given socket
562          * @param socket
563          */
564         public ConnectionHandler( Socket socket )
565         {
566             this.socket = socket;
567         }
568 
569         /**
570          * Main processing method for the LateralTCPReceiverConnection object
571          */
572         @SuppressWarnings("unchecked") // Nee to cast from Object
573         public void run()
574         {
575             ObjectInputStream ois;
576 
577             try
578             {
579                 ois = new ObjectInputStream( socket.getInputStream() );
580             }
581             catch ( Exception e )
582             {
583                 log.error( "Could not open ObjectInputStream on " + socket, e );
584 
585                 return;
586             }
587 
588             LateralElementDescriptor<K, V> led;
589 
590             try
591             {
592                 while ( true )
593                 {
594                     led = (LateralElementDescriptor<K, V>) ois.readObject();
595 
596                     if ( led == null )
597                     {
598                         log.debug( "LateralElementDescriptor is null" );
599                         continue;
600                     }
601                     if ( led.requesterId == getListenerId() )
602                     {
603                         log.debug( "from self" );
604                     }
605                     else
606                     {
607                         if ( log.isDebugEnabled() )
608                         {
609                             log.debug( "receiving LateralElementDescriptor from another" + "led = " + led
610                                 + ", led.command = " + led.command + ", led.ce = " + led.ce );
611                         }
612 
613                         handle( led );
614                     }
615                 }
616             }
617             catch ( java.io.EOFException e )
618             {
619                 log.info( "Caught java.io.EOFException closing connection." + e.getMessage() );
620             }
621             catch ( java.net.SocketException e )
622             {
623                 log.info( "Caught java.net.SocketException closing connection." + e.getMessage() );
624             }
625             catch ( Exception e )
626             {
627                 log.error( "Unexpected exception.", e );
628             }
629 
630             try
631             {
632                 ois.close();
633             }
634             catch ( IOException e )
635             {
636                 log.error( "Could not close object input stream.", e );
637             }
638         }
639 
640         /**
641          * This calls the appropriate method, based on the command sent in the Lateral element
642          * descriptor.
643          * <p>
644          * @param led
645          * @throws IOException
646          */
647         private void handle( LateralElementDescriptor<K, V> led )
648             throws IOException
649         {
650             String cacheName = led.ce.getCacheName();
651             K key = led.ce.getKey();
652 
653             if ( led.command == LateralCommand.UPDATE )
654             {
655                 handlePut( led.ce );
656             }
657             else if ( led.command == LateralCommand.REMOVE )
658             {
659                 // if a hashcode was given and filtering is on
660                 // check to see if they are the same
661                 // if so, then don't remvoe, otherwise issue a remove
662                 if ( led.valHashCode != -1 )
663                 {
664                     if ( getTcpLateralCacheAttributes().isFilterRemoveByHashCode() )
665                     {
666                         ICacheElement<K, V> test = getCache( cacheName ).localGet( key );
667                         if ( test != null )
668                         {
669                             if ( test.getVal().hashCode() == led.valHashCode )
670                             {
671                                 if ( log.isDebugEnabled() )
672                                 {
673                                     log.debug( "Filtering detected identical hashCode [" + led.valHashCode
674                                         + "], not issuing a remove for led " + led );
675                                 }
676                                 return;
677                             }
678                             else
679                             {
680                                 if ( log.isDebugEnabled() )
681                                 {
682                                     log.debug( "Different hashcodes, in cache [" + test.getVal().hashCode()
683                                         + "] sent [" + led.valHashCode + "]" );
684                                 }
685                             }
686                         }
687                     }
688                 }
689                 handleRemove( cacheName, key );
690             }
691             else if ( led.command == LateralCommand.REMOVEALL )
692             {
693                 handleRemoveAll( cacheName );
694             }
695             else if ( led.command == LateralCommand.GET )
696             {
697                 Serializable obj = handleGet( cacheName, key );
698 
699                 ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
700                 oos.writeObject( obj );
701                 oos.flush();
702             }
703             else if ( led.command == LateralCommand.GET_MATCHING )
704             {
705                 Map<K, ICacheElement<K, V>> obj = handleGetMatching( cacheName, (String) key );
706 
707                 ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
708                 oos.writeObject( obj );
709                 oos.flush();
710             }
711             else if ( led.command == LateralCommand.GET_GROUP_KEYS )
712             {
713             	String groupName = (String) key;
714             	Set<K> obj = handleGetGroupKeys(cacheName, groupName);
715 
716                 ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
717                 oos.writeObject( obj );
718                 oos.flush();
719             }
720             else if ( led.command == LateralCommand.GET_GROUP_NAMES )
721             {
722             	Set<String> obj = handleGetGroupNames(cacheName);
723 
724                 ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
725                 oos.writeObject( obj );
726                 oos.flush();
727             }
728         }
729     }
730 
731     /**
732      * Allows us to set the daemon status on the executor threads
733      * <p>
734      * @author Aaron Smuts
735      */
736     protected static class MyThreadFactory
737         implements ThreadFactory
738     {
739         /**
740          * @param runner
741          * @return daemon thread
742          */
743         public Thread newThread( Runnable runner )
744         {
745             Thread t = new Thread( runner );
746             t.setDaemon( true );
747             t.setPriority( Thread.MIN_PRIORITY );
748             return t;
749         }
750     }
751 
752     /**
753      * Shuts down the receiver.
754      */
755     public void shutdown()
756     {
757         if ( !shutdown )
758         {
759             shutdown = true;
760 
761             if ( log.isInfoEnabled() )
762             {
763                 log.info( "Shutting down TCP Lateral receiver." );
764             }
765             receiver.interrupt();
766         }
767         else
768         {
769             if ( log.isDebugEnabled() )
770             {
771                 log.debug( "Shutdown already called." );
772             }
773         }
774     }
775 }