001package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.OutputStream; 025import java.net.InetSocketAddress; 026import java.net.ServerSocket; 027import java.net.Socket; 028import java.net.SocketAddress; 029import java.nio.channels.SelectionKey; 030import java.nio.channels.Selector; 031import java.nio.channels.ServerSocketChannel; 032import java.nio.channels.SocketChannel; 033import java.util.Iterator; 034import java.util.Map; 035import java.util.Set; 036import java.util.concurrent.ConcurrentHashMap; 037import java.util.concurrent.atomic.AtomicBoolean; 038 039import org.apache.commons.jcs3.auxiliary.lateral.LateralElementDescriptor; 040import org.apache.commons.jcs3.auxiliary.lateral.behavior.ILateralCacheListener; 041import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes; 042import org.apache.commons.jcs3.engine.CacheInfo; 043import org.apache.commons.jcs3.engine.behavior.ICacheElement; 044import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager; 045import org.apache.commons.jcs3.engine.behavior.IElementSerializer; 046import org.apache.commons.jcs3.engine.behavior.IShutdownObserver; 047import org.apache.commons.jcs3.engine.control.CompositeCache; 048import org.apache.commons.jcs3.log.Log; 049import org.apache.commons.jcs3.log.LogManager; 050import org.apache.commons.jcs3.utils.serialization.StandardSerializer; 051 052/** 053 * Listens for connections from other TCP lateral caches and handles them. The initialization method 054 * starts a listening thread, which creates a socket server. When messages are received they are 055 * passed to a pooled executor which then calls the appropriate handle method. 056 */ 057public class LateralTCPListener<K, V> 058 implements ILateralCacheListener<K, V>, IShutdownObserver 059{ 060 /** The logger */ 061 private static final Log log = LogManager.getLog( LateralTCPListener.class ); 062 063 /** How long the server will block on an accept(). 0 is infinite. */ 064 private static final int acceptTimeOut = 1000; 065 066 /** The CacheHub this listener is associated with */ 067 private transient ICompositeCacheManager cacheManager; 068 069 /** Map of available instances, keyed by port */ 070 private static final ConcurrentHashMap<String, ILateralCacheListener<?, ?>> instances = 071 new ConcurrentHashMap<>(); 072 073 /** Configuration attributes */ 074 private ITCPLateralCacheAttributes tcpLateralCacheAttributes; 075 076 /** The listener thread */ 077 private Thread listenerThread; 078 079 /** 080 * Serializer for reading and writing 081 */ 082 private IElementSerializer serializer; 083 084 /** put count */ 085 private int putCnt; 086 087 /** remove count */ 088 private int removeCnt; 089 090 /** get count */ 091 private int getCnt; 092 093 /** 094 * Use the vmid by default. This can be set for testing. If we ever need to run more than one 095 * per vm, then we need a new technique. 096 */ 097 private long listenerId = CacheInfo.listenerId; 098 099 /** is this shut down? */ 100 private final AtomicBoolean shutdown = new AtomicBoolean(); 101 102 /** is this terminated? */ 103 private final AtomicBoolean terminated = new AtomicBoolean(); 104 105 /** 106 * Gets the instance attribute of the LateralCacheTCPListener class. 107 * <p> 108 * @param ilca ITCPLateralCacheAttributes 109 * @param cacheMgr 110 * @return The instance value 111 * @deprecated Specify serializer 112 */ 113 @Deprecated 114 @SuppressWarnings("unchecked") // Need to cast because of common map for all instances 115 public static <K, V> LateralTCPListener<K, V> 116 getInstance( final ITCPLateralCacheAttributes ilca, final ICompositeCacheManager cacheMgr) 117 { 118 return (LateralTCPListener<K, V>) instances.computeIfAbsent( 119 String.valueOf( ilca.getTcpListenerPort() ), 120 k -> { 121 final LateralTCPListener<K, V> newIns = new LateralTCPListener<>( ilca, new StandardSerializer() ); 122 123 newIns.init(); 124 newIns.setCacheManager( cacheMgr ); 125 126 log.info("Created new listener {0}", ilca::getTcpListenerPort); 127 128 return newIns; 129 }); 130 } 131 132 /** 133 * Gets the instance attribute of the LateralCacheTCPListener class. 134 * <p> 135 * @param ilca ITCPLateralCacheAttributes 136 * @param cacheMgr 137 * @param serializer the serializer to use when receiving 138 * @return The instance value 139 */ 140 @SuppressWarnings("unchecked") // Need to cast because of common map for all instances 141 public static <K, V> LateralTCPListener<K, V> 142 getInstance( final ITCPLateralCacheAttributes ilca, final ICompositeCacheManager cacheMgr, final IElementSerializer serializer ) 143 { 144 return (LateralTCPListener<K, V>) instances.computeIfAbsent( 145 String.valueOf( ilca.getTcpListenerPort() ), 146 k -> { 147 final LateralTCPListener<K, V> newIns = new LateralTCPListener<>( ilca, serializer ); 148 149 newIns.init(); 150 newIns.setCacheManager( cacheMgr ); 151 152 log.info("Created new listener {0}", ilca::getTcpListenerPort); 153 154 return newIns; 155 }); 156 } 157 158 /** 159 * Only need one since it does work for all regions, just reference by multiple region names. 160 * <p> 161 * @param ilca ITCPLateralCacheAttributes 162 * @deprecated Specify serializer 163 */ 164 @Deprecated 165 protected LateralTCPListener( final ITCPLateralCacheAttributes ilca ) 166 { 167 this(ilca, new StandardSerializer()); 168 } 169 170 /** 171 * Only need one since it does work for all regions, just reference by multiple region names. 172 * <p> 173 * @param ilca ITCPLateralCacheAttributes 174 * @param serializer the serializer to use when receiving 175 */ 176 protected LateralTCPListener( final ITCPLateralCacheAttributes ilca, final IElementSerializer serializer ) 177 { 178 this.setTcpLateralCacheAttributes( ilca ); 179 this.serializer = serializer; 180 } 181 182 /** 183 * This starts the ListenerThread on the specified port. 184 */ 185 @Override 186 public synchronized void init() 187 { 188 try 189 { 190 final int port = getTcpLateralCacheAttributes().getTcpListenerPort(); 191 final String host = getTcpLateralCacheAttributes().getTcpListenerHost(); 192 193 terminated.set(false); 194 shutdown.set(false); 195 196 final ServerSocketChannel serverSocket = ServerSocketChannel.open(); 197 198 SocketAddress endPoint; 199 200 if (host != null && !host.isEmpty()) 201 { 202 log.info( "Listening on {0}:{1}", host, port ); 203 //Bind the SocketAddress with host and port 204 endPoint = new InetSocketAddress(host, port); 205 } 206 else 207 { 208 log.info( "Listening on port {0}", port ); 209 endPoint = new InetSocketAddress(port); 210 } 211 212 serverSocket.bind(endPoint); 213 serverSocket.configureBlocking(false); 214 215 listenerThread = new Thread(() -> runListener(serverSocket), 216 "JCS-LateralTCPListener-" + host + ":" + port); 217 listenerThread.setDaemon(true); 218 listenerThread.start(); 219 } 220 catch ( final IOException ex ) 221 { 222 throw new IllegalStateException( ex ); 223 } 224 } 225 226 /** 227 * Let the lateral cache set a listener_id. Since there is only one listener for all the 228 * regions and every region gets registered? the id shouldn't be set if it isn't zero. If it is 229 * we assume that it is a reconnect. 230 * <p> 231 * By default, the listener id is the vmid. 232 * <p> 233 * The service should set this value. This value will never be changed by a server we connect 234 * to. It needs to be non static, for unit tests. 235 * <p> 236 * The service will use the value it sets in all send requests to the sender. 237 * <p> 238 * @param id The new listenerId value 239 * @throws IOException 240 */ 241 @Override 242 public void setListenerId( final long id ) 243 throws IOException 244 { 245 this.listenerId = id; 246 log.debug( "set listenerId = {0}", id ); 247 } 248 249 /** 250 * Gets the listenerId attribute of the LateralCacheTCPListener object 251 * <p> 252 * @return The listenerId value 253 * @throws IOException 254 */ 255 @Override 256 public long getListenerId() 257 throws IOException 258 { 259 return this.listenerId; 260 } 261 262 /** 263 * Increments the put count. Gets the cache that was injected by the lateral factory. Calls put 264 * on the cache. 265 * <p> 266 * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handlePut(org.apache.commons.jcs3.engine.behavior.ICacheElement) 267 */ 268 @Override 269 public void handlePut( final ICacheElement<K, V> element ) 270 throws IOException 271 { 272 putCnt++; 273 if ( log.isInfoEnabled() && getPutCnt() % 100 == 0 ) 274 { 275 log.info( "Put Count (port {0}) = {1}", 276 () -> getTcpLateralCacheAttributes().getTcpListenerPort(), 277 this::getPutCnt); 278 } 279 280 log.debug( "handlePut> cacheName={0}, key={1}", 281 element::getCacheName, element::getKey); 282 283 getCache( element.getCacheName() ).localUpdate( element ); 284 } 285 286 /** 287 * Increments the remove count. Gets the cache that was injected by the lateral factory. Calls 288 * remove on the cache. 289 * <p> 290 * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleRemove(String, 291 * Object) 292 */ 293 @Override 294 public void handleRemove( final String cacheName, final K key ) 295 throws IOException 296 { 297 removeCnt++; 298 if ( log.isInfoEnabled() && getRemoveCnt() % 100 == 0 ) 299 { 300 log.info( "Remove Count = {0}", this::getRemoveCnt); 301 } 302 303 log.debug( "handleRemove> cacheName={0}, key={1}", cacheName, key ); 304 305 getCache( cacheName ).localRemove( key ); 306 } 307 308 /** 309 * Gets the cache that was injected by the lateral factory. Calls removeAll on the cache. 310 * <p> 311 * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleRemoveAll(String) 312 */ 313 @Override 314 public void handleRemoveAll( final String cacheName ) 315 throws IOException 316 { 317 log.debug( "handleRemoveAll> cacheName={0}", cacheName ); 318 319 getCache( cacheName ).localRemoveAll(); 320 } 321 322 /** 323 * Gets the cache that was injected by the lateral factory. Calls get on the cache. 324 * <p> 325 * @param cacheName 326 * @param key 327 * @return a ICacheElement 328 * @throws IOException 329 */ 330 public ICacheElement<K, V> handleGet( final String cacheName, final K key ) 331 throws IOException 332 { 333 getCnt++; 334 if ( log.isInfoEnabled() && getGetCnt() % 100 == 0 ) 335 { 336 log.info( "Get Count (port {0}) = {1}", 337 () -> getTcpLateralCacheAttributes().getTcpListenerPort(), 338 this::getGetCnt); 339 } 340 341 log.debug( "handleGet> cacheName={0}, key={1}", cacheName, key ); 342 343 return getCache( cacheName ).localGet( key ); 344 } 345 346 /** 347 * Gets the cache that was injected by the lateral factory. Calls get on the cache. 348 * <p> 349 * @param cacheName the name of the cache 350 * @param pattern the matching pattern 351 * @return Map 352 * @throws IOException 353 */ 354 public Map<K, ICacheElement<K, V>> handleGetMatching( final String cacheName, final String pattern ) 355 throws IOException 356 { 357 getCnt++; 358 if ( log.isInfoEnabled() && getGetCnt() % 100 == 0 ) 359 { 360 log.info( "GetMatching Count (port {0}) = {1}", 361 () -> getTcpLateralCacheAttributes().getTcpListenerPort(), 362 this::getGetCnt); 363 } 364 365 log.debug( "handleGetMatching> cacheName={0}, pattern={1}", cacheName, pattern ); 366 367 return getCache( cacheName ).localGetMatching( pattern ); 368 } 369 370 /** 371 * Gets the cache that was injected by the lateral factory. Calls getKeySet on the cache. 372 * <p> 373 * @param cacheName the name of the cache 374 * @return a set of keys 375 * @throws IOException 376 */ 377 public Set<K> handleGetKeySet( final String cacheName ) throws IOException 378 { 379 return getCache( cacheName ).getKeySet(true); 380 } 381 382 /** 383 * This marks this instance as terminated. 384 * <p> 385 * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleDispose(String) 386 */ 387 @Override 388 public void handleDispose( final String cacheName ) 389 throws IOException 390 { 391 log.info( "handleDispose > cacheName={0} | Ignoring message. " 392 + "Do not dispose from remote.", cacheName ); 393 394 // TODO handle active deregistration, rather than passive detection 395 dispose(); 396 } 397 398 @Override 399 public synchronized void dispose() 400 { 401 if (terminated.compareAndSet(false, true)) 402 { 403 notify(); 404 listenerThread.interrupt(); 405 } 406 } 407 408 /** 409 * Gets the cacheManager attribute of the LateralCacheTCPListener object. 410 * <p> 411 * Normally this is set by the factory. If it wasn't set the listener defaults to the expected 412 * singleton behavior of the cache manager. 413 * <p> 414 * @param name 415 * @return CompositeCache 416 */ 417 protected CompositeCache<K, V> getCache( final String name ) 418 { 419 return getCacheManager().getCache( name ); 420 } 421 422 /** 423 * This is roughly the number of updates the lateral has received. 424 * <p> 425 * @return Returns the putCnt. 426 */ 427 public int getPutCnt() 428 { 429 return putCnt; 430 } 431 432 /** 433 * @return Returns the getCnt. 434 */ 435 public int getGetCnt() 436 { 437 return getCnt; 438 } 439 440 /** 441 * @return Returns the removeCnt. 442 */ 443 public int getRemoveCnt() 444 { 445 return removeCnt; 446 } 447 448 /** 449 * @param cacheMgr The cacheMgr to set. 450 */ 451 @Override 452 public void setCacheManager( final ICompositeCacheManager cacheMgr ) 453 { 454 this.cacheManager = cacheMgr; 455 } 456 457 /** 458 * @return Returns the cacheMgr. 459 */ 460 @Override 461 public ICompositeCacheManager getCacheManager() 462 { 463 return cacheManager; 464 } 465 466 /** 467 * @param tcpLateralCacheAttributes The tcpLateralCacheAttributes to set. 468 */ 469 public void setTcpLateralCacheAttributes( final ITCPLateralCacheAttributes tcpLateralCacheAttributes ) 470 { 471 this.tcpLateralCacheAttributes = tcpLateralCacheAttributes; 472 } 473 474 /** 475 * @return Returns the tcpLateralCacheAttributes. 476 */ 477 public ITCPLateralCacheAttributes getTcpLateralCacheAttributes() 478 { 479 return tcpLateralCacheAttributes; 480 } 481 482 /** 483 * Processes commands from the server socket. There should be one listener for each configured 484 * TCP lateral. 485 * @deprecated No longer used 486 */ 487 @Deprecated 488 public class ListenerThread 489 extends Thread 490 { 491 /** The socket listener */ 492 private final ServerSocket serverSocket; 493 494 /** 495 * Constructor 496 * 497 * @param serverSocket 498 */ 499 public ListenerThread(final ServerSocket serverSocket) 500 { 501 this.serverSocket = serverSocket; 502 } 503 504 /** Main processing method for the ListenerThread object */ 505 @Override 506 public void run() 507 { 508 runListener(serverSocket.getChannel()); 509 } 510 } 511 512 /** 513 * Processes commands from the server socket. There should be one listener for each configured 514 * TCP lateral. 515 */ 516 private void runListener(final ServerSocketChannel serverSocket) 517 { 518 try (Selector selector = Selector.open()) 519 { 520 serverSocket.register(selector, SelectionKey.OP_ACCEPT); 521 log.debug("Waiting for clients to connect"); 522 523 // Check to see if we've been asked to exit, and exit 524 while (!terminated.get()) 525 { 526 int activeKeys = selector.select(acceptTimeOut); 527 if (activeKeys == 0) 528 { 529 continue; 530 } 531 532 for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) 533 { 534 if (terminated.get()) 535 { 536 break; 537 } 538 539 SelectionKey key = i.next(); 540 541 if (!key.isValid()) 542 { 543 continue; 544 } 545 546 if (key.isAcceptable()) 547 { 548 ServerSocketChannel server = (ServerSocketChannel) key.channel(); 549 SocketChannel client = server.accept(); 550 if (client == null) 551 { 552 //may happen in non-blocking mode 553 continue; 554 } 555 556 log.info("Connected to client at {0}", client.getRemoteAddress()); 557 558 client.configureBlocking(false); 559 client.register(selector, SelectionKey.OP_READ); 560 } 561 562 if (key.isReadable()) 563 { 564 handleClient(key); 565 } 566 567 i.remove(); 568 } 569 } 570 571 log.debug("Thread terminated, exiting gracefully"); 572 573 //close all registered channels 574 selector.keys().forEach(key -> { 575 try 576 { 577 key.channel().close(); 578 } 579 catch (IOException e) 580 { 581 log.warn("Problem closing channel", e); 582 } 583 }); 584 } 585 catch (final IOException e) 586 { 587 log.error( "Exception caught in TCP listener", e ); 588 } 589 finally 590 { 591 try 592 { 593 serverSocket.close(); 594 } 595 catch (IOException e) 596 { 597 log.error( "Exception closing TCP listener", e ); 598 } 599 } 600 } 601 602 /** 603 * A Separate thread that runs when a command comes into the LateralTCPReceiver. 604 * @deprecated No longer used 605 */ 606 @Deprecated 607 public class ConnectionHandler 608 implements Runnable 609 { 610 /** The socket connection, passed in via constructor */ 611 private final Socket socket; 612 613 /** 614 * Construct for a given socket 615 * @param socket 616 */ 617 public ConnectionHandler( final Socket socket ) 618 { 619 this.socket = socket; 620 } 621 622 /** 623 * Main processing method for the LateralTCPReceiverConnection object 624 */ 625 @Override 626 public void run() 627 { 628 try (InputStream is = socket.getInputStream()) 629 { 630 while ( true ) 631 { 632 final LateralElementDescriptor<K, V> led = 633 serializer.deSerializeFrom(is, null); 634 635 if ( led == null ) 636 { 637 log.debug( "LateralElementDescriptor is null" ); 638 continue; 639 } 640 if ( led.getRequesterId() == getListenerId() ) 641 { 642 log.debug( "from self" ); 643 } 644 else 645 { 646 log.debug( "receiving LateralElementDescriptor from another led = {0}", 647 led ); 648 649 Object obj = handleElement(led); 650 if (obj != null) 651 { 652 OutputStream os = socket.getOutputStream(); 653 serializer.serializeTo(obj, os); 654 os.flush(); 655 } 656 } 657 } 658 } 659 catch (final IOException e) 660 { 661 log.info("Caught {0}, closing connection.", e.getClass().getSimpleName(), e); 662 } 663 catch (final ClassNotFoundException e) 664 { 665 log.error( "Deserialization failed reading from socket", e ); 666 } 667 } 668 } 669 670 /** 671 * A Separate thread that runs when a command comes into the LateralTCPReceiver. 672 */ 673 private void handleClient(final SelectionKey key) 674 { 675 final SocketChannel socketChannel = (SocketChannel) key.channel(); 676 677 try 678 { 679 final LateralElementDescriptor<K, V> led = 680 serializer.deSerializeFrom(socketChannel, null); 681 682 if ( led == null ) 683 { 684 log.debug("LateralElementDescriptor is null"); 685 return; 686 } 687 688 if ( led.getRequesterId() == getListenerId() ) 689 { 690 log.debug( "from self" ); 691 } 692 else 693 { 694 log.debug( "receiving LateralElementDescriptor from another led = {0}", 695 led ); 696 697 Object obj = handleElement(led); 698 if (obj != null) 699 { 700 serializer.serializeTo(obj, socketChannel); 701 } 702 } 703 } 704 catch (final IOException e) 705 { 706 log.info("Caught {0}, closing connection.", e.getClass().getSimpleName(), e); 707 try 708 { 709 socketChannel.close(); 710 } 711 catch (IOException e1) 712 { 713 log.error("Error while closing connection", e ); 714 } 715 } 716 catch (final ClassNotFoundException e) 717 { 718 log.error( "Deserialization failed reading from socket", e ); 719 } 720 } 721 722 /** 723 * This calls the appropriate method, based on the command sent in the Lateral element 724 * descriptor. 725 * <p> 726 * @param led the lateral element 727 * @return a possible response 728 * @throws IOException 729 */ 730 private Object handleElement(final LateralElementDescriptor<K, V> led) throws IOException 731 { 732 final String cacheName = led.getPayload().getCacheName(); 733 final K key = led.getPayload().getKey(); 734 Object obj = null; 735 736 switch (led.getCommand()) 737 { 738 case UPDATE: 739 handlePut(led.getPayload()); 740 break; 741 742 case REMOVE: 743 // if a hash code was given and filtering is on 744 // check to see if they are the same 745 // if so, then don't remove, otherwise issue a remove 746 if (led.getValHashCode() != -1 && 747 getTcpLateralCacheAttributes().isFilterRemoveByHashCode()) 748 { 749 final ICacheElement<K, V> test = getCache( cacheName ).localGet( key ); 750 if ( test != null ) 751 { 752 if ( test.getVal().hashCode() == led.getValHashCode() ) 753 { 754 log.debug( "Filtering detected identical hashCode [{0}], " 755 + "not issuing a remove for led {1}", 756 led.getValHashCode(), led ); 757 return null; 758 } 759 log.debug( "Different hash codes, in cache [{0}] sent [{1}]", 760 test.getVal()::hashCode, led::getValHashCode ); 761 } 762 } 763 handleRemove( cacheName, key ); 764 break; 765 766 case REMOVEALL: 767 handleRemoveAll( cacheName ); 768 break; 769 770 case GET: 771 obj = handleGet( cacheName, key ); 772 break; 773 774 case GET_MATCHING: 775 obj = handleGetMatching( cacheName, (String) key ); 776 break; 777 778 case GET_KEYSET: 779 obj = handleGetKeySet(cacheName); 780 break; 781 782 default: break; 783 } 784 785 return obj; 786 } 787 788 /** 789 * Shuts down the receiver. 790 */ 791 @Override 792 public void shutdown() 793 { 794 if ( shutdown.compareAndSet(false, true) ) 795 { 796 log.info( "Shutting down TCP Lateral receiver." ); 797 dispose(); 798 } 799 else 800 { 801 log.debug( "Shutdown already called." ); 802 } 803 } 804}