001package org.apache.commons.jcs3.auxiliary.remote; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.HashMap; 025import java.util.Map; 026import java.util.Set; 027import java.util.concurrent.Callable; 028import java.util.concurrent.ExecutionException; 029import java.util.concurrent.ExecutorService; 030import java.util.concurrent.Future; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.TimeoutException; 033 034import org.apache.commons.jcs3.auxiliary.AbstractAuxiliaryCacheEventLogging; 035import org.apache.commons.jcs3.auxiliary.AuxiliaryCacheAttributes; 036import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheAttributes; 037import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheClient; 038import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheListener; 039import org.apache.commons.jcs3.auxiliary.remote.server.behavior.RemoteType; 040import org.apache.commons.jcs3.engine.CacheStatus; 041import org.apache.commons.jcs3.engine.ZombieCacheServiceNonLocal; 042import org.apache.commons.jcs3.engine.behavior.ICacheElement; 043import org.apache.commons.jcs3.engine.behavior.ICacheElementSerialized; 044import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal; 045import org.apache.commons.jcs3.engine.behavior.IZombie; 046import org.apache.commons.jcs3.engine.logging.behavior.ICacheEventLogger; 047import org.apache.commons.jcs3.engine.stats.StatElement; 048import org.apache.commons.jcs3.engine.stats.Stats; 049import org.apache.commons.jcs3.engine.stats.behavior.IStatElement; 050import org.apache.commons.jcs3.engine.stats.behavior.IStats; 051import org.apache.commons.jcs3.log.Log; 052import org.apache.commons.jcs3.log.LogManager; 053import org.apache.commons.jcs3.utils.serialization.SerializationConversionUtil; 054import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager; 055 056/** Abstract base for remote caches. I'm trying to break out and reuse common functionality. */ 057public abstract class AbstractRemoteAuxiliaryCache<K, V> 058 extends AbstractAuxiliaryCacheEventLogging<K, V> 059 implements IRemoteCacheClient<K, V> 060{ 061 /** The logger. */ 062 private static final Log log = LogManager.getLog( AbstractRemoteAuxiliaryCache.class ); 063 064 /** 065 * This does the work. In an RMI instances, it will be a remote reference. In an http remote 066 * cache it will be an http client. In zombie mode it is replaced with a balking facade. 067 */ 068 private ICacheServiceNonLocal<K, V> remoteCacheService; 069 070 /** The cacheName */ 071 protected final String cacheName; 072 073 /** The listener. This can be null. */ 074 private IRemoteCacheListener<K, V> remoteCacheListener; 075 076 /** The configuration values. TODO, we'll need a base here. */ 077 private IRemoteCacheAttributes remoteCacheAttributes; 078 079 /** A thread pool for gets if configured. */ 080 private ExecutorService pool; 081 082 /** Should we get asynchronously using a pool. */ 083 private boolean usePoolForGet; 084 085 /** 086 * Creates the base. 087 * <p> 088 * @param cattr 089 * @param remote 090 * @param listener 091 */ 092 public AbstractRemoteAuxiliaryCache( final IRemoteCacheAttributes cattr, final ICacheServiceNonLocal<K, V> remote, 093 final IRemoteCacheListener<K, V> listener ) 094 { 095 this.setRemoteCacheAttributes( cattr ); 096 this.cacheName = cattr.getCacheName(); 097 this.setRemoteCacheService( remote ); 098 this.setRemoteCacheListener( listener ); 099 100 if ( log.isDebugEnabled() ) 101 { 102 log.debug( "Construct> cacheName={0}", cattr::getCacheName); 103 log.debug( "irca = {0}", this::getRemoteCacheAttributes); 104 log.debug( "remote = {0}", remote ); 105 log.debug( "listener = {0}", listener ); 106 } 107 108 // use a pool if it is greater than 0 109 log.debug( "GetTimeoutMillis() = {0}", 110 () -> getRemoteCacheAttributes().getGetTimeoutMillis() ); 111 112 if ( getRemoteCacheAttributes().getGetTimeoutMillis() > 0 ) 113 { 114 pool = ThreadPoolManager.getInstance().getExecutorService( getRemoteCacheAttributes().getThreadPoolName() ); 115 log.debug( "Thread Pool = {0}", pool ); 116 usePoolForGet = true; 117 } 118 } 119 120 /** 121 * Synchronously dispose the remote cache; if failed, replace the remote handle with a zombie. 122 * <p> 123 * @throws IOException 124 */ 125 @Override 126 protected void processDispose() 127 throws IOException 128 { 129 log.info( "Disposing of remote cache." ); 130 try 131 { 132 if ( getRemoteCacheListener() != null ) 133 { 134 getRemoteCacheListener().dispose(); 135 } 136 } 137 catch ( final IOException ex ) 138 { 139 log.error( "Couldn't dispose", ex ); 140 handleException( ex, "Failed to dispose [" + cacheName + "]", ICacheEventLogger.DISPOSE_EVENT ); 141 } 142 } 143 144 /** 145 * Synchronously get from the remote cache; if failed, replace the remote handle with a zombie. 146 * <p> 147 * Use threadpool to timeout if a value is set for GetTimeoutMillis 148 * <p> 149 * If we are a cluster client, we need to leave the Element in its serialized form. Cluster 150 * clients cannot deserialize objects. Cluster clients get ICacheElementSerialized objects from 151 * other remote servers. 152 * <p> 153 * @param key 154 * @return ICacheElement, a wrapper around the key, value, and attributes 155 * @throws IOException 156 */ 157 @Override 158 protected ICacheElement<K, V> processGet( final K key ) 159 throws IOException 160 { 161 ICacheElement<K, V> retVal = null; 162 try 163 { 164 if ( usePoolForGet ) 165 { 166 retVal = getUsingPool( key ); 167 } 168 else 169 { 170 retVal = getRemoteCacheService().get( cacheName, key, getListenerId() ); 171 } 172 173 // Eventually the instance of will not be necessary. 174 // Never try to deserialize if you are a cluster client. Cluster 175 // clients are merely intra-remote cache communicators. Remote caches are assumed 176 // to have no ability to deserialize the objects. 177 if (retVal instanceof ICacheElementSerialized && this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER) 178 { 179 retVal = SerializationConversionUtil.getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) retVal, 180 super.getElementSerializer() ); 181 } 182 } 183 catch ( final IOException | ClassNotFoundException ex ) 184 { 185 handleException( ex, "Failed to get [" + key + "] from [" + cacheName + "]", ICacheEventLogger.GET_EVENT ); 186 } 187 return retVal; 188 } 189 190 /** 191 * This allows gets to timeout in case of remote server machine shutdown. 192 * <p> 193 * @param key 194 * @return ICacheElement 195 * @throws IOException 196 */ 197 public ICacheElement<K, V> getUsingPool( final K key ) 198 throws IOException 199 { 200 final int timeout = getRemoteCacheAttributes().getGetTimeoutMillis(); 201 202 try 203 { 204 final Callable<ICacheElement<K, V>> command = () -> getRemoteCacheService().get( cacheName, key, getListenerId() ); 205 206 // execute using the pool 207 final Future<ICacheElement<K, V>> future = pool.submit(command); 208 209 // used timed get in order to timeout 210 final ICacheElement<K, V> ice = future.get(timeout, TimeUnit.MILLISECONDS); 211 212 if ( ice == null ) 213 { 214 log.debug( "nothing found in remote cache" ); 215 } 216 else 217 { 218 log.debug( "found item in remote cache" ); 219 } 220 return ice; 221 } 222 catch ( final TimeoutException te ) 223 { 224 log.warn( "TimeoutException, Get Request timed out after {0}", timeout ); 225 throw new IOException( "Get Request timed out after " + timeout ); 226 } 227 catch ( final InterruptedException ex ) 228 { 229 log.warn( "InterruptedException, Get Request timed out after {0}", timeout ); 230 throw new IOException( "Get Request timed out after " + timeout ); 231 } 232 catch (final ExecutionException ex) 233 { 234 // assume that this is an IOException thrown by the callable. 235 log.error( "ExecutionException, Assuming an IO exception thrown in the background.", ex ); 236 throw new IOException( "Get Request timed out after " + timeout ); 237 } 238 } 239 240 /** 241 * Calls get matching on the server. Each entry in the result is unwrapped. 242 * <p> 243 * @param pattern 244 * @return Map 245 * @throws IOException 246 */ 247 @Override 248 public Map<K, ICacheElement<K, V>> processGetMatching( final String pattern ) 249 throws IOException 250 { 251 final Map<K, ICacheElement<K, V>> results = new HashMap<>(); 252 try 253 { 254 final Map<K, ICacheElement<K, V>> rawResults = getRemoteCacheService().getMatching( cacheName, pattern, getListenerId() ); 255 256 // Eventually the instance of will not be necessary. 257 if ( rawResults != null ) 258 { 259 for (final Map.Entry<K, ICacheElement<K, V>> entry : rawResults.entrySet()) 260 { 261 ICacheElement<K, V> unwrappedResult = null; 262 if ( entry.getValue() instanceof ICacheElementSerialized ) 263 { 264 // Never try to deserialize if you are a cluster client. Cluster 265 // clients are merely intra-remote cache communicators. Remote caches are assumed 266 // to have no ability to deserialize the objects. 267 if ( this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER ) 268 { 269 unwrappedResult = SerializationConversionUtil 270 .getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) entry.getValue(), 271 super.getElementSerializer() ); 272 } 273 } 274 else 275 { 276 unwrappedResult = entry.getValue(); 277 } 278 results.put( entry.getKey(), unwrappedResult ); 279 } 280 } 281 } 282 catch ( final IOException | ClassNotFoundException ex ) 283 { 284 handleException( ex, "Failed to getMatching [" + pattern + "] from [" + cacheName + "]", 285 ICacheEventLogger.GET_EVENT ); 286 } 287 return results; 288 } 289 290 /** 291 * Synchronously remove from the remote cache; if failed, replace the remote handle with a 292 * zombie. 293 * <p> 294 * @param key 295 * @return boolean, whether or not the item was removed 296 * @throws IOException 297 */ 298 @Override 299 protected boolean processRemove( final K key ) 300 throws IOException 301 { 302 if ( !this.getRemoteCacheAttributes().getGetOnly() ) 303 { 304 log.debug( "remove> key={0}", key ); 305 try 306 { 307 getRemoteCacheService().remove( cacheName, key, getListenerId() ); 308 } 309 catch ( final IOException ex ) 310 { 311 handleException( ex, "Failed to remove " + key + " from " + cacheName, ICacheEventLogger.REMOVE_EVENT ); 312 } 313 return true; 314 } 315 return false; 316 } 317 318 /** 319 * Synchronously removeAll from the remote cache; if failed, replace the remote handle with a 320 * zombie. 321 * <p> 322 * @throws IOException 323 */ 324 @Override 325 protected void processRemoveAll() 326 throws IOException 327 { 328 if ( !this.getRemoteCacheAttributes().getGetOnly() ) 329 { 330 try 331 { 332 getRemoteCacheService().removeAll( cacheName, getListenerId() ); 333 } 334 catch ( final IOException ex ) 335 { 336 handleException( ex, "Failed to remove all from " + cacheName, ICacheEventLogger.REMOVEALL_EVENT ); 337 } 338 } 339 } 340 341 /** 342 * Serializes the object and then calls update on the remote server with the byte array. The 343 * byte array is wrapped in a ICacheElementSerialized. This allows the remote server to operate 344 * without any knowledge of caches classes. 345 * <p> 346 * @param ce 347 * @throws IOException 348 */ 349 @Override 350 protected void processUpdate( final ICacheElement<K, V> ce ) 351 throws IOException 352 { 353 if ( !getRemoteCacheAttributes().getGetOnly() ) 354 { 355 try 356 { 357 log.debug( "sending item to remote server" ); 358 359 // convert so we don't have to know about the object on the 360 // other end. 361 ICacheElementSerialized<K, V> serialized = SerializationConversionUtil.getSerializedCacheElement( ce, super.getElementSerializer() ); 362 363 remoteCacheService.update( serialized, getListenerId() ); 364 } 365 catch ( final IOException ex ) 366 { 367 // event queue will wait and retry 368 handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName(), 369 ICacheEventLogger.UPDATE_EVENT ); 370 } 371 } 372 else 373 { 374 log.debug( "get only mode, not sending to remote server" ); 375 } 376 } 377 378 /** 379 * Return the keys in this cache. 380 * <p> 381 * @see org.apache.commons.jcs3.auxiliary.AuxiliaryCache#getKeySet() 382 */ 383 @Override 384 public Set<K> getKeySet() 385 throws IOException 386 { 387 return getRemoteCacheService().getKeySet(cacheName); 388 } 389 390 /** 391 * Allows other member of this package to access the listener. This is mainly needed for 392 * deregistering a listener. 393 * <p> 394 * @return IRemoteCacheListener, the listener for this remote server 395 */ 396 @Override 397 public IRemoteCacheListener<K, V> getListener() 398 { 399 return getRemoteCacheListener(); 400 } 401 402 /** 403 * let the remote cache set a listener_id. Since there is only one listener for all the regions 404 * and every region gets registered? the id shouldn't be set if it isn't zero. If it is we 405 * assume that it is a reconnect. 406 * <p> 407 * @param id The new listenerId value 408 */ 409 public void setListenerId( final long id ) 410 { 411 if ( getRemoteCacheListener() != null ) 412 { 413 try 414 { 415 getRemoteCacheListener().setListenerId( id ); 416 417 log.debug( "set listenerId = {0}", id ); 418 } 419 catch ( final IOException e ) 420 { 421 log.error( "Problem setting listenerId", e ); 422 } 423 } 424 } 425 426 /** 427 * Gets the listenerId attribute of the RemoteCacheListener object 428 * <p> 429 * @return The listenerId value 430 */ 431 @Override 432 public long getListenerId() 433 { 434 if ( getRemoteCacheListener() != null ) 435 { 436 try 437 { 438 log.debug( "get listenerId = {0}", getRemoteCacheListener().getListenerId() ); 439 return getRemoteCacheListener().getListenerId(); 440 } 441 catch ( final IOException e ) 442 { 443 log.error( "Problem getting listenerId", e ); 444 } 445 } 446 return -1; 447 } 448 449 /** 450 * Returns the current cache size. 451 * @return The size value 452 */ 453 @Override 454 public int getSize() 455 { 456 return 0; 457 } 458 459 /** 460 * Custom exception handling some children. This should be used to initiate failover. 461 * <p> 462 * @param ex 463 * @param msg 464 * @param eventName 465 * @throws IOException 466 */ 467 protected abstract void handleException( Exception ex, String msg, String eventName ) 468 throws IOException; 469 470 /** 471 * Gets the stats attribute of the RemoteCache object. 472 * <p> 473 * @return The stats value 474 */ 475 @Override 476 public String getStats() 477 { 478 return getStatistics().toString(); 479 } 480 481 /** 482 * @return IStats object 483 */ 484 @Override 485 public IStats getStatistics() 486 { 487 final IStats stats = new Stats(); 488 stats.setTypeName( "AbstractRemoteAuxiliaryCache" ); 489 490 final ArrayList<IStatElement<?>> elems = new ArrayList<>(); 491 492 elems.add(new StatElement<>( "Remote Type", this.getRemoteCacheAttributes().getRemoteTypeName() ) ); 493 494// if ( this.getRemoteCacheAttributes().getRemoteType() == RemoteType.CLUSTER ) 495// { 496// // something cluster specific 497// } 498 499 elems.add(new StatElement<>( "UsePoolForGet", Boolean.valueOf(usePoolForGet) ) ); 500 501 if ( pool != null ) 502 { 503 elems.add(new StatElement<>( "Pool", pool ) ); 504 } 505 506 if ( getRemoteCacheService() instanceof ZombieCacheServiceNonLocal ) 507 { 508 elems.add(new StatElement<>( "Zombie Queue Size", 509 Integer.valueOf(( (ZombieCacheServiceNonLocal<K, V>) getRemoteCacheService() ).getQueueSize()) ) ); 510 } 511 512 stats.setStatElements( elems ); 513 514 return stats; 515 } 516 517 /** 518 * Returns the cache status. An error status indicates the remote connection is not available. 519 * <p> 520 * @return The status value 521 */ 522 @Override 523 public CacheStatus getStatus() 524 { 525 return getRemoteCacheService() instanceof IZombie ? CacheStatus.ERROR : CacheStatus.ALIVE; 526 } 527 528 /** 529 * Replaces the current remote cache service handle with the given handle. If the current remote 530 * is a Zombie, then it propagates any events that are queued to the restored service. 531 * <p> 532 * @param restoredRemote ICacheServiceNonLocal -- the remote server or proxy to the remote server 533 */ 534 @Override 535 public void fixCache( final ICacheServiceNonLocal<?, ?> restoredRemote ) 536 { 537 @SuppressWarnings("unchecked") // Don't know how to do this properly 538 final 539 ICacheServiceNonLocal<K, V> remote = (ICacheServiceNonLocal<K, V>)restoredRemote; 540 final ICacheServiceNonLocal<K, V> prevRemote = getRemoteCacheService(); 541 if ( prevRemote instanceof ZombieCacheServiceNonLocal ) 542 { 543 final ZombieCacheServiceNonLocal<K, V> zombie = (ZombieCacheServiceNonLocal<K, V>) prevRemote; 544 setRemoteCacheService( remote ); 545 try 546 { 547 zombie.propagateEvents( remote ); 548 } 549 catch ( final Exception e ) 550 { 551 try 552 { 553 handleException( e, "Problem propagating events from Zombie Queue to new Remote Service.", 554 "fixCache" ); 555 } 556 catch ( final IOException e1 ) 557 { 558 // swallow, since this is just expected kick back. Handle always throws 559 } 560 } 561 } 562 else 563 { 564 setRemoteCacheService( remote ); 565 } 566 } 567 568 569 /** 570 * Gets the cacheType attribute of the RemoteCache object 571 * @return The cacheType value 572 */ 573 @Override 574 public CacheType getCacheType() 575 { 576 return CacheType.REMOTE_CACHE; 577 } 578 579 /** 580 * Gets the cacheName attribute of the RemoteCache object. 581 * <p> 582 * @return The cacheName value 583 */ 584 @Override 585 public String getCacheName() 586 { 587 return cacheName; 588 } 589 590 /** 591 * @param remote the remote to set 592 */ 593 protected void setRemoteCacheService( final ICacheServiceNonLocal<K, V> remote ) 594 { 595 this.remoteCacheService = remote; 596 } 597 598 /** 599 * @return the remote 600 */ 601 protected ICacheServiceNonLocal<K, V> getRemoteCacheService() 602 { 603 return remoteCacheService; 604 } 605 606 /** 607 * @return Returns the AuxiliaryCacheAttributes. 608 */ 609 @Override 610 public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes() 611 { 612 return getRemoteCacheAttributes(); 613 } 614 615 /** 616 * @param remoteCacheAttributes the remoteCacheAttributes to set 617 */ 618 protected void setRemoteCacheAttributes( final IRemoteCacheAttributes remoteCacheAttributes ) 619 { 620 this.remoteCacheAttributes = remoteCacheAttributes; 621 } 622 623 /** 624 * @return the remoteCacheAttributes 625 */ 626 protected IRemoteCacheAttributes getRemoteCacheAttributes() 627 { 628 return remoteCacheAttributes; 629 } 630 631 /** 632 * @param remoteCacheListener the remoteCacheListener to set 633 */ 634 protected void setRemoteCacheListener( final IRemoteCacheListener<K, V> remoteCacheListener ) 635 { 636 this.remoteCacheListener = remoteCacheListener; 637 } 638 639 /** 640 * @return the remoteCacheListener 641 */ 642 protected IRemoteCacheListener<K, V> getRemoteCacheListener() 643 { 644 return remoteCacheListener; 645 } 646}