001package org.apache.commons.jcs.auxiliary.remote; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.HashMap; 025import java.util.Map; 026import java.util.Set; 027import java.util.concurrent.Callable; 028import java.util.concurrent.ExecutionException; 029import java.util.concurrent.Future; 030import java.util.concurrent.ThreadPoolExecutor; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.TimeoutException; 033 034import org.apache.commons.jcs.auxiliary.AbstractAuxiliaryCacheEventLogging; 035import org.apache.commons.jcs.auxiliary.AuxiliaryCacheAttributes; 036import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes; 037import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheClient; 038import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheListener; 039import org.apache.commons.jcs.auxiliary.remote.server.behavior.RemoteType; 040import org.apache.commons.jcs.engine.CacheStatus; 041import org.apache.commons.jcs.engine.ZombieCacheServiceNonLocal; 042import org.apache.commons.jcs.engine.behavior.ICacheElement; 043import org.apache.commons.jcs.engine.behavior.ICacheElementSerialized; 044import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal; 045import org.apache.commons.jcs.engine.behavior.IZombie; 046import org.apache.commons.jcs.engine.logging.behavior.ICacheEventLogger; 047import org.apache.commons.jcs.engine.stats.StatElement; 048import org.apache.commons.jcs.engine.stats.Stats; 049import org.apache.commons.jcs.engine.stats.behavior.IStatElement; 050import org.apache.commons.jcs.engine.stats.behavior.IStats; 051import org.apache.commons.jcs.utils.serialization.SerializationConversionUtil; 052import org.apache.commons.jcs.utils.threadpool.ThreadPoolManager; 053import org.apache.commons.logging.Log; 054import org.apache.commons.logging.LogFactory; 055 056/** Abstract base for remote caches. I'm trying to break out and reuse common functionality. */ 057public abstract class AbstractRemoteAuxiliaryCache<K, V> 058 extends AbstractAuxiliaryCacheEventLogging<K, V> 059 implements IRemoteCacheClient<K, V> 060{ 061 /** The logger. */ 062 private static final Log log = LogFactory.getLog( AbstractRemoteAuxiliaryCache.class ); 063 064 /** 065 * This does the work. In an RMI instances, it will be a remote reference. In an http remote 066 * cache it will be an http client. In zombie mode it is replaced with a balking facade. 067 */ 068 private ICacheServiceNonLocal<K, V> remoteCacheService; 069 070 /** The cacheName */ 071 protected final String cacheName; 072 073 /** The listener. This can be null. */ 074 private IRemoteCacheListener<K, V> remoteCacheListener; 075 076 /** The configuration values. TODO, we'll need a base here. */ 077 private IRemoteCacheAttributes remoteCacheAttributes; 078 079 /** A thread pool for gets if configured. */ 080 private ThreadPoolExecutor pool = null; 081 082 /** Should we get asynchronously using a pool. */ 083 private boolean usePoolForGet = false; 084 085 /** 086 * Creates the base. 087 * <p> 088 * @param cattr 089 * @param remote 090 * @param listener 091 */ 092 public AbstractRemoteAuxiliaryCache( IRemoteCacheAttributes cattr, ICacheServiceNonLocal<K, V> remote, 093 IRemoteCacheListener<K, V> listener ) 094 { 095 this.setRemoteCacheAttributes( cattr ); 096 this.cacheName = cattr.getCacheName(); 097 this.setRemoteCacheService( remote ); 098 this.setRemoteCacheListener( listener ); 099 100 if ( log.isDebugEnabled() ) 101 { 102 log.debug( "Construct> cacheName=" + cattr.getCacheName() ); 103 log.debug( "irca = " + getRemoteCacheAttributes() ); 104 log.debug( "remote = " + remote ); 105 log.debug( "listener = " + listener ); 106 } 107 108 // use a pool if it is greater than 0 109 if ( log.isDebugEnabled() ) 110 { 111 log.debug( "GetTimeoutMillis() = " + getRemoteCacheAttributes().getGetTimeoutMillis() ); 112 } 113 114 if ( getRemoteCacheAttributes().getGetTimeoutMillis() > 0 ) 115 { 116 pool = ThreadPoolManager.getInstance().getPool( getRemoteCacheAttributes().getThreadPoolName() ); 117 if ( log.isDebugEnabled() ) 118 { 119 log.debug( "Thread Pool = " + pool ); 120 } 121 if ( pool != null ) 122 { 123 usePoolForGet = true; 124 } 125 } 126 } 127 128 /** 129 * Synchronously dispose the remote cache; if failed, replace the remote handle with a zombie. 130 * <p> 131 * @throws IOException 132 */ 133 @Override 134 protected void processDispose() 135 throws IOException 136 { 137 if ( log.isInfoEnabled() ) 138 { 139 log.info( "Disposing of remote cache." ); 140 } 141 try 142 { 143 if ( getRemoteCacheListener() != null ) 144 { 145 getRemoteCacheListener().dispose(); 146 } 147 } 148 catch ( Exception ex ) 149 { 150 log.error( "Couldn't dispose", ex ); 151 handleException( ex, "Failed to dispose [" + cacheName + "]", ICacheEventLogger.DISPOSE_EVENT ); 152 } 153 } 154 155 /** 156 * Synchronously get from the remote cache; if failed, replace the remote handle with a zombie. 157 * <p> 158 * Use threadpool to timeout if a value is set for GetTimeoutMillis 159 * <p> 160 * If we are a cluster client, we need to leave the Element in its serialized form. Cluster 161 * clients cannot deserialize objects. Cluster clients get ICacheElementSerialized objects from 162 * other remote servers. 163 * <p> 164 * @param key 165 * @return ICacheElement, a wrapper around the key, value, and attributes 166 * @throws IOException 167 */ 168 @Override 169 protected ICacheElement<K, V> processGet( K key ) 170 throws IOException 171 { 172 ICacheElement<K, V> retVal = null; 173 try 174 { 175 if ( usePoolForGet ) 176 { 177 retVal = getUsingPool( key ); 178 } 179 else 180 { 181 retVal = getRemoteCacheService().get( cacheName, key, getListenerId() ); 182 } 183 184 // Eventually the instance of will not be necessary. 185 if ( retVal instanceof ICacheElementSerialized ) 186 { 187 // Never try to deserialize if you are a cluster client. Cluster 188 // clients are merely intra-remote cache communicators. Remote caches are assumed 189 // to have no ability to deserialize the objects. 190 if ( this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER ) 191 { 192 retVal = SerializationConversionUtil.getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) retVal, 193 super.getElementSerializer() ); 194 } 195 } 196 } 197 catch ( Exception ex ) 198 { 199 handleException( ex, "Failed to get [" + key + "] from [" + cacheName + "]", ICacheEventLogger.GET_EVENT ); 200 } 201 return retVal; 202 } 203 204 /** 205 * This allows gets to timeout in case of remote server machine shutdown. 206 * <p> 207 * @param key 208 * @return ICacheElement 209 * @throws IOException 210 */ 211 public ICacheElement<K, V> getUsingPool( final K key ) 212 throws IOException 213 { 214 int timeout = getRemoteCacheAttributes().getGetTimeoutMillis(); 215 216 try 217 { 218 Callable<ICacheElement<K, V>> command = new Callable<ICacheElement<K, V>>() 219 { 220 @Override 221 public ICacheElement<K, V> call() 222 throws IOException 223 { 224 return getRemoteCacheService().get( cacheName, key, getListenerId() ); 225 } 226 }; 227 228 // execute using the pool 229 Future<ICacheElement<K, V>> future = pool.submit(command); 230 231 // used timed get in order to timeout 232 ICacheElement<K, V> ice = future.get(timeout, TimeUnit.MILLISECONDS); 233 234 if ( log.isDebugEnabled() ) 235 { 236 if ( ice == null ) 237 { 238 log.debug( "nothing found in remote cache" ); 239 } 240 else 241 { 242 log.debug( "found item in remote cache" ); 243 } 244 } 245 return ice; 246 } 247 catch ( TimeoutException te ) 248 { 249 log.warn( "TimeoutException, Get Request timed out after " + timeout ); 250 throw new IOException( "Get Request timed out after " + timeout ); 251 } 252 catch ( InterruptedException ex ) 253 { 254 log.warn( "InterruptedException, Get Request timed out after " + timeout ); 255 throw new IOException( "Get Request timed out after " + timeout ); 256 } 257 catch (ExecutionException ex) 258 { 259 // assume that this is an IOException thrown by the callable. 260 log.error( "ExecutionException, Assuming an IO exception thrown in the background.", ex ); 261 throw new IOException( "Get Request timed out after " + timeout ); 262 } 263 } 264 265 /** 266 * Calls get matching on the server. Each entry in the result is unwrapped. 267 * <p> 268 * @param pattern 269 * @return Map 270 * @throws IOException 271 */ 272 @Override 273 public Map<K, ICacheElement<K, V>> processGetMatching( String pattern ) 274 throws IOException 275 { 276 Map<K, ICacheElement<K, V>> results = new HashMap<K, ICacheElement<K, V>>(); 277 try 278 { 279 Map<K, ICacheElement<K, V>> rawResults = getRemoteCacheService().getMatching( cacheName, pattern, getListenerId() ); 280 281 // Eventually the instance of will not be necessary. 282 if ( rawResults != null ) 283 { 284 for (Map.Entry<K, ICacheElement<K, V>> entry : rawResults.entrySet()) 285 { 286 ICacheElement<K, V> unwrappedResult = null; 287 if ( entry.getValue() instanceof ICacheElementSerialized ) 288 { 289 // Never try to deserialize if you are a cluster client. Cluster 290 // clients are merely intra-remote cache communicators. Remote caches are assumed 291 // to have no ability to deserialize the objects. 292 if ( this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER ) 293 { 294 unwrappedResult = SerializationConversionUtil 295 .getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) entry.getValue(), 296 super.getElementSerializer() ); 297 } 298 } 299 else 300 { 301 unwrappedResult = entry.getValue(); 302 } 303 results.put( entry.getKey(), unwrappedResult ); 304 } 305 } 306 } 307 catch ( Exception ex ) 308 { 309 handleException( ex, "Failed to getMatching [" + pattern + "] from [" + cacheName + "]", 310 ICacheEventLogger.GET_EVENT ); 311 } 312 return results; 313 } 314 315 /** 316 * Gets multiple items from the cache based on the given set of keys. 317 * <p> 318 * @param keys 319 * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no 320 * data in cache for any of these keys 321 * @throws IOException 322 */ 323 @Override 324 protected Map<K, ICacheElement<K, V>> processGetMultiple( Set<K> keys ) 325 throws IOException 326 { 327 Map<K, ICacheElement<K, V>> elements = new HashMap<K, ICacheElement<K, V>>(); 328 if ( keys != null && !keys.isEmpty() ) 329 { 330 for (K key : keys) 331 { 332 ICacheElement<K, V> element = get( key ); 333 334 if ( element != null ) 335 { 336 elements.put( key, element ); 337 } 338 } 339 } 340 return elements; 341 } 342 343 /** 344 * Synchronously remove from the remote cache; if failed, replace the remote handle with a 345 * zombie. 346 * <p> 347 * @param key 348 * @return boolean, whether or not the item was removed 349 * @throws IOException 350 */ 351 @Override 352 protected boolean processRemove( K key ) 353 throws IOException 354 { 355 if ( !this.getRemoteCacheAttributes().getGetOnly() ) 356 { 357 if ( log.isDebugEnabled() ) 358 { 359 log.debug( "remove> key=" + key ); 360 } 361 try 362 { 363 getRemoteCacheService().remove( cacheName, key, getListenerId() ); 364 } 365 catch ( Exception ex ) 366 { 367 handleException( ex, "Failed to remove " + key + " from " + cacheName, ICacheEventLogger.REMOVE_EVENT ); 368 } 369 return true; 370 } 371 return false; 372 } 373 374 /** 375 * Synchronously removeAll from the remote cache; if failed, replace the remote handle with a 376 * zombie. 377 * <p> 378 * @throws IOException 379 */ 380 @Override 381 protected void processRemoveAll() 382 throws IOException 383 { 384 if ( !this.getRemoteCacheAttributes().getGetOnly() ) 385 { 386 try 387 { 388 getRemoteCacheService().removeAll( cacheName, getListenerId() ); 389 } 390 catch ( Exception ex ) 391 { 392 handleException( ex, "Failed to remove all from " + cacheName, ICacheEventLogger.REMOVEALL_EVENT ); 393 } 394 } 395 } 396 397 /** 398 * Serializes the object and then calls update on the remote server with the byte array. The 399 * byte array is wrapped in a ICacheElementSerialized. This allows the remote server to operate 400 * without any knowledge of caches classes. 401 * <p> 402 * @param ce 403 * @throws IOException 404 */ 405 @Override 406 protected void processUpdate( ICacheElement<K, V> ce ) 407 throws IOException 408 { 409 if ( !getRemoteCacheAttributes().getGetOnly() ) 410 { 411 ICacheElementSerialized<K, V> serialized = null; 412 try 413 { 414 if ( log.isDebugEnabled() ) 415 { 416 log.debug( "sending item to remote server" ); 417 } 418 419 // convert so we don't have to know about the object on the 420 // other end. 421 serialized = SerializationConversionUtil.getSerializedCacheElement( ce, super.getElementSerializer() ); 422 423 remoteCacheService.update( serialized, getListenerId() ); 424 } 425 catch ( NullPointerException npe ) 426 { 427 log.error( "npe for ce = " + ce + "ce.attr = " + ce.getElementAttributes(), npe ); 428 } 429 catch ( Exception ex ) 430 { 431 // event queue will wait and retry 432 handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName(), 433 ICacheEventLogger.UPDATE_EVENT ); 434 } 435 } 436 else 437 { 438 if ( log.isDebugEnabled() ) 439 { 440 log.debug( "get only mode, not sending to remote server" ); 441 } 442 } 443 } 444 445 /** 446 * Return the keys in this cache. 447 * <p> 448 * @see org.apache.commons.jcs.auxiliary.AuxiliaryCache#getKeySet() 449 */ 450 @Override 451 public Set<K> getKeySet() 452 throws IOException 453 { 454 return getRemoteCacheService().getKeySet(cacheName); 455 } 456 457 /** 458 * Allows other member of this package to access the listener. This is mainly needed for 459 * deregistering a listener. 460 * <p> 461 * @return IRemoteCacheListener, the listener for this remote server 462 */ 463 @Override 464 public IRemoteCacheListener<K, V> getListener() 465 { 466 return getRemoteCacheListener(); 467 } 468 469 /** 470 * let the remote cache set a listener_id. Since there is only one listener for all the regions 471 * and every region gets registered? the id shouldn't be set if it isn't zero. If it is we 472 * assume that it is a reconnect. 473 * <p> 474 * @param id The new listenerId value 475 */ 476 public void setListenerId( long id ) 477 { 478 if ( getRemoteCacheListener() != null ) 479 { 480 try 481 { 482 getRemoteCacheListener().setListenerId( id ); 483 484 if ( log.isDebugEnabled() ) 485 { 486 log.debug( "set listenerId = " + id ); 487 } 488 } 489 catch ( Exception e ) 490 { 491 log.error( "Problem setting listenerId", e ); 492 } 493 } 494 } 495 496 /** 497 * Gets the listenerId attribute of the RemoteCacheListener object 498 * <p> 499 * @return The listenerId value 500 */ 501 @Override 502 public long getListenerId() 503 { 504 if ( getRemoteCacheListener() != null ) 505 { 506 try 507 { 508 if ( log.isDebugEnabled() ) 509 { 510 log.debug( "get listenerId = " + getRemoteCacheListener().getListenerId() ); 511 } 512 return getRemoteCacheListener().getListenerId(); 513 } 514 catch ( Exception e ) 515 { 516 log.error( "Problem getting listenerId", e ); 517 } 518 } 519 return -1; 520 } 521 522 /** 523 * Returns the current cache size. 524 * @return The size value 525 */ 526 @Override 527 public int getSize() 528 { 529 return 0; 530 } 531 532 /** 533 * Custom exception handling some children. This should be used to initiate failover. 534 * <p> 535 * @param ex 536 * @param msg 537 * @param eventName 538 * @throws IOException 539 */ 540 protected abstract void handleException( Exception ex, String msg, String eventName ) 541 throws IOException; 542 543 /** 544 * Gets the stats attribute of the RemoteCache object. 545 * <p> 546 * @return The stats value 547 */ 548 @Override 549 public String getStats() 550 { 551 return getStatistics().toString(); 552 } 553 554 /** 555 * @return IStats object 556 */ 557 @Override 558 public IStats getStatistics() 559 { 560 IStats stats = new Stats(); 561 stats.setTypeName( "AbstractRemoteAuxiliaryCache" ); 562 563 ArrayList<IStatElement<?>> elems = new ArrayList<IStatElement<?>>(); 564 565 elems.add(new StatElement<String>( "Remote Type", this.getRemoteCacheAttributes().getRemoteTypeName() ) ); 566 567// if ( this.getRemoteCacheAttributes().getRemoteType() == RemoteType.CLUSTER ) 568// { 569// // something cluster specific 570// } 571 572 elems.add(new StatElement<Boolean>( "UsePoolForGet", Boolean.valueOf(usePoolForGet) ) ); 573 574 if ( pool != null ) 575 { 576 elems.add(new StatElement<Integer>( "Pool Size", Integer.valueOf(pool.getPoolSize()) ) ); 577 elems.add(new StatElement<Integer>( "Maximum Pool Size", Integer.valueOf(pool.getMaximumPoolSize()) ) ); 578 } 579 580 if ( getRemoteCacheService() instanceof ZombieCacheServiceNonLocal ) 581 { 582 elems.add(new StatElement<Integer>( "Zombie Queue Size", 583 Integer.valueOf(( (ZombieCacheServiceNonLocal<K, V>) getRemoteCacheService() ).getQueueSize()) ) ); 584 } 585 586 stats.setStatElements( elems ); 587 588 return stats; 589 } 590 591 /** 592 * Returns the cache status. An error status indicates the remote connection is not available. 593 * <p> 594 * @return The status value 595 */ 596 @Override 597 public CacheStatus getStatus() 598 { 599 return getRemoteCacheService() instanceof IZombie ? CacheStatus.ERROR : CacheStatus.ALIVE; 600 } 601 602 /** 603 * Replaces the current remote cache service handle with the given handle. If the current remote 604 * is a Zombie, then it propagates any events that are queued to the restored service. 605 * <p> 606 * @param restoredRemote ICacheServiceNonLocal -- the remote server or proxy to the remote server 607 */ 608 @Override 609 public void fixCache( ICacheServiceNonLocal<?, ?> restoredRemote ) 610 { 611 @SuppressWarnings("unchecked") // Don't know how to do this properly 612 ICacheServiceNonLocal<K, V> remote = (ICacheServiceNonLocal<K, V>)restoredRemote; 613 ICacheServiceNonLocal<K, V> prevRemote = getRemoteCacheService(); 614 if ( prevRemote instanceof ZombieCacheServiceNonLocal ) 615 { 616 ZombieCacheServiceNonLocal<K, V> zombie = (ZombieCacheServiceNonLocal<K, V>) prevRemote; 617 setRemoteCacheService( remote ); 618 try 619 { 620 zombie.propagateEvents( remote ); 621 } 622 catch ( Exception e ) 623 { 624 try 625 { 626 handleException( e, "Problem propagating events from Zombie Queue to new Remote Service.", 627 "fixCache" ); 628 } 629 catch ( IOException e1 ) 630 { 631 // swallow, since this is just expected kick back. Handle always throws 632 } 633 } 634 } 635 else 636 { 637 setRemoteCacheService( remote ); 638 } 639 } 640 641 642 /** 643 * Gets the cacheType attribute of the RemoteCache object 644 * @return The cacheType value 645 */ 646 @Override 647 public CacheType getCacheType() 648 { 649 return CacheType.REMOTE_CACHE; 650 } 651 652 /** 653 * Gets the cacheName attribute of the RemoteCache object. 654 * <p> 655 * @return The cacheName value 656 */ 657 @Override 658 public String getCacheName() 659 { 660 return cacheName; 661 } 662 663 /** 664 * @param remote the remote to set 665 */ 666 protected void setRemoteCacheService( ICacheServiceNonLocal<K, V> remote ) 667 { 668 this.remoteCacheService = remote; 669 } 670 671 /** 672 * @return the remote 673 */ 674 protected ICacheServiceNonLocal<K, V> getRemoteCacheService() 675 { 676 return remoteCacheService; 677 } 678 679 /** 680 * @return Returns the AuxiliaryCacheAttributes. 681 */ 682 @Override 683 public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes() 684 { 685 return getRemoteCacheAttributes(); 686 } 687 688 /** 689 * @param remoteCacheAttributes the remoteCacheAttributes to set 690 */ 691 protected void setRemoteCacheAttributes( IRemoteCacheAttributes remoteCacheAttributes ) 692 { 693 this.remoteCacheAttributes = remoteCacheAttributes; 694 } 695 696 /** 697 * @return the remoteCacheAttributes 698 */ 699 protected IRemoteCacheAttributes getRemoteCacheAttributes() 700 { 701 return remoteCacheAttributes; 702 } 703 704 /** 705 * @param remoteCacheListener the remoteCacheListener to set 706 */ 707 protected void setRemoteCacheListener( IRemoteCacheListener<K, V> remoteCacheListener ) 708 { 709 this.remoteCacheListener = remoteCacheListener; 710 } 711 712 /** 713 * @return the remoteCacheListener 714 */ 715 protected IRemoteCacheListener<K, V> getRemoteCacheListener() 716 { 717 return remoteCacheListener; 718 } 719}