package org.apache.commons.jcs.auxiliary.lateral.socket.tcp;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.jcs.access.exception.CacheException;
import org.apache.commons.jcs.auxiliary.lateral.LateralElementDescriptor;
import org.apache.commons.jcs.auxiliary.lateral.behavior.ILateralCacheListener;
import org.apache.commons.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
import org.apache.commons.jcs.engine.CacheInfo;
import org.apache.commons.jcs.engine.behavior.ICacheElement;
import org.apache.commons.jcs.engine.behavior.ICompositeCacheManager;
import org.apache.commons.jcs.engine.behavior.IShutdownObserver;
import org.apache.commons.jcs.engine.control.CompositeCache;
import org.apache.commons.jcs.engine.control.CompositeCacheManager;
import org.apache.commons.jcs.io.ObjectInputStreamClassLoaderAware;
import org.apache.commons.jcs.utils.threadpool.DaemonThreadFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Listens for connections from other TCP lateral caches and handles them. The initialization method
 * starts a listening thread, which creates a socket server. When messages are received they are
 * passed to a pooled executor which then calls the appropriate handle method.
 * <p>
 * The handle methods are invoked concurrently from the executor's threads, so the request counters
 * are kept in atomic variables.
 */
public class LateralTCPListener<K, V>
    implements ILateralCacheListener<K, V>, IShutdownObserver
{
    /** The logger */
    private static final Log log = LogFactory.getLog( LateralTCPListener.class );

    /** How long (in milliseconds) the server will block on an accept(). 0 is infinite. */
    private static final int acceptTimeOut = 1000;

    /** The CacheHub this listener is associated with */
    private transient ICompositeCacheManager cacheManager;

    /** Map of available instances, keyed by port. Only touched from the synchronized factory. */
    private static final HashMap<String, ILateralCacheListener<?, ?>> instances =
        new HashMap<String, ILateralCacheListener<?, ?>>();

    /** The socket listener */
    private ListenerThread receiver;

    /** Configuration attributes */
    private ITCPLateralCacheAttributes tcpLateralCacheAttributes;

    /** Listening port */
    private int port;

    /** The processor. We should probably use an event queue here. */
    private ExecutorService pooledExecutor;

    /** put count; atomic because handlers run on several pool threads */
    private final AtomicInteger putCnt = new AtomicInteger();

    /** remove count; atomic because handlers run on several pool threads */
    private final AtomicInteger removeCnt = new AtomicInteger();

    /** get count (shared by get and getMatching); atomic because handlers run on several pool threads */
    private final AtomicInteger getCnt = new AtomicInteger();

    /**
     * Use the vmid by default. This can be set for testing. If we ever need to run more than one
     * per vm, then we need a new technique.
     */
    private long listenerId = CacheInfo.listenerId;

    /** is this shut down? Created eagerly so that it is usable before {@link #init()}. */
    private final AtomicBoolean shutdown = new AtomicBoolean( false );

    /** is this terminated? Polled by the accept loop. Created eagerly so that it is usable before {@link #init()}. */
    private final AtomicBoolean terminated = new AtomicBoolean( false );

    /**
     * Gets the listener registered for the configured TCP listener port, creating, initializing
     * and registering a new one if none exists yet.
     * <p>
     * The cache manager is injected <em>before</em> the listener thread is started so that a
     * message arriving immediately after the socket is bound already sees it.
     * <p>
     * @param ilca ITCPLateralCacheAttributes supplying the listener port
     * @param cacheMgr the cache manager handed to a newly created listener
     * @return The instance value
     * @throws IllegalStateException if a new listener cannot open its server socket
     */
    public synchronized static <K, V> LateralTCPListener<K, V>
        getInstance( ITCPLateralCacheAttributes ilca, ICompositeCacheManager cacheMgr )
    {
        String instanceKey = String.valueOf( ilca.getTcpListenerPort() );

        @SuppressWarnings("unchecked") // Need to cast because of common map for all instances
        LateralTCPListener<K, V> ins = (LateralTCPListener<K, V>) instances.get( instanceKey );

        if ( ins == null )
        {
            ins = new LateralTCPListener<K, V>( ilca );

            // Inject first, start the thread second.
            ins.setCacheManager( cacheMgr );
            ins.init();

            instances.put( instanceKey, ins );

            if ( log.isInfoEnabled() )
            {
                log.info( "Created new listener " + ilca.getTcpListenerPort() );
            }
        }

        return ins;
    }

    /**
     * Only need one since it does work for all regions, just reference by multiple region names.
     * <p>
     * @param ilca the configuration attributes
     */
    protected LateralTCPListener( ITCPLateralCacheAttributes ilca )
    {
        this.setTcpLateralCacheAttributes( ilca );
    }

    /**
     * This starts the ListenerThread on the specified port.
     * <p>
     * Creates the handler thread pool, resets the terminated/shutdown flags, binds the server
     * socket with an accept timeout of {@link #acceptTimeOut} ms and starts the daemon listener
     * thread.
     * <p>
     * @throws IllegalStateException wrapping the IOException if the server socket cannot be set up
     */
    @Override
    public synchronized void init()
    {
        ServerSocket serverSocket = null;
        try
        {
            this.port = getTcpLateralCacheAttributes().getTcpListenerPort();

            pooledExecutor = Executors.newCachedThreadPool(
                    new DaemonThreadFactory("JCS-LateralTCPListener-"));
            terminated.set( false );
            shutdown.set( false );

            log.info( "Listening on port " + port );

            serverSocket = new ServerSocket( port );
            serverSocket.setSoTimeout( acceptTimeOut );

            receiver = new ListenerThread(serverSocket);
            receiver.setDaemon( true );
            receiver.start();
        }
        catch ( IOException ex )
        {
            // The socket may already be bound if configuring it failed; do not leak it.
            if ( serverSocket != null )
            {
                try
                {
                    serverSocket.close();
                }
                catch ( IOException closeEx )
                {
                    log.error( "Exception caught closing socket", closeEx );
                }
            }
            throw new IllegalStateException( ex );
        }
    }

    /**
     * Let the lateral cache set a listener_id. Since there is only one listener for all the
     * regions and every region gets registered? the id shouldn't be set if it isn't zero. If it is
     * we assume that it is a reconnect.
     * <p>
     * By default, the listener id is the vmid.
     * <p>
     * The service should set this value. This value will never be changed by a server we connect
     * to. It needs to be non static, for unit tests.
     * <p>
     * The service will use the value it sets in all send requests to the sender.
     * <p>
     * @param id The new listenerId value
     * @throws IOException declared by the interface; not thrown by this implementation
     */
    @Override
    public void setListenerId( long id )
        throws IOException
    {
        this.listenerId = id;
        if ( log.isDebugEnabled() )
        {
            log.debug( "set listenerId = " + id );
        }
    }

    /**
     * Gets the listenerId attribute of the LateralCacheTCPListener object
     * <p>
     * @return The listenerId value
     * @throws IOException declared by the interface; not thrown by this implementation
     */
    @Override
    public long getListenerId()
        throws IOException
    {
        return this.listenerId;
    }

    /**
     * Increments the put count. Gets the cache that was injected by the lateral factory. Calls
     * localUpdate on the cache. Every 100th put is logged at info level.
     * <p>
     * @param element the element to store locally
     * @throws IOException if the local update fails
     * @see org.apache.commons.jcs.engine.behavior.ICacheListener#handlePut(org.apache.commons.jcs.engine.behavior.ICacheElement)
     */
    @Override
    public void handlePut( ICacheElement<K, V> element )
        throws IOException
    {
        int count = putCnt.incrementAndGet();
        if ( log.isInfoEnabled() && count % 100 == 0 )
        {
            log.info( "Put Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
                + count );
        }

        if ( log.isDebugEnabled() )
        {
            log.debug( "handlePut> cacheName=" + element.getCacheName() + ", key=" + element.getKey() );
        }

        getCache( element.getCacheName() ).localUpdate( element );
    }

    /**
     * Increments the remove count. Gets the cache that was injected by the lateral factory. Calls
     * localRemove on the cache. Every 100th remove is logged at info level.
     * <p>
     * @param cacheName the region to remove from
     * @param key the key to remove
     * @throws IOException declared by the interface
     * @see org.apache.commons.jcs.engine.behavior.ICacheListener#handleRemove(java.lang.String,
     *      Object)
     */
    @Override
    public void handleRemove( String cacheName, K key )
        throws IOException
    {
        int count = removeCnt.incrementAndGet();
        if ( log.isInfoEnabled() && count % 100 == 0 )
        {
            log.info( "Remove Count = " + count );
        }

        if ( log.isDebugEnabled() )
        {
            log.debug( "handleRemove> cacheName=" + cacheName + ", key=" + key );
        }

        getCache( cacheName ).localRemove( key );
    }

    /**
     * Gets the cache that was injected by the lateral factory. Calls localRemoveAll on the cache.
     * <p>
     * @param cacheName the region to clear
     * @throws IOException if the local removeAll fails
     * @see org.apache.commons.jcs.engine.behavior.ICacheListener#handleRemoveAll(java.lang.String)
     */
    @Override
    public void handleRemoveAll( String cacheName )
        throws IOException
    {
        if ( log.isDebugEnabled() )
        {
            log.debug( "handleRemoveAll> cacheName=" + cacheName );
        }

        getCache( cacheName ).localRemoveAll();
    }

    /**
     * Increments the get count. Gets the cache that was injected by the lateral factory. Calls
     * localGet on the cache. Every 100th get is logged at info level.
     * <p>
     * @param cacheName the region to read from
     * @param key the key to look up
     * @return a ICacheElement, or whatever localGet returns for a miss
     * @throws IOException declared for callers
     */
    public ICacheElement<K, V> handleGet( String cacheName, K key )
        throws IOException
    {
        int count = getCnt.incrementAndGet();
        if ( log.isInfoEnabled() && count % 100 == 0 )
        {
            log.info( "Get Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
                + count );
        }

        if ( log.isDebugEnabled() )
        {
            log.debug( "handleGet> cacheName=" + cacheName + ", key = " + key );
        }

        return getCache( cacheName ).localGet( key );
    }

    /**
     * Increments the get count (shared with {@link #handleGet(String, Object)}). Gets the cache
     * that was injected by the lateral factory. Calls localGetMatching on the cache.
     * <p>
     * @param cacheName the name of the cache
     * @param pattern the matching pattern
     * @return Map of the matching elements by key
     * @throws IOException declared for callers
     */
    public Map<K, ICacheElement<K, V>> handleGetMatching( String cacheName, String pattern )
        throws IOException
    {
        int count = getCnt.incrementAndGet();
        if ( log.isInfoEnabled() && count % 100 == 0 )
        {
            log.info( "GetMatching Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
                + count );
        }

        if ( log.isDebugEnabled() )
        {
            log.debug( "handleGetMatching> cacheName=" + cacheName + ", pattern = " + pattern );
        }

        return getCache( cacheName ).localGetMatching( pattern );
    }

    /**
     * Gets the cache that was injected by the lateral factory. Calls getKeySet(true) on the cache.
     * <p>
     * @param cacheName the name of the cache
     * @return a set of keys
     * @throws IOException declared for callers
     */
    public Set<K> handleGetKeySet( String cacheName ) throws IOException
    {
        return getCache( cacheName ).getKeySet(true);
    }

    /**
     * This marks this instance as terminated.
     * <p>
     * Note that although the log message says the request is ignored, the terminated flag is set,
     * which makes the accept loop exit.
     * <p>
     * @param cacheName the region named in the dispose request (only logged)
     * @throws IOException declared by the interface; not thrown by this implementation
     * @see org.apache.commons.jcs.engine.behavior.ICacheListener#handleDispose(java.lang.String)
     */
    @Override
    public void handleDispose( String cacheName )
        throws IOException
    {
        if ( log.isInfoEnabled() )
        {
            log.info( "handleDispose > cacheName=" + cacheName + " | Ignoring message.  Do not dispose from remote." );
        }

        // TODO handle active deregistration, rather than passive detection
        terminated.set(true);
    }

    /**
     * Marks the listener as terminated, wakes one thread waiting on this object's monitor (none
     * is visible in this class), and stops the handler pool with shutdownNow().
     */
    @Override
    public synchronized void dispose()
    {
        terminated.set(true);
        notify();

        // pooledExecutor is only created in init(); tolerate dispose() on an uninitialized instance.
        if ( pooledExecutor != null )
        {
            pooledExecutor.shutdownNow();
        }
    }

    /**
     * Looks up the named cache region through the cache manager.
     * <p>
     * Normally the manager is set by the factory. If it wasn't set the listener defaults to the
     * expected singleton behavior of the cache manager.
     * <p>
     * @param name the region name
     * @return CompositeCache
     * @throws RuntimeException if no manager was injected and the singleton cannot be obtained
     */
    protected CompositeCache<K, V> getCache( String name )
    {
        if ( getCacheManager() == null )
        {
            // revert to singleton on failure
            try
            {
                setCacheManager( CompositeCacheManager.getInstance() );
            }
            catch (CacheException e)
            {
                throw new RuntimeException("Could not retrieve cache manager instance", e);
            }

            if ( log.isDebugEnabled() )
            {
                log.debug( "cacheMgr = " + getCacheManager() );
            }
        }

        return getCacheManager().getCache( name );
    }

    /**
     * This is roughly the number of updates the lateral has received.
     * <p>
     * @return Returns the putCnt.
     */
    public int getPutCnt()
    {
        return putCnt.get();
    }

    /**
     * @return Returns the getCnt (gets plus getMatching calls).
     */
    public int getGetCnt()
    {
        return getCnt.get();
    }

    /**
     * @return Returns the removeCnt.
     */
    public int getRemoveCnt()
    {
        return removeCnt.get();
    }

    /**
     * @param cacheMgr The cacheMgr to set.
     */
    @Override
    public void setCacheManager( ICompositeCacheManager cacheMgr )
    {
        this.cacheManager = cacheMgr;
    }

    /**
     * @return Returns the cacheMgr.
     */
    @Override
    public ICompositeCacheManager getCacheManager()
    {
        return cacheManager;
    }

    /**
     * @param tcpLateralCacheAttributes The tcpLateralCacheAttributes to set.
     */
    public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tcpLateralCacheAttributes )
    {
        this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
    }

    /**
     * @return Returns the tcpLateralCacheAttributes.
     */
    public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
    {
        return tcpLateralCacheAttributes;
    }

    /**
     * Closes a client socket, logging (rather than propagating) any failure.
     * <p>
     * @param socket the socket to close; may be null
     */
    private static void closeSocket( Socket socket )
    {
        if ( socket != null )
        {
            try
            {
                socket.close();
            }
            catch ( IOException e )
            {
                log.error( "Exception caught closing socket", e );
            }
        }
    }

    /**
     * Processes commands from the server socket. There should be one listener for each configured
     * TCP lateral.
     */
    public class ListenerThread
        extends Thread
    {
        /** The socket listener */
        private final ServerSocket serverSocket;

        /**
         * Constructor
         *
         * @param serverSocket the bound server socket to accept connections on
         */
        public ListenerThread(ServerSocket serverSocket)
        {
            super();
            this.serverSocket = serverSocket;
        }

        /**
         * Main processing method for the ListenerThread object.
         * <p>
         * Accepts connections until the listener is marked terminated and hands each one to the
         * pooled executor. The server socket is always closed on exit.
         */
        @SuppressWarnings("synthetic-access")
        @Override
        public void run()
        {
            try
            {
                while ( true )
                {
                    if ( log.isDebugEnabled() )
                    {
                        log.debug( "Waiting for clients to connect " );
                    }

                    Socket socket = awaitConnection();
                    if ( socket == null )
                    {
                        // We've been asked to exit
                        if (log.isDebugEnabled())
                        {
                            log.debug("Thread terminated, exiting gracefully");
                        }
                        break;
                    }

                    if ( log.isDebugEnabled() )
                    {
                        InetAddress inetAddress = socket.getInetAddress();
                        log.debug( "Connected to client at " + inetAddress );
                    }

                    try
                    {
                        pooledExecutor.execute( new ConnectionHandler( socket ) );
                    }
                    catch ( RejectedExecutionException e )
                    {
                        // The pool was shut down (dispose()) while we were accepting; drop the
                        // connection instead of dying with an uncaught exception and a leaked socket.
                        log.info( "Connection rejected, handler pool is shut down. " + e.getMessage() );
                        closeSocket( socket );
                    }
                }
            }
            catch ( IOException e )
            {
                log.error( "Exception caught in TCP listener", e );
            }
            finally
            {
                if (serverSocket != null)
                {
                    try
                    {
                        serverSocket.close();
                    }
                    catch (IOException e)
                    {
                        log.error( "Exception caught closing socket", e );
                    }
                }
            }
        }

        /**
         * Blocks until a client connects or the listener is terminated. accept() times out
         * periodically (see the SO_TIMEOUT set in init()) so the terminated flag is re-checked.
         * <p>
         * @return the accepted socket, or null if the listener was terminated
         * @throws IOException if accept() fails for a reason other than the timeout
         */
        @SuppressWarnings("synthetic-access")
        private Socket awaitConnection()
            throws IOException
        {
            while ( !terminated.get() )
            {
                try
                {
                    return serverSocket.accept();
                }
                catch (SocketTimeoutException e)
                {
                    // No problem! We loop back up and re-check the terminated flag.
                }
            }
            return null;
        }
    }

    /**
     * A Separate thread that runs when a command comes into the LateralTCPReceiver.
     */
    public class ConnectionHandler
        implements Runnable
    {
        /** The socket connection, passed in via constructor */
        private final Socket socket;

        /**
         * Construct for a given socket
         * @param socket the accepted client connection
         */
        public ConnectionHandler( Socket socket )
        {
            this.socket = socket;
        }

        /**
         * Main processing method for the LateralTCPReceiverConnection object.
         * <p>
         * Reads LateralElementDescriptors from the socket until the stream ends or fails,
         * skipping nulls and messages that carry this listener's own id, and dispatches the rest.
         * The stream and socket are closed when the loop ends.
         */
        @Override
        @SuppressWarnings({"unchecked", // Need to cast from Object
            "synthetic-access" })
        public void run()
        {
            ObjectInputStream ois;

            try
            {
                // NOTE(review): this natively deserializes whatever a connecting peer sends;
                // the listener port should only be reachable by trusted cache nodes.
                ois = new ObjectInputStreamClassLoaderAware( socket.getInputStream(), null );
            }
            catch ( Exception e )
            {
                log.error( "Could not open ObjectInputStream on " + socket, e );

                // Nothing else will ever close this connection.
                closeSocket( socket );
                return;
            }

            LateralElementDescriptor<K, V> led;

            try
            {
                while ( true )
                {
                    led = (LateralElementDescriptor<K, V>) ois.readObject();

                    if ( led == null )
                    {
                        log.debug( "LateralElementDescriptor is null" );
                        continue;
                    }
                    if ( led.requesterId == getListenerId() )
                    {
                        log.debug( "from self" );
                    }
                    else
                    {
                        if ( log.isDebugEnabled() )
                        {
                            log.debug( "receiving LateralElementDescriptor from another" + "led = " + led
                                + ", led.command = " + led.command + ", led.ce = " + led.ce );
                        }

                        handle( led );
                    }
                }
            }
            catch ( EOFException e )
            {
                log.info( "Caught java.io.EOFException closing connection." + e.getMessage() );
            }
            catch ( SocketException e )
            {
                log.info( "Caught java.net.SocketException closing connection." + e.getMessage() );
            }
            catch ( Exception e )
            {
                log.error( "Unexpected exception.", e );
            }
            finally
            {
                try
                {
                    ois.close();
                }
                catch ( IOException e )
                {
                    log.error( "Could not close object input stream.", e );
                }
                closeSocket( socket );
            }
        }

        /**
         * This calls the appropriate method, based on the command sent in the Lateral element
         * descriptor. If the command produced a non-null result (GET, GET_MATCHING, GET_KEYSET),
         * it is written back on the socket's output stream.
         * <p>
         * @param led the received descriptor
         * @throws IOException if a handler or the response write fails
         */
        @SuppressWarnings("synthetic-access")
        private void handle( LateralElementDescriptor<K, V> led )
            throws IOException
        {
            String cacheName = led.ce.getCacheName();
            K key = led.ce.getKey();
            Serializable obj = null;

            switch (led.command)
            {
                case UPDATE:
                    handlePut( led.ce );
                    break;

                case REMOVE:
                    if ( isRemoveFiltered( led, cacheName, key ) )
                    {
                        return;
                    }
                    handleRemove( cacheName, key );
                    break;

                case REMOVEALL:
                    handleRemoveAll( cacheName );
                    break;

                case GET:
                    obj = handleGet( cacheName, key );
                    break;

                case GET_MATCHING:
                    obj = (Serializable) handleGetMatching( cacheName, (String) key );
                    break;

                case GET_KEYSET:
                    obj = (Serializable) handleGetKeySet(cacheName);
                    break;

                default: break;
            }

            if (obj != null)
            {
                ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
                oos.writeObject( obj );
                oos.flush();
            }
        }

        /**
         * Decides whether a REMOVE should be skipped: if a hashcode was given and filtering is
         * on, the remove is suppressed when the locally cached value has the same hashcode.
         * <p>
         * @param led the received descriptor
         * @param cacheName the region named in the descriptor
         * @param key the key named in the descriptor
         * @return true if the remove must not be issued
         */
        @SuppressWarnings("synthetic-access")
        private boolean isRemoveFiltered( LateralElementDescriptor<K, V> led, String cacheName, K key )
        {
            if ( led.valHashCode == -1 || !getTcpLateralCacheAttributes().isFilterRemoveByHashCode() )
            {
                return false;
            }

            ICacheElement<K, V> test = getCache( cacheName ).localGet( key );
            if ( test == null )
            {
                return false;
            }

            // A null value cannot match a sent hashcode; treat it as different instead of
            // failing with a NullPointerException that would drop the whole connection.
            V localVal = test.getVal();
            if ( localVal == null )
            {
                return false;
            }

            int localHashCode = localVal.hashCode();
            if ( localHashCode == led.valHashCode )
            {
                if ( log.isDebugEnabled() )
                {
                    log.debug( "Filtering detected identical hashCode [" + led.valHashCode
                        + "], not issuing a remove for led " + led );
                }
                return true;
            }

            if ( log.isDebugEnabled() )
            {
                log.debug( "Different hashcodes, in cache [" + localHashCode
                    + "] sent [" + led.valHashCode + "]" );
            }
            return false;
        }
    }

    /**
     * Shuts down the receiver.
     * <p>
     * The first call marks the listener terminated so the accept loop exits after its current
     * accept() timeout, and interrupts the listener thread. An interrupt alone is not enough: the
     * thread blocks in ServerSocket.accept(), which does not respond to interrupts. Later calls
     * only log.
     */
    @Override
    public void shutdown()
    {
        if ( shutdown.compareAndSet(false, true) )
        {
            if ( log.isInfoEnabled() )
            {
                log.info( "Shutting down TCP Lateral receiver." );
            }

            terminated.set( true );

            // receiver is only created in init(); tolerate shutdown() on an uninitialized instance.
            if ( receiver != null )
            {
                receiver.interrupt();
            }
        }
        else
        {
            if ( log.isDebugEnabled() )
            {
                log.debug( "Shutdown already called." );
            }
        }
    }
}