001package org.apache.commons.jcs.auxiliary.remote; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import java.io.IOException; 023import java.rmi.UnmarshalException; 024import java.util.ArrayList; 025import java.util.Collections; 026import java.util.HashMap; 027import java.util.Map; 028import java.util.Set; 029 030import org.apache.commons.jcs.auxiliary.AbstractAuxiliaryCache; 031import org.apache.commons.jcs.auxiliary.AuxiliaryCacheAttributes; 032import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheClient; 033import org.apache.commons.jcs.engine.CacheAdaptor; 034import org.apache.commons.jcs.engine.CacheEventQueueFactory; 035import org.apache.commons.jcs.engine.CacheStatus; 036import org.apache.commons.jcs.engine.behavior.ICacheElement; 037import org.apache.commons.jcs.engine.behavior.ICacheEventQueue; 038import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal; 039import org.apache.commons.jcs.engine.stats.StatElement; 040import org.apache.commons.jcs.engine.stats.Stats; 041import org.apache.commons.jcs.engine.stats.behavior.IStatElement; 042import org.apache.commons.jcs.engine.stats.behavior.IStats; 043import org.apache.commons.logging.Log; 044import org.apache.commons.logging.LogFactory; 045 046/** 047 * The RemoteCacheNoWait wraps the RemoteCacheClient. The client holds a handle on the 048 * RemoteCacheService. 049 * <p> 050 * Used to queue up update requests to the underlying cache. These requests will be processed in 051 * their order of arrival via the cache event queue processor. 052 * <p> 053 * Typically errors will be handled down stream. We only need to kill the queue if an error makes it 054 * to this level from the queue. That can only happen if the queue is damaged, since the events are 055 * Processed asynchronously. 056 * <p> 057 * There is no reason to create a queue on startup if the remote is not healthy. 058 * <p> 059 * If the remote cache encounters an error it will zombie--create a balking facade for the service. 060 * The Zombie will queue up items until the connection is restored. An alternative way to accomplish 061 * the same thing would be to stop, not destroy the queue at this level. That way items would be 062 * added to the queue and then when the connection is restored, we could start the worker threads 063 * again. This is a better long term solution, but it requires some significant changes to the 064 * complicated worker queues. 065 */ 066public class RemoteCacheNoWait<K, V> 067 extends AbstractAuxiliaryCache<K, V> 068{ 069 /** log instance */ 070 private static final Log log = LogFactory.getLog( RemoteCacheNoWait.class ); 071 072 /** The remote cache client */ 073 private final IRemoteCacheClient<K, V> remoteCacheClient; 074 075 /** Event queue for queuing up calls like put and remove. */ 076 private ICacheEventQueue<K, V> cacheEventQueue; 077 078 /** how many times get has been called. */ 079 private int getCount = 0; 080 081 /** how many times getMatching has been called. */ 082 private int getMatchingCount = 0; 083 084 /** how many times getMultiple has been called. */ 085 private int getMultipleCount = 0; 086 087 /** how many times remove has been called. */ 088 private int removeCount = 0; 089 090 /** how many times put has been called. */ 091 private int putCount = 0; 092 093 /** 094 * Constructs with the given remote cache, and fires up an event queue for asynchronous 095 * processing. 096 * <p> 097 * @param cache 098 */ 099 public RemoteCacheNoWait( IRemoteCacheClient<K, V> cache ) 100 { 101 remoteCacheClient = cache; 102 this.cacheEventQueue = createCacheEventQueue(cache); 103 104 if ( remoteCacheClient.getStatus() == CacheStatus.ERROR ) 105 { 106 cacheEventQueue.destroy(); 107 } 108 } 109 110 /** 111 * Create a cache event queue from the parameters of the remote client 112 * @param client the remote client 113 */ 114 private ICacheEventQueue<K, V> createCacheEventQueue( IRemoteCacheClient<K, V> client ) 115 { 116 CacheEventQueueFactory<K, V> factory = new CacheEventQueueFactory<K, V>(); 117 ICacheEventQueue<K, V> ceq = factory.createCacheEventQueue( 118 new CacheAdaptor<K, V>( client ), 119 client.getListenerId(), 120 client.getCacheName(), 121 client.getAuxiliaryCacheAttributes().getEventQueuePoolName(), 122 client.getAuxiliaryCacheAttributes().getEventQueueType() ); 123 return ceq; 124 } 125 126 /** 127 * Adds a put event to the queue. 128 * <p> 129 * @param element 130 * @throws IOException 131 */ 132 @Override 133 public void update( ICacheElement<K, V> element ) 134 throws IOException 135 { 136 putCount++; 137 try 138 { 139 cacheEventQueue.addPutEvent( element ); 140 } 141 catch ( IOException e ) 142 { 143 log.error( "Problem adding putEvent to queue.", e ); 144 cacheEventQueue.destroy(); 145 throw e; 146 } 147 } 148 149 /** 150 * Synchronously reads from the remote cache. 151 * <p> 152 * @param key 153 * @return element from the remote cache, or null if not present 154 * @throws IOException 155 */ 156 @Override 157 public ICacheElement<K, V> get( K key ) 158 throws IOException 159 { 160 getCount++; 161 try 162 { 163 return remoteCacheClient.get( key ); 164 } 165 catch ( UnmarshalException ue ) 166 { 167 if ( log.isDebugEnabled() ) 168 { 169 log.debug( "Retrying the get owing to UnmarshalException." ); 170 } 171 172 try 173 { 174 return remoteCacheClient.get( key ); 175 } 176 catch ( IOException ex ) 177 { 178 if ( log.isInfoEnabled() ) 179 { 180 log.info( "Failed in retrying the get for the second time. " + ex.getMessage() ); 181 } 182 } 183 } 184 catch ( IOException ex ) 185 { 186 // We don't want to destroy the queue on a get failure. 187 // The RemoteCache will Zombie and queue. 188 // Since get does not use the queue, I don't want to kill the queue. 189 throw ex; 190 } 191 192 return null; 193 } 194 195 /** 196 * @param pattern 197 * @return Map 198 * @throws IOException 199 * 200 */ 201 @Override 202 public Map<K, ICacheElement<K, V>> getMatching( String pattern ) 203 throws IOException 204 { 205 getMatchingCount++; 206 try 207 { 208 return remoteCacheClient.getMatching( pattern ); 209 } 210 catch ( UnmarshalException ue ) 211 { 212 if ( log.isDebugEnabled() ) 213 { 214 log.debug( "Retrying the getMatching owing to UnmarshalException." ); 215 } 216 217 try 218 { 219 return remoteCacheClient.getMatching( pattern ); 220 } 221 catch ( IOException ex ) 222 { 223 if ( log.isInfoEnabled() ) 224 { 225 log.info( "Failed in retrying the getMatching for the second time. " + ex.getMessage() ); 226 } 227 } 228 } 229 catch ( IOException ex ) 230 { 231 // We don't want to destroy the queue on a get failure. 232 // The RemoteCache will Zombie and queue. 233 // Since get does not use the queue, I don't want to kill the queue. 234 throw ex; 235 } 236 237 return Collections.emptyMap(); 238 } 239 240 /** 241 * Gets multiple items from the cache based on the given set of keys. Sends the getMultiple 242 * request on to the server rather than looping through the requested keys. 243 * <p> 244 * @param keys 245 * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no 246 * data in cache for any of these keys 247 * @throws IOException 248 */ 249 @Override 250 public Map<K, ICacheElement<K, V>> getMultiple( Set<K> keys ) 251 throws IOException 252 { 253 getMultipleCount++; 254 try 255 { 256 return remoteCacheClient.getMultiple( keys ); 257 } 258 catch ( UnmarshalException ue ) 259 { 260 if ( log.isDebugEnabled() ) 261 { 262 log.debug( "Retrying the getMultiple owing to UnmarshalException..." ); 263 } 264 265 try 266 { 267 return remoteCacheClient.getMultiple( keys ); 268 } 269 catch ( IOException ex ) 270 { 271 if ( log.isInfoEnabled() ) 272 { 273 log.info( "Failed in retrying the getMultiple for the second time. " + ex.getMessage() ); 274 } 275 } 276 } 277 catch ( IOException ex ) 278 { 279 // We don't want to destroy the queue on a get failure. 280 // The RemoteCache will Zombie and queue. 281 // Since get does not use the queue, I don't want to kill the queue. 282 throw ex; 283 } 284 285 return new HashMap<K, ICacheElement<K, V>>(); 286 } 287 288 /** 289 * Return the keys in this cache. 290 * <p> 291 * @see org.apache.commons.jcs.auxiliary.AuxiliaryCache#getKeySet() 292 */ 293 @Override 294 public Set<K> getKeySet() throws IOException 295 { 296 return remoteCacheClient.getKeySet(); 297 } 298 299 /** 300 * Adds a remove request to the remote cache. 301 * <p> 302 * @param key 303 * @return if this was successful 304 * @throws IOException 305 */ 306 @Override 307 public boolean remove( K key ) 308 throws IOException 309 { 310 removeCount++; 311 try 312 { 313 cacheEventQueue.addRemoveEvent( key ); 314 } 315 catch ( IOException e ) 316 { 317 log.error( "Problem adding RemoveEvent to queue.", e ); 318 cacheEventQueue.destroy(); 319 throw e; 320 } 321 return false; 322 } 323 324 /** 325 * Adds a removeAll request to the remote cache. 326 * <p> 327 * @throws IOException 328 */ 329 @Override 330 public void removeAll() 331 throws IOException 332 { 333 try 334 { 335 cacheEventQueue.addRemoveAllEvent(); 336 } 337 catch ( IOException e ) 338 { 339 log.error( "Problem adding RemoveAllEvent to queue.", e ); 340 cacheEventQueue.destroy(); 341 throw e; 342 } 343 } 344 345 /** Adds a dispose request to the remote cache. */ 346 @Override 347 public void dispose() 348 { 349 try 350 { 351 cacheEventQueue.addDisposeEvent(); 352 } 353 catch ( IOException e ) 354 { 355 log.error( "Problem adding DisposeEvent to queue.", e ); 356 cacheEventQueue.destroy(); 357 } 358 } 359 360 /** 361 * No remote invocation. 362 * <p> 363 * @return The size value 364 */ 365 @Override 366 public int getSize() 367 { 368 return remoteCacheClient.getSize(); 369 } 370 371 /** 372 * No remote invocation. 373 * <p> 374 * @return The cacheType value 375 */ 376 @Override 377 public CacheType getCacheType() 378 { 379 return CacheType.REMOTE_CACHE; 380 } 381 382 /** 383 * Returns the asyn cache status. An error status indicates either the remote connection is not 384 * available, or the asyn queue has been unexpectedly destroyed. No remote invocation. 385 * <p> 386 * @return The status value 387 */ 388 @Override 389 public CacheStatus getStatus() 390 { 391 return cacheEventQueue.isWorking() ? remoteCacheClient.getStatus() : CacheStatus.ERROR; 392 } 393 394 /** 395 * Gets the cacheName attribute of the RemoteCacheNoWait object 396 * <p> 397 * @return The cacheName value 398 */ 399 @Override 400 public String getCacheName() 401 { 402 return remoteCacheClient.getCacheName(); 403 } 404 405 /** 406 * Replaces the remote cache service handle with the given handle and reset the event queue by 407 * starting up a new instance. 408 * <p> 409 * @param remote 410 */ 411 public void fixCache( ICacheServiceNonLocal<?, ?> remote ) 412 { 413 remoteCacheClient.fixCache( remote ); 414 resetEventQ(); 415 } 416 417 /** 418 * Resets the event q by first destroying the existing one and starting up new one. 419 * <p> 420 * There may be no good reason to kill the existing queue. We will sometimes need to set a new 421 * listener id, so we should create a new queue. We should let the old queue drain. If we were 422 * Connected to the failover, it would be best to finish sending items. 423 */ 424 public void resetEventQ() 425 { 426 ICacheEventQueue<K, V> previousQueue = cacheEventQueue; 427 428 this.cacheEventQueue = createCacheEventQueue(this.remoteCacheClient); 429 430 if ( previousQueue.isWorking() ) 431 { 432 // we don't expect anything, it would have all gone to the zombie 433 if ( log.isInfoEnabled() ) 434 { 435 log.info( "resetEventQ, previous queue has [" + previousQueue.size() + "] items queued up." ); 436 } 437 previousQueue.destroy(); 438 } 439 } 440 441 /** 442 * This is temporary. It allows the manager to get the lister. 443 * <p> 444 * @return the instance of the remote cache client used by this object 445 */ 446 protected IRemoteCacheClient<K, V> getRemoteCache() 447 { 448 return remoteCacheClient; 449 } 450 451 /** 452 * @return Returns the AuxiliaryCacheAttributes. 453 */ 454 @Override 455 public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes() 456 { 457 return remoteCacheClient.getAuxiliaryCacheAttributes(); 458 } 459 460 /** 461 * This is for testing only. It allows you to take a look at the event queue. 462 * <p> 463 * @return ICacheEventQueue 464 */ 465 protected ICacheEventQueue<K, V> getCacheEventQueue() 466 { 467 return this.cacheEventQueue; 468 } 469 470 /** 471 * Returns the stats and the cache.toString(). 472 * <p> 473 * @see java.lang.Object#toString() 474 */ 475 @Override 476 public String toString() 477 { 478 return getStats() + "\n" + remoteCacheClient.toString(); 479 } 480 481 /** 482 * Returns the statistics in String form. 483 * <p> 484 * @return String 485 */ 486 @Override 487 public String getStats() 488 { 489 return getStatistics().toString(); 490 } 491 492 /** 493 * @return statistics about this communication 494 */ 495 @Override 496 public IStats getStatistics() 497 { 498 IStats stats = new Stats(); 499 stats.setTypeName( "Remote Cache No Wait" ); 500 501 ArrayList<IStatElement<?>> elems = new ArrayList<IStatElement<?>>(); 502 503 elems.add(new StatElement<CacheStatus>( "Status", getStatus() ) ); 504 505 // get the stats from the cache queue too 506 IStats cStats = this.remoteCacheClient.getStatistics(); 507 if ( cStats != null ) 508 { 509 elems.addAll(cStats.getStatElements()); 510 } 511 512 // get the stats from the event queue too 513 IStats eqStats = this.cacheEventQueue.getStatistics(); 514 elems.addAll(eqStats.getStatElements()); 515 516 elems.add(new StatElement<Integer>( "Get Count", Integer.valueOf(this.getCount) ) ); 517 elems.add(new StatElement<Integer>( "GetMatching Count", Integer.valueOf(this.getMatchingCount) ) ); 518 elems.add(new StatElement<Integer>( "GetMultiple Count", Integer.valueOf(this.getMultipleCount) ) ); 519 elems.add(new StatElement<Integer>( "Remove Count", Integer.valueOf(this.removeCount) ) ); 520 elems.add(new StatElement<Integer>( "Put Count", Integer.valueOf(this.putCount) ) ); 521 522 stats.setStatElements( elems ); 523 524 return stats; 525 } 526 527 /** 528 * this won't be called since we don't do ICache logging here. 529 * <p> 530 * @return String 531 */ 532 @Override 533 public String getEventLoggingExtraInfo() 534 { 535 return "Remote Cache No Wait"; 536 } 537}