001package org.apache.commons.jcs3.auxiliary.lateral; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import java.io.IOException; 023import java.rmi.UnmarshalException; 024import java.util.ArrayList; 025import java.util.Collections; 026import java.util.HashMap; 027import java.util.Map; 028import java.util.Map.Entry; 029import java.util.Set; 030import java.util.stream.Collectors; 031 032import org.apache.commons.jcs3.auxiliary.AbstractAuxiliaryCache; 033import org.apache.commons.jcs3.auxiliary.lateral.behavior.ILateralCacheAttributes; 034import org.apache.commons.jcs3.engine.CacheAdaptor; 035import org.apache.commons.jcs3.engine.CacheEventQueueFactory; 036import org.apache.commons.jcs3.engine.CacheInfo; 037import org.apache.commons.jcs3.engine.CacheStatus; 038import org.apache.commons.jcs3.engine.behavior.ICacheElement; 039import org.apache.commons.jcs3.engine.behavior.ICacheEventQueue; 040import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal; 041import org.apache.commons.jcs3.engine.stats.StatElement; 042import org.apache.commons.jcs3.engine.stats.Stats; 043import org.apache.commons.jcs3.engine.stats.behavior.IStatElement; 044import org.apache.commons.jcs3.engine.stats.behavior.IStats; 045import org.apache.commons.jcs3.log.Log; 046import org.apache.commons.jcs3.log.LogManager; 047 048/** 049 * Used to queue up update requests to the underlying cache. These requests will be processed in 050 * their order of arrival via the cache event queue processor. 051 */ 052public class LateralCacheNoWait<K, V> 053 extends AbstractAuxiliaryCache<K, V> 054{ 055 /** The logger. */ 056 private static final Log log = LogManager.getLog( LateralCacheNoWait.class ); 057 058 /** The cache */ 059 private final LateralCache<K, V> cache; 060 061 /** Identify this object */ 062 private String identityKey; 063 064 /** The event queue */ 065 private ICacheEventQueue<K, V> eventQueue; 066 067 /** times get called */ 068 private int getCount; 069 070 /** times remove called */ 071 private int removeCount; 072 073 /** times put called */ 074 private int putCount; 075 076 /** 077 * Constructs with the given lateral cache, and fires up an event queue for asynchronous 078 * processing. 079 * <p> 080 * @param cache 081 */ 082 public LateralCacheNoWait( final LateralCache<K, V> cache ) 083 { 084 this.cache = cache; 085 this.identityKey = cache.getCacheName(); 086 this.setCacheEventLogger(cache.getCacheEventLogger()); 087 this.setElementSerializer(cache.getElementSerializer()); 088 089 log.debug( "Constructing LateralCacheNoWait, LateralCache = [{0}]", cache ); 090 091 final CacheEventQueueFactory<K, V> fact = new CacheEventQueueFactory<>(); 092 this.eventQueue = fact.createCacheEventQueue( new CacheAdaptor<>( cache ), 093 CacheInfo.listenerId, cache.getCacheName(), 094 getAuxiliaryCacheAttributes().getEventQueuePoolName(), 095 getAuxiliaryCacheAttributes().getEventQueueType() ); 096 097 // need each no wait to handle each of its real updates and removes, 098 // since there may 099 // be more than one per cache? alternative is to have the cache 100 // perform updates using a different method that specifies the listener 101 // this.q = new CacheEventQueue(new CacheAdaptor(this), 102 // LateralCacheInfo.listenerId, cache.getCacheName()); 103 if ( cache.getStatus() == CacheStatus.ERROR ) 104 { 105 eventQueue.destroy(); 106 } 107 } 108 109 /** 110 * The identifying key to this no wait 111 * 112 * @return the identity key 113 * @since 3.1 114 */ 115 public String getIdentityKey() 116 { 117 return identityKey; 118 } 119 120 /** 121 * Set the identifying key to this no wait 122 * 123 * @param identityKey the identityKey to set 124 * @since 3.1 125 */ 126 public void setIdentityKey(String identityKey) 127 { 128 this.identityKey = identityKey; 129 } 130 131 /** 132 * @param ce 133 * @throws IOException 134 */ 135 @Override 136 public void update( final ICacheElement<K, V> ce ) 137 throws IOException 138 { 139 putCount++; 140 try 141 { 142 eventQueue.addPutEvent( ce ); 143 } 144 catch ( final IOException ex ) 145 { 146 log.error( ex ); 147 eventQueue.destroy(); 148 } 149 } 150 151 /** 152 * Synchronously reads from the lateral cache. 153 * <p> 154 * @param key 155 * @return ICacheElement<K, V> if found, else null 156 */ 157 @Override 158 public ICacheElement<K, V> get( final K key ) 159 { 160 getCount++; 161 if ( this.getStatus() != CacheStatus.ERROR ) 162 { 163 try 164 { 165 return cache.get( key ); 166 } 167 catch ( final UnmarshalException ue ) 168 { 169 log.debug( "Retrying the get owing to UnmarshalException..." ); 170 try 171 { 172 return cache.get( key ); 173 } 174 catch ( final IOException ex ) 175 { 176 log.error( "Failed in retrying the get for the second time." ); 177 eventQueue.destroy(); 178 } 179 } 180 catch ( final IOException ex ) 181 { 182 eventQueue.destroy(); 183 } 184 } 185 return null; 186 } 187 188 /** 189 * Gets multiple items from the cache based on the given set of keys. 190 * <p> 191 * @param keys 192 * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no 193 * data in cache for any of these keys 194 */ 195 @Override 196 public Map<K, ICacheElement<K, V>> getMultiple(final Set<K> keys) 197 { 198 if ( keys != null && !keys.isEmpty() ) 199 { 200 return keys.stream() 201 .collect(Collectors.toMap( 202 key -> key, 203 this::get)).entrySet().stream() 204 .filter(entry -> entry.getValue() != null) 205 .collect(Collectors.toMap( 206 Entry::getKey, 207 Entry::getValue)); 208 } 209 210 return new HashMap<>(); 211 } 212 213 /** 214 * Synchronously reads from the lateral cache. 215 * <p> 216 * @param pattern 217 * @return ICacheElement<K, V> if found, else empty 218 */ 219 @Override 220 public Map<K, ICacheElement<K, V>> getMatching(final String pattern) 221 { 222 getCount++; 223 if ( this.getStatus() != CacheStatus.ERROR ) 224 { 225 try 226 { 227 return cache.getMatching( pattern ); 228 } 229 catch ( final UnmarshalException ue ) 230 { 231 log.debug( "Retrying the get owing to UnmarshalException." ); 232 try 233 { 234 return cache.getMatching( pattern ); 235 } 236 catch ( final IOException ex ) 237 { 238 log.error( "Failed in retrying the get for the second time." ); 239 eventQueue.destroy(); 240 } 241 } 242 catch ( final IOException ex ) 243 { 244 eventQueue.destroy(); 245 } 246 } 247 return Collections.emptyMap(); 248 } 249 250 /** 251 * Return the keys in this cache. 252 * <p> 253 * @see org.apache.commons.jcs3.auxiliary.AuxiliaryCache#getKeySet() 254 */ 255 @Override 256 public Set<K> getKeySet() throws IOException 257 { 258 try 259 { 260 return cache.getKeySet(); 261 } 262 catch ( final IOException ex ) 263 { 264 log.error( ex ); 265 eventQueue.destroy(); 266 } 267 return Collections.emptySet(); 268 } 269 270 /** 271 * Adds a remove request to the lateral cache. 272 * <p> 273 * @param key 274 * @return always false 275 */ 276 @Override 277 public boolean remove( final K key ) 278 { 279 removeCount++; 280 try 281 { 282 eventQueue.addRemoveEvent( key ); 283 } 284 catch ( final IOException ex ) 285 { 286 log.error( ex ); 287 eventQueue.destroy(); 288 } 289 return false; 290 } 291 292 /** Adds a removeAll request to the lateral cache. */ 293 @Override 294 public void removeAll() 295 { 296 try 297 { 298 eventQueue.addRemoveAllEvent(); 299 } 300 catch ( final IOException ex ) 301 { 302 log.error( ex ); 303 eventQueue.destroy(); 304 } 305 } 306 307 /** Adds a dispose request to the lateral cache. */ 308 @Override 309 public void dispose() 310 { 311 try 312 { 313 eventQueue.addDisposeEvent(); 314 } 315 catch ( final IOException ex ) 316 { 317 log.error( ex ); 318 eventQueue.destroy(); 319 } 320 } 321 322 /** 323 * No lateral invocation. 324 * <p> 325 * @return The size value 326 */ 327 @Override 328 public int getSize() 329 { 330 return cache.getSize(); 331 } 332 333 /** 334 * No lateral invocation. 335 * <p> 336 * @return The cacheType value 337 */ 338 @Override 339 public CacheType getCacheType() 340 { 341 return cache.getCacheType(); 342 } 343 344 /** 345 * Returns the async cache status. An error status indicates either the lateral connection is not 346 * available, or the asyn queue has been unexpectedly destroyed. No lateral invocation. 347 * <p> 348 * @return The status value 349 */ 350 @Override 351 public CacheStatus getStatus() 352 { 353 return eventQueue.isWorking() ? cache.getStatus() : CacheStatus.ERROR; 354 } 355 356 /** 357 * Gets the cacheName attribute of the LateralCacheNoWait object 358 * <p> 359 * @return The cacheName value 360 */ 361 @Override 362 public String getCacheName() 363 { 364 return cache.getCacheName(); 365 } 366 367 /** 368 * Replaces the lateral cache service handle with the given handle and reset the queue by 369 * starting up a new instance. 370 * <p> 371 * @param lateral 372 */ 373 public void fixCache( final ICacheServiceNonLocal<K, V> lateral ) 374 { 375 cache.fixCache( lateral ); 376 resetEventQ(); 377 } 378 379 /** 380 * Resets the event q by first destroying the existing one and starting up new one. 381 */ 382 public void resetEventQ() 383 { 384 if ( eventQueue.isWorking() ) 385 { 386 eventQueue.destroy(); 387 } 388 final CacheEventQueueFactory<K, V> fact = new CacheEventQueueFactory<>(); 389 this.eventQueue = fact.createCacheEventQueue( new CacheAdaptor<>( cache ), 390 CacheInfo.listenerId, cache.getCacheName(), 391 getAuxiliaryCacheAttributes().getEventQueuePoolName(), 392 getAuxiliaryCacheAttributes().getEventQueueType() ); 393 } 394 395 /** 396 * @return Returns the AuxiliaryCacheAttributes. 397 */ 398 @Override 399 public ILateralCacheAttributes getAuxiliaryCacheAttributes() 400 { 401 return cache.getAuxiliaryCacheAttributes(); 402 } 403 404 /** 405 * getStats 406 * @return String 407 */ 408 @Override 409 public String getStats() 410 { 411 return getStatistics().toString(); 412 } 413 414 /** 415 * this won't be called since we don't do ICache logging here. 416 * <p> 417 * @return String 418 */ 419 @Override 420 public String getEventLoggingExtraInfo() 421 { 422 return "Lateral Cache No Wait"; 423 } 424 425 /** 426 * @return statistics about this communication 427 */ 428 @Override 429 public IStats getStatistics() 430 { 431 final IStats stats = new Stats(); 432 stats.setTypeName( "Lateral Cache No Wait" ); 433 434 // get the stats from the event queue too 435 final IStats eqStats = this.eventQueue.getStatistics(); 436 final ArrayList<IStatElement<?>> elems = new ArrayList<>(eqStats.getStatElements()); 437 438 elems.add(new StatElement<>( "Get Count", Integer.valueOf(this.getCount) ) ); 439 elems.add(new StatElement<>( "Remove Count", Integer.valueOf(this.removeCount) ) ); 440 elems.add(new StatElement<>( "Put Count", Integer.valueOf(this.putCount) ) ); 441 elems.add(new StatElement<>( "Attributes", cache.getAuxiliaryCacheAttributes() ) ); 442 443 stats.setStatElements( elems ); 444 445 return stats; 446 } 447 448 /** 449 * @return debugging info. 450 */ 451 @Override 452 public String toString() 453 { 454 final StringBuilder buf = new StringBuilder(); 455 buf.append( " LateralCacheNoWait " ); 456 buf.append( " Status = " + this.getStatus() ); 457 buf.append( " cache = [" + cache.toString() + "]" ); 458 return buf.toString(); 459 } 460}