001package org.apache.commons.jcs.auxiliary.remote.server; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import java.io.IOException; 023import java.io.Serializable; 024import java.rmi.RemoteException; 025import java.rmi.registry.Registry; 026import java.rmi.server.RMISocketFactory; 027import java.rmi.server.UnicastRemoteObject; 028import java.rmi.server.Unreferenced; 029import java.util.Collections; 030import java.util.Iterator; 031import java.util.Map; 032import java.util.Properties; 033import java.util.Set; 034import java.util.concurrent.ConcurrentHashMap; 035import java.util.concurrent.ConcurrentMap; 036import java.util.concurrent.locks.ReentrantLock; 037 038import org.apache.commons.jcs.access.exception.CacheException; 039import org.apache.commons.jcs.auxiliary.remote.RemoteUtils; 040import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheListener; 041import org.apache.commons.jcs.auxiliary.remote.server.behavior.IRemoteCacheServer; 042import org.apache.commons.jcs.auxiliary.remote.server.behavior.IRemoteCacheServerAttributes; 043import org.apache.commons.jcs.auxiliary.remote.server.behavior.RemoteType; 044import org.apache.commons.jcs.engine.CacheEventQueueFactory; 045import org.apache.commons.jcs.engine.CacheListeners; 046import org.apache.commons.jcs.engine.behavior.ICacheElement; 047import org.apache.commons.jcs.engine.behavior.ICacheEventQueue; 048import org.apache.commons.jcs.engine.behavior.ICacheListener; 049import org.apache.commons.jcs.engine.control.CompositeCache; 050import org.apache.commons.jcs.engine.control.CompositeCacheManager; 051import org.apache.commons.jcs.engine.logging.CacheEvent; 052import org.apache.commons.jcs.engine.logging.behavior.ICacheEvent; 053import org.apache.commons.jcs.engine.logging.behavior.ICacheEventLogger; 054import org.apache.commons.logging.Log; 055import org.apache.commons.logging.LogFactory; 056 057/** 058 * This class provides remote cache services. The remote cache server propagates events from local 059 * caches to other local caches. It can also store cached data, making it available to new clients. 060 * <p> 061 * Remote cache servers can be clustered. If the cache used by this remote cache is configured to 062 * use a remote cache of type cluster, the two remote caches will communicate with each other. 063 * Remote and put requests can be sent from one remote to another. If they are configured to 064 * broadcast such event to their client, then remove an puts can be sent to all locals in the 065 * cluster. 066 * <p> 067 * Get requests are made between clustered servers if AllowClusterGet is true. You can setup several 068 * clients to use one remote server and several to use another. The get local will be distributed 069 * between the two servers. Since caches are usually high get and low put, this should allow you to 070 * scale. 071 */ 072public class RemoteCacheServer<K, V> 073 extends UnicastRemoteObject 074 implements IRemoteCacheServer<K, V>, Unreferenced 075{ 076 public static final String DFEAULT_REMOTE_CONFIGURATION_FILE = "/remote.cache.ccf"; 077 078 /** For serialization. Don't change. */ 079 private static final long serialVersionUID = -8072345435941473116L; 080 081 /** log instance */ 082 private static final Log log = LogFactory.getLog( RemoteCacheServer.class ); 083 084 /** timing -- if we should record operation times. */ 085 private static final boolean timing = true; 086 087 /** Number of puts into the cache. */ 088 private int puts = 0; 089 090 /** Maps cache name to CacheListeners object. association of listeners (regions). */ 091 private final transient ConcurrentMap<String, CacheListeners<K, V>> cacheListenersMap = 092 new ConcurrentHashMap<String, CacheListeners<K, V>>(); 093 094 /** maps cluster listeners to regions. */ 095 private final transient ConcurrentMap<String, CacheListeners<K, V>> clusterListenersMap = 096 new ConcurrentHashMap<String, CacheListeners<K, V>>(); 097 098 /** The central hub */ 099 private transient CompositeCacheManager cacheManager; 100 101 /** relates listener id with a type */ 102 private final ConcurrentMap<Long, RemoteType> idTypeMap = new ConcurrentHashMap<Long, RemoteType>(); 103 104 /** relates listener id with an ip address */ 105 private final ConcurrentMap<Long, String> idIPMap = new ConcurrentHashMap<Long, String>(); 106 107 /** Used to get the next listener id. */ 108 private final int[] listenerId = new int[1]; 109 110 /** Configuration settings. */ 111 // package protected for access by unit test code 112 final IRemoteCacheServerAttributes remoteCacheServerAttributes; 113 114 /** The interval at which we will log updates. */ 115 private final int logInterval = 100; 116 117 /** An optional event logger */ 118 private transient ICacheEventLogger cacheEventLogger; 119 120 /** Lock for Cache listener initialization */ 121 private ReentrantLock cacheListenersLock = new ReentrantLock(); 122 123 /** Lock for Cluster listener initialization */ 124 private ReentrantLock clusterListenersLock = new ReentrantLock(); 125 126 /** 127 * Constructor for the RemoteCacheServer object. This initializes the server with the values 128 * from the properties object. 129 * <p> 130 * @param rcsa 131 * @param config cache hub configuration 132 * @throws RemoteException 133 */ 134 protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa, Properties config ) 135 throws RemoteException 136 { 137 super( rcsa.getServicePort() ); 138 this.remoteCacheServerAttributes = rcsa; 139 init( config ); 140 } 141 142 /** 143 * Constructor for the RemoteCacheServer object. This initializes the server with the values 144 * from the properties object. 145 * <p> 146 * @param rcsa 147 * @param config cache hub configuration 148 * @param customRMISocketFactory 149 * @throws RemoteException 150 */ 151 protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa, Properties config, RMISocketFactory customRMISocketFactory ) 152 throws RemoteException 153 { 154 super( rcsa.getServicePort(), customRMISocketFactory, customRMISocketFactory ); 155 this.remoteCacheServerAttributes = rcsa; 156 init( config ); 157 } 158 159 /** 160 * Constructor for the RemoteCacheServer object. This initializes the server with the values 161 * from the config file. 162 * <p> 163 * @param rcsa 164 * @throws RemoteException 165 * 166 * @deprecated Use version with Properties object instead 167 */ 168 @Deprecated 169 protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa ) 170 throws RemoteException 171 { 172 super( rcsa.getServicePort() ); 173 this.remoteCacheServerAttributes = rcsa; 174 init( rcsa.getConfigFileName() ); 175 } 176 177 /** 178 * Constructor for the RemoteCacheServer object. This initializes the server with the values 179 * from the config file. 180 * <p> 181 * @param rcsa 182 * @param customRMISocketFactory 183 * @throws RemoteException 184 * 185 * @deprecated Use version with Properties object instead 186 */ 187 @Deprecated 188 protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa, RMISocketFactory customRMISocketFactory ) 189 throws RemoteException 190 { 191 super( rcsa.getServicePort(), customRMISocketFactory, customRMISocketFactory ); 192 this.remoteCacheServerAttributes = rcsa; 193 init( rcsa.getConfigFileName() ); 194 } 195 196 /** 197 * Initialize the RMI Cache Server from a properties file. 198 * <p> 199 * @param prop 200 * @throws RemoteException if the configuration of the cache manager instance fails 201 * 202 * @deprecated Use version with Properties parameter instead 203 */ 204 @Deprecated 205 private void init( String propFile ) throws RemoteException 206 { 207 String propFileName = propFile == null ? DFEAULT_REMOTE_CONFIGURATION_FILE : propFile; 208 209 Properties prop = null; 210 try 211 { 212 prop = RemoteUtils.loadProps(propFileName); 213 } 214 catch (IOException e) 215 { 216 throw new RemoteException(e.getMessage(), e); 217 } 218 219 init(prop); 220 } 221 222 /** 223 * Initialize the RMI Cache Server from a properties object. 224 * <p> 225 * @param prop the configuration properties 226 * @throws RemoteException if the configuration of the cache manager instance fails 227 */ 228 private void init( Properties prop ) throws RemoteException 229 { 230 try 231 { 232 cacheManager = createCacheManager( prop ); 233 } 234 catch (CacheException e) 235 { 236 throw new RemoteException(e.getMessage(), e); 237 } 238 239 // cacheManager would have created a number of ICache objects. 240 // Use these objects to set up the cacheListenersMap. 241 String[] list = cacheManager.getCacheNames(); 242 for ( int i = 0; i < list.length; i++ ) 243 { 244 String name = list[i]; 245 CompositeCache<K, V> cache = cacheManager.getCache( name ); 246 cacheListenersMap.put( name, new CacheListeners<K, V>( cache ) ); 247 } 248 } 249 250 /** 251 * Subclass can override this method to create the specific cache manager. 252 * <p> 253 * @param prop the configuration object. 254 * @return The cache hub configured with this configuration. 255 * 256 * @throws CacheException if the configuration cannot be loaded 257 */ 258 private CompositeCacheManager createCacheManager( Properties prop ) throws CacheException 259 { 260 CompositeCacheManager hub = CompositeCacheManager.getUnconfiguredInstance(); 261 hub.configure( prop ); 262 return hub; 263 } 264 265 /** 266 * Puts a cache bean to the remote cache and notifies all listeners which <br> 267 * <ol> 268 * <li>have a different listener id than the originating host;</li> 269 * <li>are currently subscribed to the related cache.</li> 270 * </ol> 271 * <p> 272 * @param item 273 * @throws IOException 274 */ 275 public void put( ICacheElement<K, V> item ) 276 throws IOException 277 { 278 update( item ); 279 } 280 281 /** 282 * @param item 283 * @throws IOException 284 */ 285 @Override 286 public void update( ICacheElement<K, V> item ) 287 throws IOException 288 { 289 update( item, 0 ); 290 } 291 292 /** 293 * The internal processing is wrapped in event logging calls. 294 * <p> 295 * @param item 296 * @param requesterId 297 * @throws IOException 298 */ 299 @Override 300 public void update( ICacheElement<K, V> item, long requesterId ) 301 throws IOException 302 { 303 ICacheEvent<ICacheElement<K, V>> cacheEvent = createICacheEvent( item, requesterId, ICacheEventLogger.UPDATE_EVENT ); 304 try 305 { 306 processUpdate( item, requesterId ); 307 } 308 finally 309 { 310 logICacheEvent( cacheEvent ); 311 } 312 } 313 314 /** 315 * An update can come from either a local cache's remote auxiliary, or it can come from a remote 316 * server. A remote server is considered a a source of type cluster. 317 * <p> 318 * If the update came from a cluster, then we should tell the cache manager that this was a 319 * remote put. This way, any lateral and remote auxiliaries configured for the region will not 320 * be updated. This is basically how a remote listener works when plugged into a local cache. 321 * <p> 322 * If the cluster is configured to keep local cluster consistency, then all listeners will be 323 * updated. This allows cluster server A to update cluster server B and then B to update its 324 * clients if it is told to keep local cluster consistency. Otherwise, server A will update 325 * server B and B will not tell its clients. If you cluster using lateral caches for instance, 326 * this is how it will work. Updates to a cluster node, will never get to the leaves. The remote 327 * cluster, with local cluster consistency, allows you to update leaves. This basically allows 328 * you to have a failover remote server. 329 * <p> 330 * Since currently a cluster will not try to get from other cluster servers, you can scale a bit 331 * with a cluster configuration. Puts and removes will be broadcasted to all clients, but the 332 * get load on a remote server can be reduced. 333 * <p> 334 * @param item 335 * @param requesterId 336 */ 337 private void processUpdate( ICacheElement<K, V> item, long requesterId ) 338 { 339 long start = 0; 340 if ( timing ) 341 { 342 start = System.currentTimeMillis(); 343 } 344 345 logUpdateInfo( item ); 346 347 try 348 { 349 CacheListeners<K, V> cacheDesc = getCacheListeners( item.getCacheName() ); 350 /* Object val = */item.getVal(); 351 352 boolean fromCluster = isRequestFromCluster( requesterId ); 353 354 if ( log.isDebugEnabled() ) 355 { 356 log.debug( "In update, requesterId = [" + requesterId + "] fromCluster = " + fromCluster ); 357 } 358 359 // ordered cache item update and notification. 360 synchronized ( cacheDesc ) 361 { 362 try 363 { 364 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache; 365 366 // If the source of this request was not from a cluster, 367 // then consider it a local update. The cache manager will 368 // try to 369 // update all auxiliaries. 370 // 371 // This requires that two local caches not be connected to 372 // two clustered remote caches. The failover runner will 373 // have to make sure of this. ALos, the local cache needs 374 // avoid updating this source. Will need to pass the source 375 // id somehow. The remote cache should update all local 376 // caches 377 // but not update the cluster source. Cluster remote caches 378 // should only be updated by the server and not the 379 // RemoteCache. 380 if ( fromCluster ) 381 { 382 if ( log.isDebugEnabled() ) 383 { 384 log.debug( "Put FROM cluster, NOT updating other auxiliaries for region. " 385 + " requesterId [" + requesterId + "]" ); 386 } 387 c.localUpdate( item ); 388 } 389 else 390 { 391 if ( log.isDebugEnabled() ) 392 { 393 log.debug( "Put NOT from cluster, updating other auxiliaries for region. " 394 + " requesterId [" + requesterId + "]" ); 395 } 396 c.update( item ); 397 } 398 } 399 catch ( Exception ce ) 400 { 401 // swallow 402 if ( log.isInfoEnabled() ) 403 { 404 log.info( "Exception caught updating item. requesterId [" + requesterId + "] " 405 + ce.getMessage() ); 406 } 407 } 408 409 // UPDATE LOCALS IF A REQUEST COMES FROM A CLUSTER 410 // IF LOCAL CLUSTER CONSISTENCY IS CONFIGURED 411 if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency() ) ) 412 { 413 ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId ); 414 if ( log.isDebugEnabled() ) 415 { 416 log.debug( "qlist.length = " + qlist.length ); 417 } 418 for ( int i = 0; i < qlist.length; i++ ) 419 { 420 qlist[i].addPutEvent( item ); 421 } 422 } 423 } 424 } 425 catch ( IOException e ) 426 { 427 if ( cacheEventLogger != null ) 428 { 429 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.UPDATE_EVENT, e.getMessage() 430 + " REGION: " + item.getCacheName() + " ITEM: " + item ); 431 } 432 433 log.error( "Trouble in Update. requesterId [" + requesterId + "]", e ); 434 } 435 436 // TODO use JAMON for timing 437 if ( timing ) 438 { 439 long end = System.currentTimeMillis(); 440 if ( log.isDebugEnabled() ) 441 { 442 log.debug( "put took " + String.valueOf( end - start ) + " ms." ); 443 } 444 } 445 } 446 447 /** 448 * Log some details. 449 * <p> 450 * @param item 451 */ 452 private void logUpdateInfo( ICacheElement<K, V> item ) 453 { 454 // not thread safe, but it doesn't have to be 100% accurate 455 puts++; 456 457 if ( log.isInfoEnabled() ) 458 { 459 if ( puts % logInterval == 0 ) 460 { 461 log.info( "puts = " + puts ); 462 } 463 } 464 465 if ( log.isDebugEnabled() ) 466 { 467 log.debug( "In update, put [" + item.getKey() + "] in [" + item.getCacheName() + "]" ); 468 } 469 } 470 471 /** 472 * Returns a cache value from the specified remote cache; or null if the cache or key does not 473 * exist. 474 * <p> 475 * @param cacheName 476 * @param key 477 * @return ICacheElement 478 * @throws IOException 479 */ 480 @Override 481 public ICacheElement<K, V> get( String cacheName, K key ) 482 throws IOException 483 { 484 return this.get( cacheName, key, 0 ); 485 } 486 487 /** 488 * Returns a cache bean from the specified cache; or null if the key does not exist. 489 * <p> 490 * Adding the requestor id, allows the cache to determine the source of the get. 491 * <p> 492 * The internal processing is wrapped in event logging calls. 493 * <p> 494 * @param cacheName 495 * @param key 496 * @param requesterId 497 * @return ICacheElement 498 * @throws IOException 499 */ 500 @Override 501 public ICacheElement<K, V> get( String cacheName, K key, long requesterId ) 502 throws IOException 503 { 504 ICacheElement<K, V> element = null; 505 ICacheEvent<K> cacheEvent = createICacheEvent( cacheName, key, requesterId, ICacheEventLogger.GET_EVENT ); 506 try 507 { 508 element = processGet( cacheName, key, requesterId ); 509 } 510 finally 511 { 512 logICacheEvent( cacheEvent ); 513 } 514 return element; 515 } 516 517 /** 518 * Returns a cache bean from the specified cache; or null if the key does not exist. 519 * <p> 520 * Adding the requester id, allows the cache to determine the source of the get. 521 * <p> 522 * @param cacheName 523 * @param key 524 * @param requesterId 525 * @return ICacheElement 526 */ 527 private ICacheElement<K, V> processGet( String cacheName, K key, long requesterId ) 528 { 529 boolean fromCluster = isRequestFromCluster( requesterId ); 530 531 if ( log.isDebugEnabled() ) 532 { 533 log.debug( "get [" + key + "] from cache [" + cacheName + "] requesterId = [" + requesterId 534 + "] fromCluster = " + fromCluster ); 535 } 536 537 CacheListeners<K, V> cacheDesc = null; 538 try 539 { 540 cacheDesc = getCacheListeners( cacheName ); 541 } 542 catch ( Exception e ) 543 { 544 log.error( "Problem getting listeners.", e ); 545 546 if ( cacheEventLogger != null ) 547 { 548 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.GET_EVENT, e.getMessage() + cacheName 549 + " KEY: " + key ); 550 } 551 } 552 553 ICacheElement<K, V> element = getFromCacheListeners( key, fromCluster, cacheDesc, null ); 554 return element; 555 } 556 557 /** 558 * Gets the item from the associated cache listeners. 559 * <p> 560 * @param key 561 * @param fromCluster 562 * @param cacheDesc 563 * @param element 564 * @return ICacheElement 565 */ 566 private ICacheElement<K, V> getFromCacheListeners( K key, boolean fromCluster, CacheListeners<K, V> cacheDesc, 567 ICacheElement<K, V> element ) 568 { 569 ICacheElement<K, V> returnElement = element; 570 571 if ( cacheDesc != null ) 572 { 573 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache; 574 575 // If we have a get come in from a client and we don't have the item 576 // locally, we will allow the cache to look in other non local sources, 577 // such as a remote cache or a lateral. 578 // 579 // Since remote servers never get from clients and clients never go 580 // remote from a remote call, this 581 // will not result in any loops. 582 // 583 // This is the only instance I can think of where we allow a remote get 584 // from a remote call. The purpose is to allow remote cache servers to 585 // talk to each other. If one goes down, you want it to be able to get 586 // data from those that were up when the failed server comes back o 587 // line. 588 589 if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() ) 590 { 591 if ( log.isDebugEnabled() ) 592 { 593 log.debug( "NonLocalGet. fromCluster [" + fromCluster + "] AllowClusterGet [" 594 + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" ); 595 } 596 returnElement = c.get( key ); 597 } 598 else 599 { 600 // Gets from cluster type remote will end up here. 601 // Gets from all clients will end up here if allow cluster get is 602 // false. 603 604 if ( log.isDebugEnabled() ) 605 { 606 log.debug( "LocalGet. fromCluster [" + fromCluster + "] AllowClusterGet [" 607 + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" ); 608 } 609 returnElement = c.localGet( key ); 610 } 611 } 612 613 return returnElement; 614 } 615 616 /** 617 * Gets all matching items. 618 * <p> 619 * @param cacheName 620 * @param pattern 621 * @return Map of keys and wrapped objects 622 * @throws IOException 623 */ 624 @Override 625 public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern ) 626 throws IOException 627 { 628 return getMatching( cacheName, pattern, 0 ); 629 } 630 631 /** 632 * Retrieves all matching keys. 633 * <p> 634 * @param cacheName 635 * @param pattern 636 * @param requesterId 637 * @return Map of keys and wrapped objects 638 * @throws IOException 639 */ 640 @Override 641 public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern, long requesterId ) 642 throws IOException 643 { 644 ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, pattern, requesterId, 645 ICacheEventLogger.GETMATCHING_EVENT ); 646 try 647 { 648 return processGetMatching( cacheName, pattern, requesterId ); 649 } 650 finally 651 { 652 logICacheEvent( cacheEvent ); 653 } 654 } 655 656 /** 657 * Retrieves all matching keys. 658 * <p> 659 * @param cacheName 660 * @param pattern 661 * @param requesterId 662 * @return Map of keys and wrapped objects 663 */ 664 protected Map<K, ICacheElement<K, V>> processGetMatching( String cacheName, String pattern, long requesterId ) 665 { 666 boolean fromCluster = isRequestFromCluster( requesterId ); 667 668 if ( log.isDebugEnabled() ) 669 { 670 log.debug( "getMatching [" + pattern + "] from cache [" + cacheName + "] requesterId = [" + requesterId 671 + "] fromCluster = " + fromCluster ); 672 } 673 674 CacheListeners<K, V> cacheDesc = null; 675 try 676 { 677 cacheDesc = getCacheListeners( cacheName ); 678 } 679 catch ( Exception e ) 680 { 681 log.error( "Problem getting listeners.", e ); 682 683 if ( cacheEventLogger != null ) 684 { 685 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.GETMATCHING_EVENT, e.getMessage() 686 + cacheName + " pattern: " + pattern ); 687 } 688 } 689 690 return getMatchingFromCacheListeners( pattern, fromCluster, cacheDesc ); 691 } 692 693 /** 694 * Gets the item from the associated cache listeners. 695 * <p> 696 * @param pattern 697 * @param fromCluster 698 * @param cacheDesc 699 * @return Map of keys to results 700 */ 701 private Map<K, ICacheElement<K, V>> getMatchingFromCacheListeners( String pattern, boolean fromCluster, CacheListeners<K, V> cacheDesc ) 702 { 703 Map<K, ICacheElement<K, V>> elements = null; 704 if ( cacheDesc != null ) 705 { 706 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache; 707 708 // We always want to go remote and then merge the items. But this can lead to inconsistencies after 709 // failover recovery. Removed items may show up. There is no good way to prevent this. 710 // We should make it configurable. 711 712 if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() ) 713 { 714 if ( log.isDebugEnabled() ) 715 { 716 log.debug( "NonLocalGetMatching. fromCluster [" + fromCluster + "] AllowClusterGet [" 717 + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" ); 718 } 719 elements = c.getMatching( pattern ); 720 } 721 else 722 { 723 // Gets from cluster type remote will end up here. 724 // Gets from all clients will end up here if allow cluster get is 725 // false. 726 727 if ( log.isDebugEnabled() ) 728 { 729 log.debug( "LocalGetMatching. fromCluster [" + fromCluster + "] AllowClusterGet [" 730 + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" ); 731 } 732 elements = c.localGetMatching( pattern ); 733 } 734 } 735 return elements; 736 } 737 738 /** 739 * Gets multiple items from the cache based on the given set of keys. 740 * <p> 741 * @param cacheName 742 * @param keys 743 * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no 744 * data in cache for any of these keys 745 * @throws IOException 746 */ 747 @Override 748 public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys ) 749 throws IOException 750 { 751 return this.getMultiple( cacheName, keys, 0 ); 752 } 753 754 /** 755 * Gets multiple items from the cache based on the given set of keys. 756 * <p> 757 * The internal processing is wrapped in event logging calls. 758 * <p> 759 * @param cacheName 760 * @param keys 761 * @param requesterId 762 * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no 763 * data in cache for any of these keys 764 * @throws IOException 765 */ 766 @Override 767 public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys, long requesterId ) 768 throws IOException 769 { 770 ICacheEvent<Serializable> cacheEvent = createICacheEvent( cacheName, (Serializable) keys, requesterId, 771 ICacheEventLogger.GETMULTIPLE_EVENT ); 772 try 773 { 774 return processGetMultiple( cacheName, keys, requesterId ); 775 } 776 finally 777 { 778 logICacheEvent( cacheEvent ); 779 } 780 } 781 782 /** 783 * Gets multiple items from the cache based on the given set of keys. 784 * <p> 785 * @param cacheName 786 * @param keys 787 * @param requesterId 788 * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no 789 * data in cache for any of these keys 790 */ 791 private Map<K, ICacheElement<K, V>> processGetMultiple( String cacheName, Set<K> keys, long requesterId ) 792 { 793 boolean fromCluster = isRequestFromCluster( requesterId ); 794 795 if ( log.isDebugEnabled() ) 796 { 797 log.debug( "getMultiple [" + keys + "] from cache [" + cacheName + "] requesterId = [" + requesterId 798 + "] fromCluster = " + fromCluster ); 799 } 800 801 CacheListeners<K, V> cacheDesc = getCacheListeners( cacheName ); 802 Map<K, ICacheElement<K, V>> elements = getMultipleFromCacheListeners( keys, null, fromCluster, cacheDesc ); 803 return elements; 804 } 805 806 /** 807 * Since a non-receiving remote cache client will not register a listener, it will not have a 808 * listener id assigned from the server. As such the remote server cannot determine if it is a 809 * cluster or a normal client. It will assume that it is a normal client. 810 * <p> 811 * @param requesterId 812 * @return true is from a cluster. 813 */ 814 private boolean isRequestFromCluster( long requesterId ) 815 { 816 RemoteType remoteTypeL = idTypeMap.get( Long.valueOf( requesterId ) ); 817 return remoteTypeL == RemoteType.CLUSTER; 818 } 819 820 /** 821 * Gets the items from the associated cache listeners. 822 * <p> 823 * @param keys 824 * @param elements 825 * @param fromCluster 826 * @param cacheDesc 827 * @return Map 828 */ 829 private Map<K, ICacheElement<K, V>> getMultipleFromCacheListeners( Set<K> keys, Map<K, ICacheElement<K, V>> elements, boolean fromCluster, CacheListeners<K, V> cacheDesc ) 830 { 831 Map<K, ICacheElement<K, V>> returnElements = elements; 832 833 if ( cacheDesc != null ) 834 { 835 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache; 836 837 // If we have a getMultiple come in from a client and we don't have the item 838 // locally, we will allow the cache to look in other non local sources, 839 // such as a remote cache or a lateral. 840 // 841 // Since remote servers never get from clients and clients never go 842 // remote from a remote call, this 843 // will not result in any loops. 844 // 845 // This is the only instance I can think of where we allow a remote get 846 // from a remote call. The purpose is to allow remote cache servers to 847 // talk to each other. If one goes down, you want it to be able to get 848 // data from those that were up when the failed server comes back on 849 // line. 850 851 if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() ) 852 { 853 if ( log.isDebugEnabled() ) 854 { 855 log.debug( "NonLocalGetMultiple. fromCluster [" + fromCluster + "] AllowClusterGet [" 856 + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" ); 857 } 858 859 returnElements = c.getMultiple( keys ); 860 } 861 else 862 { 863 // Gets from cluster type remote will end up here. 864 // Gets from all clients will end up here if allow cluster get is 865 // false. 866 867 if ( log.isDebugEnabled() ) 868 { 869 log.debug( "LocalGetMultiple. fromCluster [" + fromCluster + "] AllowClusterGet [" 870 + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" ); 871 } 872 873 returnElements = c.localGetMultiple( keys ); 874 } 875 } 876 877 return returnElements; 878 } 879 880 /** 881 * Return the keys in the cache. 882 * <p> 883 * @param cacheName the name of the cache region 884 * @see org.apache.commons.jcs.auxiliary.AuxiliaryCache#getKeySet() 885 */ 886 @Override 887 public Set<K> getKeySet(String cacheName) throws IOException 888 { 889 return processGetKeySet( cacheName ); 890 } 891 892 /** 893 * Gets the set of keys of objects currently in the cache. 894 * <p> 895 * @param cacheName 896 * @return Set 897 */ 898 protected Set<K> processGetKeySet( String cacheName ) 899 { 900 CacheListeners<K, V> cacheDesc = null; 901 try 902 { 903 cacheDesc = getCacheListeners( cacheName ); 904 } 905 catch ( Exception e ) 906 { 907 log.error( "Problem getting listeners.", e ); 908 } 909 910 if ( cacheDesc == null ) 911 { 912 return Collections.emptySet(); 913 } 914 915 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache; 916 return c.getKeySet(); 917 } 918 919 /** 920 * Removes the given key from the specified remote cache. Defaults the listener id to 0. 921 * <p> 922 * @param cacheName 923 * @param key 924 * @throws IOException 925 */ 926 @Override 927 public void remove( String cacheName, K key ) 928 throws IOException 929 { 930 remove( cacheName, key, 0 ); 931 } 932 933 /** 934 * Remove the key from the cache region and don't tell the source listener about it. 935 * <p> 936 * The internal processing is wrapped in event logging calls. 937 * <p> 938 * @param cacheName 939 * @param key 940 * @param requesterId 941 * @throws IOException 942 */ 943 @Override 944 public void remove( String cacheName, K key, long requesterId ) 945 throws IOException 946 { 947 ICacheEvent<K> cacheEvent = createICacheEvent( cacheName, key, requesterId, ICacheEventLogger.REMOVE_EVENT ); 948 try 949 { 950 processRemove( cacheName, key, requesterId ); 951 } 952 finally 953 { 954 logICacheEvent( cacheEvent ); 955 } 956 } 957 958 /** 959 * Remove the key from the cache region and don't tell the source listener about it. 960 * <p> 961 * @param cacheName 962 * @param key 963 * @param requesterId 964 * @throws IOException 965 */ 966 private void processRemove( String cacheName, K key, long requesterId ) 967 throws IOException 968 { 969 if ( log.isDebugEnabled() ) 970 { 971 log.debug( "remove [" + key + "] from cache [" + cacheName + "]" ); 972 } 973 974 CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName ); 975 976 boolean fromCluster = isRequestFromCluster( requesterId ); 977 978 if ( cacheDesc != null ) 979 { 980 // best attempt to achieve ordered cache item removal and 981 // notification. 982 synchronized ( cacheDesc ) 983 { 984 boolean removeSuccess = false; 985 986 // No need to notify if it was not cached. 987 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache; 988 989 if ( fromCluster ) 990 { 991 if ( log.isDebugEnabled() ) 992 { 993 log.debug( "Remove FROM cluster, NOT updating other auxiliaries for region" ); 994 } 995 removeSuccess = c.localRemove( key ); 996 } 997 else 998 { 999 if ( log.isDebugEnabled() ) 1000 { 1001 log.debug( "Remove NOT from cluster, updating other auxiliaries for region" ); 1002 } 1003 removeSuccess = c.remove( key ); 1004 } 1005 1006 if ( log.isDebugEnabled() ) 1007 { 1008 log.debug( "remove [" + key + "] from cache [" + cacheName + "] success (was it found) = " 1009 + removeSuccess ); 1010 } 1011 1012 // UPDATE LOCALS IF A REQUEST COMES FROM A CLUSTER 1013 // IF LOCAL CLUSTER CONSISTENCY IS CONFIGURED 1014 if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency() ) ) 1015 { 1016 ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId ); 1017 1018 for ( int i = 0; i < qlist.length; i++ ) 1019 { 1020 qlist[i].addRemoveEvent( key ); 1021 } 1022 } 1023 } 1024 } 1025 } 1026 1027 /** 1028 * Remove all keys from the specified remote cache. 1029 * <p> 1030 * @param cacheName 1031 * @throws IOException 1032 */ 1033 @Override 1034 public void removeAll( String cacheName ) 1035 throws IOException 1036 { 1037 removeAll( cacheName, 0 ); 1038 } 1039 1040 /** 1041 * Remove all keys from the specified remote cache. 1042 * <p> 1043 * The internal processing is wrapped in event logging calls. 1044 * <p> 1045 * @param cacheName 1046 * @param requesterId 1047 * @throws IOException 1048 */ 1049 @Override 1050 public void removeAll( String cacheName, long requesterId ) 1051 throws IOException 1052 { 1053 ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, "all", requesterId, ICacheEventLogger.REMOVEALL_EVENT ); 1054 try 1055 { 1056 processRemoveAll( cacheName, requesterId ); 1057 } 1058 finally 1059 { 1060 logICacheEvent( cacheEvent ); 1061 } 1062 } 1063 1064 /** 1065 * Remove all keys from the specified remote cache. 1066 * <p> 1067 * @param cacheName 1068 * @param requesterId 1069 * @throws IOException 1070 */ 1071 private void processRemoveAll( String cacheName, long requesterId ) 1072 throws IOException 1073 { 1074 CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName ); 1075 1076 boolean fromCluster = isRequestFromCluster( requesterId ); 1077 1078 if ( cacheDesc != null ) 1079 { 1080 // best attempt to achieve ordered cache item removal and 1081 // notification. 1082 synchronized ( cacheDesc ) 1083 { 1084 // No need to broadcast, or notify if it was not cached. 1085 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache; 1086 1087 if ( fromCluster ) 1088 { 1089 if ( log.isDebugEnabled() ) 1090 { 1091 log.debug( "RemoveALL FROM cluster, NOT updating other auxiliaries for region" ); 1092 } 1093 c.localRemoveAll(); 1094 } 1095 else 1096 { 1097 if ( log.isDebugEnabled() ) 1098 { 1099 log.debug( "RemoveALL NOT from cluster, updating other auxiliaries for region" ); 1100 } 1101 c.removeAll(); 1102 } 1103 1104 // update registered listeners 1105 if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency() ) ) 1106 { 1107 ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId ); 1108 1109 for ( int i = 0; i < qlist.length; i++ ) 1110 { 1111 qlist[i].addRemoveAllEvent(); 1112 } 1113 } 1114 } 1115 } 1116 } 1117 1118 /** 1119 * How many put events have we received. 1120 * <p> 1121 * @return puts 1122 */ 1123 // Currently only intended for use by unit tests 1124 int getPutCount() 1125 { 1126 return puts; 1127 } 1128 1129 /** 1130 * Frees the specified remote cache. 1131 * <p> 1132 * @param cacheName 1133 * @throws IOException 1134 */ 1135 @Override 1136 public void dispose( String cacheName ) 1137 throws IOException 1138 { 1139 dispose( cacheName, 0 ); 1140 } 1141 1142 /** 1143 * Frees the specified remote cache. 1144 * <p> 1145 * @param cacheName 1146 * @param requesterId 1147 * @throws IOException 1148 */ 1149 public void dispose( String cacheName, long requesterId ) 1150 throws IOException 1151 { 1152 ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, "none", requesterId, ICacheEventLogger.DISPOSE_EVENT ); 1153 try 1154 { 1155 processDispose( cacheName, requesterId ); 1156 } 1157 finally 1158 { 1159 logICacheEvent( cacheEvent ); 1160 } 1161 } 1162 1163 /** 1164 * @param cacheName 1165 * @param requesterId 1166 * @throws IOException 1167 */ 1168 private void processDispose( String cacheName, long requesterId ) 1169 throws IOException 1170 { 1171 if ( log.isInfoEnabled() ) 1172 { 1173 log.info( "Dispose request received from listener [" + requesterId + "]" ); 1174 } 1175 1176 CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName ); 1177 1178 // this is dangerous 1179 if ( cacheDesc != null ) 1180 { 1181 // best attempt to achieve ordered free-cache-op and notification. 1182 synchronized ( cacheDesc ) 1183 { 1184 ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId ); 1185 1186 for ( int i = 0; i < qlist.length; i++ ) 1187 { 1188 qlist[i].addDisposeEvent(); 1189 } 1190 cacheManager.freeCache( cacheName ); 1191 } 1192 } 1193 } 1194 1195 /** 1196 * Frees all remote caches. 1197 * <p> 1198 * @throws IOException 1199 */ 1200 @Override 1201 public void release() 1202 throws IOException 1203 { 1204 for (CacheListeners<K, V> cacheDesc : cacheListenersMap.values()) 1205 { 1206 ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, 0 ); 1207 1208 for ( int i = 0; i < qlist.length; i++ ) 1209 { 1210 qlist[i].addDisposeEvent(); 1211 } 1212 } 1213 cacheManager.release(); 1214 } 1215 1216 /** 1217 * Returns the cache listener for the specified cache. Creates the cache and the cache 1218 * descriptor if they do not already exist. 1219 * <p> 1220 * @param cacheName 1221 * @return The cacheListeners value 1222 */ 1223 protected CacheListeners<K, V> getCacheListeners( String cacheName ) 1224 { 1225 CacheListeners<K, V> cacheListeners = cacheListenersMap.get( cacheName ); 1226 1227 if ( cacheListeners == null ) 1228 { 1229 cacheListenersLock.lock(); 1230 1231 try 1232 { 1233 // double check 1234 cacheListeners = cacheListenersMap.get( cacheName ); 1235 if ( cacheListeners == null ) 1236 { 1237 CompositeCache<K, V> cache = cacheManager.getCache( cacheName ); 1238 cacheListeners = new CacheListeners<K, V>( cache ); 1239 cacheListenersMap.put( cacheName, cacheListeners ); 1240 } 1241 } 1242 finally 1243 { 1244 cacheListenersLock.unlock(); 1245 } 1246 } 1247 1248 return cacheListeners; 1249 } 1250 1251 /** 1252 * Gets the clusterListeners attribute of the RemoteCacheServer object. 1253 * <p> 1254 * TODO may be able to remove this 1255 * @param cacheName 1256 * @return The clusterListeners value 1257 */ 1258 protected CacheListeners<K, V> getClusterListeners( String cacheName ) 1259 { 1260 CacheListeners<K, V> cacheListeners = clusterListenersMap.get( cacheName ); 1261 1262 if ( cacheListeners == null ) 1263 { 1264 clusterListenersLock.lock(); 1265 1266 try 1267 { 1268 cacheListeners = clusterListenersMap.get( cacheName ); 1269 if ( cacheListeners == null ) 1270 { 1271 CompositeCache<K, V> cache = cacheManager.getCache( cacheName ); 1272 cacheListeners = new CacheListeners<K, V>( cache ); 1273 clusterListenersMap.put( cacheName, cacheListeners ); 1274 } 1275 } 1276 finally 1277 { 1278 clusterListenersLock.unlock(); 1279 } 1280 } 1281 1282 return cacheListeners; 1283 } 1284 1285 /** 1286 * Gets the eventQList attribute of the RemoteCacheServer object. This returns the event queues 1287 * stored in the cacheListeners object for a particular region, if the queue is not for this 1288 * requester. 1289 * <p> 1290 * Basically, this makes sure that a request from a particular local cache, identified by its 1291 * listener id, does not result in a call to that same listener. 1292 * <p> 1293 * @param cacheListeners 1294 * @param requesterId 1295 * @return The eventQList value 1296 */ 1297 @SuppressWarnings("unchecked") // No generic arrays in java 1298 private ICacheEventQueue<K, V>[] getEventQList( CacheListeners<K, V> cacheListeners, long requesterId ) 1299 { 1300 ICacheEventQueue<K, V>[] list = cacheListeners.eventQMap.values().toArray( new ICacheEventQueue[0] ); 1301 int count = 0; 1302 // Set those not qualified to null; Count those qualified. 1303 for ( int i = 0; i < list.length; i++ ) 1304 { 1305 ICacheEventQueue<K, V> q = list[i]; 1306 if ( q.isWorking() && q.getListenerId() != requesterId ) 1307 { 1308 count++; 1309 } 1310 else 1311 { 1312 list[i] = null; 1313 } 1314 } 1315 if ( count == list.length ) 1316 { 1317 // All qualified. 1318 return list; 1319 } 1320 1321 // Returns only the qualified. 1322 ICacheEventQueue<K, V>[] qq = new ICacheEventQueue[count]; 1323 count = 0; 1324 for ( int i = 0; i < list.length; i++ ) 1325 { 1326 if ( list[i] != null ) 1327 { 1328 qq[count++] = list[i]; 1329 } 1330 } 1331 return qq; 1332 } 1333 1334 /** 1335 * Removes dead event queues. Should clean out deregistered listeners. 1336 * <p> 1337 * @param eventQMap 1338 */ 1339 private static <KK, VV> void cleanupEventQMap( Map<Long, ICacheEventQueue<KK, VV>> eventQMap ) 1340 { 1341 synchronized ( eventQMap ) 1342 { 1343 for (Iterator<Map.Entry<Long, ICacheEventQueue<KK, VV>>> itr = eventQMap.entrySet().iterator(); itr.hasNext(); ) 1344 { 1345 Map.Entry<Long, ICacheEventQueue<KK, VV>> e = itr.next(); 1346 ICacheEventQueue<KK, VV> q = e.getValue(); 1347 1348 // this does not care if the q is alive (i.e. if 1349 // there are active threads; it cares if the queue 1350 // is working -- if it has not encountered errors 1351 // above the failure threshold 1352 if ( !q.isWorking() ) 1353 { 1354 itr.remove(); 1355 log.warn( "Cache event queue " + q + " is not working and removed from cache server." ); 1356 } 1357 } 1358 } 1359 } 1360 1361 /** 1362 * Subscribes to the specified remote cache. 1363 * <p> 1364 * If the client id is 0, then the remote cache server will increment it's local count and 1365 * assign an id to the client. 1366 * <p> 1367 * @param cacheName the specified remote cache. 1368 * @param listener object to notify for cache changes. must be synchronized since there are 1369 * remote calls involved. 1370 * @throws IOException 1371 */ 1372 @Override 1373 @SuppressWarnings("unchecked") // Need to cast to specific return type from getClusterListeners() 1374 public <KK, VV> void addCacheListener( String cacheName, ICacheListener<KK, VV> listener ) 1375 throws IOException 1376 { 1377 if ( cacheName == null || listener == null ) 1378 { 1379 throw new IllegalArgumentException( "cacheName and listener must not be null" ); 1380 } 1381 CacheListeners<KK, VV> cacheListeners; 1382 1383 IRemoteCacheListener<KK, VV> ircl = (IRemoteCacheListener<KK, VV>) listener; 1384 1385 String listenerAddress = ircl.getLocalHostAddress(); 1386 1387 RemoteType remoteType = ircl.getRemoteType(); 1388 if ( remoteType == RemoteType.CLUSTER ) 1389 { 1390 log.debug( "adding cluster listener, listenerAddress [" + listenerAddress + "]" ); 1391 cacheListeners = (CacheListeners<KK, VV>)getClusterListeners( cacheName ); 1392 } 1393 else 1394 { 1395 log.debug( "adding normal listener, listenerAddress [" + listenerAddress + "]" ); 1396 cacheListeners = (CacheListeners<KK, VV>)getCacheListeners( cacheName ); 1397 } 1398 Map<Long, ICacheEventQueue<KK, VV>> eventQMap = cacheListeners.eventQMap; 1399 cleanupEventQMap( eventQMap ); 1400 1401 // synchronized ( listenerId ) 1402 synchronized ( ICacheListener.class ) 1403 { 1404 long id = 0; 1405 try 1406 { 1407 id = listener.getListenerId(); 1408 // clients probably shouldn't do this. 1409 if ( id == 0 ) 1410 { 1411 // must start at one so the next gets recognized 1412 long listenerIdB = nextListenerId(); 1413 if ( log.isDebugEnabled() ) 1414 { 1415 log.debug( "listener id=" + ( listenerIdB & 0xff ) + " addded for cache [" + cacheName 1416 + "], listenerAddress [" + listenerAddress + "]" ); 1417 } 1418 listener.setListenerId( listenerIdB ); 1419 id = listenerIdB; 1420 1421 // in case it needs synchronization 1422 String message = "Adding vm listener under new id = [" + listenerIdB + "], listenerAddress [" 1423 + listenerAddress + "]"; 1424 logApplicationEvent( "RemoteCacheServer", "addCacheListener", message ); 1425 if ( log.isInfoEnabled() ) 1426 { 1427 log.info( message ); 1428 } 1429 } 1430 else 1431 { 1432 String message = "Adding listener under existing id = [" + id + "], listenerAddress [" 1433 + listenerAddress + "]"; 1434 logApplicationEvent( "RemoteCacheServer", "addCacheListener", message ); 1435 if ( log.isInfoEnabled() ) 1436 { 1437 log.info( message ); 1438 } 1439 // should confirm the the host is the same as we have on 1440 // record, just in case a client has made a mistake. 1441 } 1442 1443 // relate the type to an id 1444 this.idTypeMap.put( Long.valueOf( id ), remoteType); 1445 if ( listenerAddress != null ) 1446 { 1447 this.idIPMap.put( Long.valueOf( id ), listenerAddress ); 1448 } 1449 } 1450 catch ( IOException ioe ) 1451 { 1452 String message = "Problem setting listener id, listenerAddress [" + listenerAddress + "]"; 1453 log.error( message, ioe ); 1454 1455 if ( cacheEventLogger != null ) 1456 { 1457 cacheEventLogger.logError( "RemoteCacheServer", "addCacheListener", message + " - " 1458 + ioe.getMessage() ); 1459 } 1460 } 1461 1462 CacheEventQueueFactory<KK, VV> fact = new CacheEventQueueFactory<KK, VV>(); 1463 ICacheEventQueue<KK, VV> q = fact.createCacheEventQueue( listener, id, cacheName, remoteCacheServerAttributes 1464 .getEventQueuePoolName(), remoteCacheServerAttributes.getEventQueueType() ); 1465 1466 eventQMap.put(Long.valueOf(listener.getListenerId()), q); 1467 1468 if ( log.isInfoEnabled() ) 1469 { 1470 log.info( cacheListeners ); 1471 } 1472 } 1473 } 1474 1475 /** 1476 * Subscribes to all remote caches. 1477 * <p> 1478 * @param listener The feature to be added to the CacheListener attribute 1479 * @throws IOException 1480 */ 1481 @Override 1482 public <KK, VV> void addCacheListener( ICacheListener<KK, VV> listener ) 1483 throws IOException 1484 { 1485 for (String cacheName : cacheListenersMap.keySet()) 1486 { 1487 addCacheListener( cacheName, listener ); 1488 1489 if ( log.isDebugEnabled() ) 1490 { 1491 log.debug( "Adding listener for cache [" + cacheName + "]" ); 1492 } 1493 } 1494 } 1495 1496 /** 1497 * Unsubscribe this listener from this region. If the listener is registered, it will be removed 1498 * from the event queue map list. 1499 * <p> 1500 * @param cacheName 1501 * @param listener 1502 * @throws IOException 1503 */ 1504 @Override 1505 public <KK, VV> void removeCacheListener( String cacheName, ICacheListener<KK, VV> listener ) 1506 throws IOException 1507 { 1508 removeCacheListener( cacheName, listener.getListenerId() ); 1509 } 1510 1511 /** 1512 * Unsubscribe this listener from this region. If the listener is registered, it will be removed 1513 * from the event queue map list. 1514 * <p> 1515 * @param cacheName 1516 * @param listenerId 1517 */ 1518 public void removeCacheListener( String cacheName, long listenerId ) 1519 { 1520 String message = "Removing listener for cache region = [" + cacheName + "] and listenerId [" + listenerId + "]"; 1521 logApplicationEvent( "RemoteCacheServer", "removeCacheListener", message ); 1522 if ( log.isInfoEnabled() ) 1523 { 1524 log.info( message ); 1525 } 1526 1527 boolean isClusterListener = isRequestFromCluster( listenerId ); 1528 1529 CacheListeners<K, V> cacheDesc = null; 1530 1531 if ( isClusterListener ) 1532 { 1533 cacheDesc = getClusterListeners( cacheName ); 1534 } 1535 else 1536 { 1537 cacheDesc = getCacheListeners( cacheName ); 1538 } 1539 Map<Long, ICacheEventQueue<K, V>> eventQMap = cacheDesc.eventQMap; 1540 cleanupEventQMap( eventQMap ); 1541 ICacheEventQueue<K, V> q = eventQMap.remove( Long.valueOf( listenerId ) ); 1542 1543 if ( q != null ) 1544 { 1545 if ( log.isDebugEnabled() ) 1546 { 1547 log.debug( "Found queue for cache region = [" + cacheName + "] and listenerId [" + listenerId + "]" ); 1548 } 1549 q.destroy(); 1550 cleanupEventQMap( eventQMap ); 1551 } 1552 else 1553 { 1554 if ( log.isDebugEnabled() ) 1555 { 1556 log.debug( "Did not find queue for cache region = [" + cacheName + "] and listenerId [" + listenerId 1557 + "]" ); 1558 } 1559 } 1560 1561 // cleanup 1562 idTypeMap.remove( Long.valueOf( listenerId ) ); 1563 idIPMap.remove( Long.valueOf( listenerId ) ); 1564 1565 if ( log.isInfoEnabled() ) 1566 { 1567 log.info( "After removing listener [" + listenerId + "] cache region " + cacheName + "'s listener size [" 1568 + cacheDesc.eventQMap.size() + "]" ); 1569 } 1570 } 1571 1572 /** 1573 * Unsubscribes from all remote caches. 1574 * <p> 1575 * @param listener 1576 * @throws IOException 1577 */ 1578 @Override 1579 public <KK, VV> void removeCacheListener( ICacheListener<KK, VV> listener ) 1580 throws IOException 1581 { 1582 for (String cacheName : cacheListenersMap.keySet()) 1583 { 1584 removeCacheListener( cacheName, listener ); 1585 1586 if ( log.isInfoEnabled() ) 1587 { 1588 log.info( "Removing listener for cache [" + cacheName + "]" ); 1589 } 1590 } 1591 } 1592 1593 /** 1594 * Shuts down the remote server. 1595 * <p> 1596 * @throws IOException 1597 */ 1598 @Override 1599 public void shutdown() 1600 throws IOException 1601 { 1602 shutdown("", Registry.REGISTRY_PORT); 1603 } 1604 1605 /** 1606 * Shuts down a server at a particular host and port. Then it calls shutdown on the cache 1607 * itself. 1608 * <p> 1609 * @param host 1610 * @param port 1611 * @throws IOException 1612 */ 1613 @Override 1614 public void shutdown( String host, int port ) 1615 throws IOException 1616 { 1617 if ( log.isInfoEnabled() ) 1618 { 1619 log.info( "Received shutdown request. Shutting down server." ); 1620 } 1621 1622 synchronized (listenerId) 1623 { 1624 for (String cacheName : cacheListenersMap.keySet()) 1625 { 1626 for (int i = 0; i <= listenerId[0]; i++) 1627 { 1628 removeCacheListener( cacheName, i ); 1629 } 1630 1631 if ( log.isInfoEnabled() ) 1632 { 1633 log.info( "Removing listener for cache [" + cacheName + "]" ); 1634 } 1635 } 1636 1637 cacheListenersMap.clear(); 1638 clusterListenersMap.clear(); 1639 } 1640 RemoteCacheServerFactory.shutdownImpl( host, port ); 1641 this.cacheManager.shutDown(); 1642 } 1643 1644 /** 1645 * Called by the RMI runtime sometime after the runtime determines that the reference list, the 1646 * list of clients referencing the remote object, becomes empty. 1647 */ 1648 // TODO: test out the DGC. 1649 @Override 1650 public void unreferenced() 1651 { 1652 if ( log.isInfoEnabled() ) 1653 { 1654 log.info( "*** Server now unreferenced and subject to GC. ***" ); 1655 } 1656 } 1657 1658 /** 1659 * Returns the next generated listener id [0,255]. 1660 * <p> 1661 * @return the listener id of a client. This should be unique for this server. 1662 */ 1663 private long nextListenerId() 1664 { 1665 long id = 0; 1666 if ( listenerId[0] == Integer.MAX_VALUE ) 1667 { 1668 synchronized ( listenerId ) 1669 { 1670 id = listenerId[0]; 1671 listenerId[0] = 0; 1672 // TODO: record & check if the generated id is currently being 1673 // used by a valid listener. Currently if the id wraps after 1674 // Long.MAX_VALUE, 1675 // we just assume it won't collide with an existing listener who 1676 // is live. 1677 } 1678 } 1679 else 1680 { 1681 synchronized ( listenerId ) 1682 { 1683 id = ++listenerId[0]; 1684 } 1685 } 1686 return id; 1687 } 1688 1689 /** 1690 * Gets the stats attribute of the RemoteCacheServer object. 1691 * <p> 1692 * @return The stats value 1693 * @throws IOException 1694 */ 1695 @Override 1696 public String getStats() 1697 throws IOException 1698 { 1699 return cacheManager.getStats(); 1700 } 1701 1702 /** 1703 * Logs an event if an event logger is configured. 1704 * <p> 1705 * @param item 1706 * @param requesterId 1707 * @param eventName 1708 * @return ICacheEvent 1709 */ 1710 private ICacheEvent<ICacheElement<K, V>> createICacheEvent( ICacheElement<K, V> item, long requesterId, String eventName ) 1711 { 1712 if ( cacheEventLogger == null ) 1713 { 1714 return new CacheEvent<ICacheElement<K, V>>(); 1715 } 1716 String ipAddress = getExtraInfoForRequesterId( requesterId ); 1717 return cacheEventLogger 1718 .createICacheEvent( "RemoteCacheServer", item.getCacheName(), eventName, ipAddress, item ); 1719 } 1720 1721 /** 1722 * Logs an event if an event logger is configured. 1723 * <p> 1724 * @param cacheName 1725 * @param key 1726 * @param requesterId 1727 * @param eventName 1728 * @return ICacheEvent 1729 */ 1730 private <T> ICacheEvent<T> createICacheEvent( String cacheName, T key, long requesterId, String eventName ) 1731 { 1732 if ( cacheEventLogger == null ) 1733 { 1734 return new CacheEvent<T>(); 1735 } 1736 String ipAddress = getExtraInfoForRequesterId( requesterId ); 1737 return cacheEventLogger.createICacheEvent( "RemoteCacheServer", cacheName, eventName, ipAddress, key ); 1738 } 1739 1740 /** 1741 * Logs an event if an event logger is configured. 1742 * <p> 1743 * @param source 1744 * @param eventName 1745 * @param optionalDetails 1746 */ 1747 protected void logApplicationEvent( String source, String eventName, String optionalDetails ) 1748 { 1749 if ( cacheEventLogger != null ) 1750 { 1751 cacheEventLogger.logApplicationEvent( source, eventName, optionalDetails ); 1752 } 1753 } 1754 1755 /** 1756 * Logs an event if an event logger is configured. 1757 * <p> 1758 * @param cacheEvent 1759 */ 1760 protected <T> void logICacheEvent( ICacheEvent<T> cacheEvent ) 1761 { 1762 if ( cacheEventLogger != null ) 1763 { 1764 cacheEventLogger.logICacheEvent( cacheEvent ); 1765 } 1766 } 1767 1768 /** 1769 * Ip address for the client, if one is stored. 1770 * <p> 1771 * Protected for testing. 1772 * <p> 1773 * @param requesterId 1774 * @return String 1775 */ 1776 protected String getExtraInfoForRequesterId( long requesterId ) 1777 { 1778 String ipAddress = idIPMap.get( Long.valueOf( requesterId ) ); 1779 return ipAddress; 1780 } 1781 1782 /** 1783 * Allows it to be injected. 1784 * <p> 1785 * @param cacheEventLogger 1786 */ 1787 public void setCacheEventLogger( ICacheEventLogger cacheEventLogger ) 1788 { 1789 this.cacheEventLogger = cacheEventLogger; 1790 } 1791}