001 package org.apache.jcs.auxiliary.lateral.socket.tcp;
002
003 /*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements. See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership. The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License. You may obtain a copy of the License at
011 *
012 * http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied. See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022 import java.io.IOException;
023 import java.io.ObjectInputStream;
024 import java.io.ObjectOutputStream;
025 import java.io.Serializable;
026 import java.net.InetAddress;
027 import java.net.ServerSocket;
028 import java.net.Socket;
029 import java.net.SocketTimeoutException;
030 import java.util.HashMap;
031 import java.util.Map;
032 import java.util.Set;
033 import java.util.concurrent.ExecutorService;
034 import java.util.concurrent.Executors;
035 import java.util.concurrent.ThreadFactory;
036
037 import org.apache.commons.logging.Log;
038 import org.apache.commons.logging.LogFactory;
039 import org.apache.jcs.access.exception.CacheException;
040 import org.apache.jcs.auxiliary.lateral.LateralCacheInfo;
041 import org.apache.jcs.auxiliary.lateral.LateralCommand;
042 import org.apache.jcs.auxiliary.lateral.LateralElementDescriptor;
043 import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheListener;
044 import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
045 import org.apache.jcs.engine.behavior.ICacheElement;
046 import org.apache.jcs.engine.behavior.ICompositeCacheManager;
047 import org.apache.jcs.engine.behavior.IShutdownObserver;
048 import org.apache.jcs.engine.control.CompositeCache;
049 import org.apache.jcs.engine.control.CompositeCacheManager;
050
051 /**
052 * Listens for connections from other TCP lateral caches and handles them. The initialization method
053 * starts a listening thread, which creates a socket server. When messages are received they are
054 * passed to a pooled executor which then calls the appropriate handle method.
055 */
056 public class LateralTCPListener<K extends Serializable, V extends Serializable>
057 implements ILateralCacheListener<K, V>, Serializable, IShutdownObserver
058 {
/** Don't change. */
private static final long serialVersionUID = -9107062664967131738L;

/** The logger */
protected final static Log log = LogFactory.getLog( LateralTCPListener.class );

/**
 * How long the server will block on an accept() before timing out, passed to
 * ServerSocket.setSoTimeout (milliseconds). 0 is infinite.
 */
private final static int acceptTimeOut = 1000;

/**
 * The cache manager this listener is associated with. When it is null, getCache() falls back to
 * the CompositeCacheManager singleton.
 */
private transient ICompositeCacheManager cacheManager;

/**
 * Map of available instances, keyed by the listener port rendered as a String. Read and written
 * by the synchronized static getInstance().
 */
protected final static HashMap<String, ILateralCacheListener<?, ?>> instances =
    new HashMap<String, ILateralCacheListener<?, ?>>();

/**
 * The socket listener thread; created and started by init().
 * NOTE(review): not transient although the class is Serializable - confirm whether instances
 * are ever actually serialized.
 */
private ListenerThread receiver;

/** Configuration attributes; supplies the listener port and the remove-by-hashcode filter flag. */
private ITCPLateralCacheAttributes tcpLateralCacheAttributes;

/** Listening port; copied from the configuration attributes by init(). */
protected int port;

/** The processor that runs one ConnectionHandler per accepted socket. We should probably use an event queue here. */
protected ExecutorService pooledExecutor;

/** put count; incremented by handlePut() without synchronization, so only approximate. */
private int putCnt = 0;

/** remove count; incremented by handleRemove() without synchronization, so only approximate. */
private int removeCnt = 0;

/** get count; incremented by handleGet() and handleGetMatching() without synchronization. */
private int getCnt = 0;

/**
 * Use the vmid by default. This can be set for testing. If we ever need to run more than one
 * per vm, then we need a new technique.
 */
private long listenerId = LateralCacheInfo.listenerId;

/** is this shut down? Set by shutdown(), cleared by init(). */
protected boolean shutdown = false;

/**
 * is this terminated? Set by dispose() and handleDispose(), cleared by init(), and polled by the
 * ListenerThread accept loop as its exit condition.
 */
protected boolean terminated = false;
107
108 /**
109 * Gets the instance attribute of the LateralCacheTCPListener class.
110 * <p>
111 * @param ilca ITCPLateralCacheAttributes
112 * @param cacheMgr
113 * @return The instance value
114 */
115 public synchronized static <K extends Serializable, V extends Serializable> LateralTCPListener<K, V>
116 getInstance( ITCPLateralCacheAttributes ilca, ICompositeCacheManager cacheMgr )
117 {
118 @SuppressWarnings("unchecked") // Need to cast because of common map for all instances
119 LateralTCPListener<K, V> ins = (LateralTCPListener<K, V>) instances.get( String.valueOf( ilca.getTcpListenerPort() ) );
120
121 if ( ins == null )
122 {
123 ins = new LateralTCPListener<K, V>( ilca );
124
125 ins.init();
126
127 ins.setCacheManager( cacheMgr );
128
129 instances.put( String.valueOf( ilca.getTcpListenerPort() ), ins );
130
131 if ( log.isInfoEnabled() )
132 {
133 log.info( "Created new listener " + ilca.getTcpListenerPort() );
134 }
135 }
136
137 return ins;
138 }
139
/**
 * Creates a listener for the given configuration. A single listener serves every region; the
 * regions simply reference it under their own names.
 * <p>
 * @param ilca the TCP lateral configuration to store on this listener
 */
protected LateralTCPListener( ITCPLateralCacheAttributes ilca )
{
    setTcpLateralCacheAttributes( ilca );
}
149
150 /**
151 * This starts the ListenerThread on the specified port.
152 */
153 public void init()
154 {
155 try
156 {
157 this.port = getTcpLateralCacheAttributes().getTcpListenerPort();
158
159 receiver = new ListenerThread();
160 receiver.setDaemon( true );
161 receiver.start();
162
163 pooledExecutor = Executors.newCachedThreadPool(new MyThreadFactory());
164 terminated = false;
165 shutdown = false;
166 }
167 catch ( Exception ex )
168 {
169 log.error( ex );
170 throw new IllegalStateException( ex.getMessage() );
171 }
172 }
173
174 /**
175 * Let the lateral cache set a listener_id. Since there is only one listener for all the
176 * regions and every region gets registered? the id shouldn't be set if it isn't zero. If it is
177 * we assume that it is a reconnect.
178 * <p>
179 * By default, the listener id is the vmid.
180 * <p>
181 * The service should set this value. This value will never be changed by a server we connect
182 * to. It needs to be non static, for unit tests.
183 * <p>
184 * The service will use the value it sets in all send requests to the sender.
185 * <p>
186 * @param id The new listenerId value
187 * @throws IOException
188 */
189 public void setListenerId( long id )
190 throws IOException
191 {
192 this.listenerId = id;
193 if ( log.isDebugEnabled() )
194 {
195 log.debug( "set listenerId = " + id );
196 }
197 }
198
199 /**
200 * Gets the listenerId attribute of the LateralCacheTCPListener object
201 * <p>
202 * @return The listenerId value
203 * @throws IOException
204 */
205 public long getListenerId()
206 throws IOException
207 {
208 return this.listenerId;
209 }
210
211 /**
212 * Increments the put count. Gets the cache that was injected by the lateral factory. Calls put
213 * on the cache.
214 * <p>
215 * @see org.apache.jcs.engine.behavior.ICacheListener#handlePut(org.apache.jcs.engine.behavior.ICacheElement)
216 */
217 public void handlePut( ICacheElement<K, V> element )
218 throws IOException
219 {
220 putCnt++;
221 if ( log.isInfoEnabled() )
222 {
223 if ( getPutCnt() % 100 == 0 )
224 {
225 log.info( "Put Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
226 + getPutCnt() );
227 }
228 }
229
230 if ( log.isDebugEnabled() )
231 {
232 log.debug( "handlePut> cacheName=" + element.getCacheName() + ", key=" + element.getKey() );
233 }
234
235 getCache( element.getCacheName() ).localUpdate( element );
236 }
237
238 /**
239 * Increments the remove count. Gets the cache that was injected by the lateral factory. Calls
240 * remove on the cache.
241 * <p>
242 * @see org.apache.jcs.engine.behavior.ICacheListener#handleRemove(java.lang.String,
243 * java.io.Serializable)
244 */
245 public void handleRemove( String cacheName, K key )
246 throws IOException
247 {
248 removeCnt++;
249 if ( log.isInfoEnabled() )
250 {
251 if ( getRemoveCnt() % 100 == 0 )
252 {
253 log.info( "Remove Count = " + getRemoveCnt() );
254 }
255 }
256
257 if ( log.isDebugEnabled() )
258 {
259 log.debug( "handleRemove> cacheName=" + cacheName + ", key=" + key );
260 }
261
262 getCache( cacheName ).localRemove( key );
263 }
264
265 /**
266 * Gets the cache that was injected by the lateral factory. Calls removeAll on the cache.
267 * <p>
268 * @see org.apache.jcs.engine.behavior.ICacheListener#handleRemoveAll(java.lang.String)
269 */
270 public void handleRemoveAll( String cacheName )
271 throws IOException
272 {
273 if ( log.isDebugEnabled() )
274 {
275 log.debug( "handleRemoveAll> cacheName=" + cacheName );
276 }
277
278 getCache( cacheName ).localRemoveAll();
279 }
280
281 /**
282 * Gets the cache that was injected by the lateral factory. Calls get on the cache.
283 * <p>
284 * @param cacheName
285 * @param key
286 * @return a ICacheElement
287 * @throws IOException
288 */
289 public ICacheElement<K, V> handleGet( String cacheName, K key )
290 throws IOException
291 {
292 getCnt++;
293 if ( log.isInfoEnabled() )
294 {
295 if ( getGetCnt() % 100 == 0 )
296 {
297 log.info( "Get Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
298 + getGetCnt() );
299 }
300 }
301
302 if ( log.isDebugEnabled() )
303 {
304 log.debug( "handleGet> cacheName=" + cacheName + ", key = " + key );
305 }
306
307 return getCache( cacheName ).localGet( key );
308 }
309
310 /**
311 * Gets the cache that was injected by the lateral factory. Calls get on the cache.
312 * <p>
313 * @param cacheName the name of the cache
314 * @param pattern the matching pattern
315 * @return Map
316 * @throws IOException
317 */
318 public Map<K, ICacheElement<K, V>> handleGetMatching( String cacheName, String pattern )
319 throws IOException
320 {
321 getCnt++;
322 if ( log.isInfoEnabled() )
323 {
324 if ( getGetCnt() % 100 == 0 )
325 {
326 log.info( "GetMatching Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
327 + getGetCnt() );
328 }
329 }
330
331 if ( log.isDebugEnabled() )
332 {
333 log.debug( "handleGetMatching> cacheName=" + cacheName + ", pattern = " + pattern );
334 }
335
336 return getCache( cacheName ).localGetMatching( pattern );
337 }
338
339 /**
340 * Gets the cache that was injected by the lateral factory. Calls getGroupKeys on the cache.
341 * <p>
342 * @param cacheName the name of the cache
343 * @param group the group name
344 * @return a set of keys
345 * @throws IOException
346 */
347 public Set<K> handleGetGroupKeys( String cacheName, String group ) throws IOException
348 {
349 return getCache( cacheName ).getGroupKeys(group, true);
350 }
351
352 /**
353 * Gets the cache that was injected by the lateral factory. Calls getGroupNames on the cache.
354 * <p>
355 * @param cacheName the name of the cache
356 * @return a set of group names
357 * @throws IOException
358 */
359 public Set<String> handleGetGroupNames( String cacheName ) throws IOException
360 {
361 return getCache( cacheName ).getGroupNames(true);
362 }
363
364 /**
365 * This marks this instance as terminated.
366 * <p>
367 * @see org.apache.jcs.engine.behavior.ICacheListener#handleDispose(java.lang.String)
368 */
369 public void handleDispose( String cacheName )
370 throws IOException
371 {
372 if ( log.isInfoEnabled() )
373 {
374 log.info( "handleDispose > cacheName=" + cacheName + " | Ignoring message. Do not dispose from remote." );
375 }
376
377 // TODO handle active deregistration, rather than passive detection
378 synchronized (this)
379 {
380 terminated = true;
381 }
382 }
383
384 public synchronized void dispose()
385 {
386 terminated = true;
387 notify();
388 }
389
390 /**
391 * Gets the cacheManager attribute of the LateralCacheTCPListener object.
392 * <p>
393 * Normally this is set by the factory. If it wasn't set the listener defaults to the expected
394 * singleton behavior of the cache manager.
395 * <p>
396 * @param name
397 * @return CompositeCache
398 */
399 protected CompositeCache<K, V> getCache( String name )
400 {
401 if ( getCacheManager() == null )
402 {
403 // revert to singleton on failure
404 try
405 {
406 setCacheManager( CompositeCacheManager.getInstance() );
407 }
408 catch (CacheException e)
409 {
410 throw new RuntimeException("Could not retrieve cache manager instance", e);
411 }
412
413 if ( log.isDebugEnabled() )
414 {
415 log.debug( "cacheMgr = " + getCacheManager() );
416 }
417 }
418
419 return getCacheManager().getCache( name );
420 }
421
422 /**
423 * This is roughly the number of updates the lateral has received.
424 * <p>
425 * @return Returns the putCnt.
426 */
427 public int getPutCnt()
428 {
429 return putCnt;
430 }
431
432 /**
433 * @return Returns the getCnt.
434 */
435 public int getGetCnt()
436 {
437 return getCnt;
438 }
439
440 /**
441 * @return Returns the removeCnt.
442 */
443 public int getRemoveCnt()
444 {
445 return removeCnt;
446 }
447
448 /**
449 * @param cacheMgr The cacheMgr to set.
450 */
451 public void setCacheManager( ICompositeCacheManager cacheMgr )
452 {
453 this.cacheManager = cacheMgr;
454 }
455
456 /**
457 * @return Returns the cacheMgr.
458 */
459 public ICompositeCacheManager getCacheManager()
460 {
461 return cacheManager;
462 }
463
464 /**
465 * @param tcpLateralCacheAttributes The tcpLateralCacheAttributes to set.
466 */
467 public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tcpLateralCacheAttributes )
468 {
469 this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
470 }
471
472 /**
473 * @return Returns the tcpLateralCacheAttributes.
474 */
475 public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
476 {
477 return tcpLateralCacheAttributes;
478 }
479
480 /**
481 * Processes commands from the server socket. There should be one listener for each configured
482 * TCP lateral.
483 */
484 public class ListenerThread
485 extends Thread
486 {
487 /** Main processing method for the ListenerThread object */
488 @Override
489 public void run()
490 {
491 try
492 {
493 log.info( "Listening on port " + port );
494
495 ServerSocket serverSocket = new ServerSocket( port );
496 serverSocket.setSoTimeout( acceptTimeOut );
497
498 ConnectionHandler handler;
499
500 outer: while ( true )
501 {
502 if ( log.isDebugEnabled() )
503 {
504 log.debug( "Waiting for clients to connect " );
505 }
506
507 Socket socket = null;
508 inner: while (true)
509 {
510 // Check to see if we've been asked to exit, and exit
511 synchronized (this)
512 {
513 if (terminated)
514 {
515 if (log.isDebugEnabled())
516 {
517 log.debug("Thread terminated, exiting gracefully");
518 }
519 break outer;
520 }
521 }
522 try
523 {
524 socket = serverSocket.accept();
525 break inner;
526 }
527 catch (SocketTimeoutException e)
528 {
529 // No problem! We loop back up!
530 continue inner;
531 }
532 }
533
534 if ( socket != null && log.isDebugEnabled() )
535 {
536 InetAddress inetAddress = socket.getInetAddress();
537 log.debug( "Connected to client at " + inetAddress );
538 }
539
540 handler = new ConnectionHandler( socket );
541 pooledExecutor.execute( handler );
542 }
543 }
544 catch ( Exception e )
545 {
546 log.error( "Exception caught in TCP listener", e );
547 }
548 }
549 }
550
551 /**
552 * A Separate thread that runs when a command comes into the LateralTCPReceiver.
553 */
554 public class ConnectionHandler
555 implements Runnable
556 {
557 /** The socket connection, passed in via constructor */
558 private final Socket socket;
559
560 /**
561 * Construct for a given socket
562 * @param socket
563 */
564 public ConnectionHandler( Socket socket )
565 {
566 this.socket = socket;
567 }
568
569 /**
570 * Main processing method for the LateralTCPReceiverConnection object
571 */
572 @SuppressWarnings("unchecked") // Nee to cast from Object
573 public void run()
574 {
575 ObjectInputStream ois;
576
577 try
578 {
579 ois = new ObjectInputStream( socket.getInputStream() );
580 }
581 catch ( Exception e )
582 {
583 log.error( "Could not open ObjectInputStream on " + socket, e );
584
585 return;
586 }
587
588 LateralElementDescriptor<K, V> led;
589
590 try
591 {
592 while ( true )
593 {
594 led = (LateralElementDescriptor<K, V>) ois.readObject();
595
596 if ( led == null )
597 {
598 log.debug( "LateralElementDescriptor is null" );
599 continue;
600 }
601 if ( led.requesterId == getListenerId() )
602 {
603 log.debug( "from self" );
604 }
605 else
606 {
607 if ( log.isDebugEnabled() )
608 {
609 log.debug( "receiving LateralElementDescriptor from another" + "led = " + led
610 + ", led.command = " + led.command + ", led.ce = " + led.ce );
611 }
612
613 handle( led );
614 }
615 }
616 }
617 catch ( java.io.EOFException e )
618 {
619 log.info( "Caught java.io.EOFException closing connection." + e.getMessage() );
620 }
621 catch ( java.net.SocketException e )
622 {
623 log.info( "Caught java.net.SocketException closing connection." + e.getMessage() );
624 }
625 catch ( Exception e )
626 {
627 log.error( "Unexpected exception.", e );
628 }
629
630 try
631 {
632 ois.close();
633 }
634 catch ( IOException e )
635 {
636 log.error( "Could not close object input stream.", e );
637 }
638 }
639
640 /**
641 * This calls the appropriate method, based on the command sent in the Lateral element
642 * descriptor.
643 * <p>
644 * @param led
645 * @throws IOException
646 */
647 private void handle( LateralElementDescriptor<K, V> led )
648 throws IOException
649 {
650 String cacheName = led.ce.getCacheName();
651 K key = led.ce.getKey();
652
653 if ( led.command == LateralCommand.UPDATE )
654 {
655 handlePut( led.ce );
656 }
657 else if ( led.command == LateralCommand.REMOVE )
658 {
659 // if a hashcode was given and filtering is on
660 // check to see if they are the same
661 // if so, then don't remvoe, otherwise issue a remove
662 if ( led.valHashCode != -1 )
663 {
664 if ( getTcpLateralCacheAttributes().isFilterRemoveByHashCode() )
665 {
666 ICacheElement<K, V> test = getCache( cacheName ).localGet( key );
667 if ( test != null )
668 {
669 if ( test.getVal().hashCode() == led.valHashCode )
670 {
671 if ( log.isDebugEnabled() )
672 {
673 log.debug( "Filtering detected identical hashCode [" + led.valHashCode
674 + "], not issuing a remove for led " + led );
675 }
676 return;
677 }
678 else
679 {
680 if ( log.isDebugEnabled() )
681 {
682 log.debug( "Different hashcodes, in cache [" + test.getVal().hashCode()
683 + "] sent [" + led.valHashCode + "]" );
684 }
685 }
686 }
687 }
688 }
689 handleRemove( cacheName, key );
690 }
691 else if ( led.command == LateralCommand.REMOVEALL )
692 {
693 handleRemoveAll( cacheName );
694 }
695 else if ( led.command == LateralCommand.GET )
696 {
697 Serializable obj = handleGet( cacheName, key );
698
699 ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
700 oos.writeObject( obj );
701 oos.flush();
702 }
703 else if ( led.command == LateralCommand.GET_MATCHING )
704 {
705 Map<K, ICacheElement<K, V>> obj = handleGetMatching( cacheName, (String) key );
706
707 ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
708 oos.writeObject( obj );
709 oos.flush();
710 }
711 else if ( led.command == LateralCommand.GET_GROUP_KEYS )
712 {
713 String groupName = (String) key;
714 Set<K> obj = handleGetGroupKeys(cacheName, groupName);
715
716 ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
717 oos.writeObject( obj );
718 oos.flush();
719 }
720 else if ( led.command == LateralCommand.GET_GROUP_NAMES )
721 {
722 Set<String> obj = handleGetGroupNames(cacheName);
723
724 ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
725 oos.writeObject( obj );
726 oos.flush();
727 }
728 }
729 }
730
731 /**
732 * Allows us to set the daemon status on the executor threads
733 * <p>
734 * @author Aaron Smuts
735 */
736 protected static class MyThreadFactory
737 implements ThreadFactory
738 {
739 /**
740 * @param runner
741 * @return daemon thread
742 */
743 public Thread newThread( Runnable runner )
744 {
745 Thread t = new Thread( runner );
746 t.setDaemon( true );
747 t.setPriority( Thread.MIN_PRIORITY );
748 return t;
749 }
750 }
751
752 /**
753 * Shuts down the receiver.
754 */
755 public void shutdown()
756 {
757 if ( !shutdown )
758 {
759 shutdown = true;
760
761 if ( log.isInfoEnabled() )
762 {
763 log.info( "Shutting down TCP Lateral receiver." );
764 }
765 receiver.interrupt();
766 }
767 else
768 {
769 if ( log.isDebugEnabled() )
770 {
771 log.debug( "Shutdown already called." );
772 }
773 }
774 }
775 }