001package org.apache.commons.jcs.auxiliary.lateral; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import java.io.IOException; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.Map; 026import java.util.Set; 027 028import org.apache.commons.jcs.auxiliary.AbstractAuxiliaryCacheEventLogging; 029import org.apache.commons.jcs.auxiliary.AuxiliaryCacheAttributes; 030import org.apache.commons.jcs.auxiliary.lateral.behavior.ILateralCacheAttributes; 031import org.apache.commons.jcs.engine.CacheInfo; 032import org.apache.commons.jcs.engine.CacheStatus; 033import org.apache.commons.jcs.engine.ZombieCacheServiceNonLocal; 034import org.apache.commons.jcs.engine.behavior.ICacheElement; 035import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal; 036import org.apache.commons.jcs.engine.behavior.IZombie; 037import org.apache.commons.jcs.engine.stats.Stats; 038import org.apache.commons.jcs.engine.stats.behavior.IStats; 039import org.apache.commons.logging.Log; 040import org.apache.commons.logging.LogFactory; 041 042/** 043 * Lateral distributor. Returns null on get by default. Net search not implemented. 044 */ 045public class LateralCache<K, V> 046 extends AbstractAuxiliaryCacheEventLogging<K, V> 047{ 048 /** The logger. */ 049 private static final Log log = LogFactory.getLog( LateralCache.class ); 050 051 /** generalize this, use another interface */ 052 private final ILateralCacheAttributes lateralCacheAttributes; 053 054 /** The region name */ 055 final String cacheName; 056 057 /** either http, socket.udp, or socket.tcp can set in config */ 058 private ICacheServiceNonLocal<K, V> lateralCacheService; 059 060 /** Monitors the connection. */ 061 private LateralCacheMonitor monitor; 062 063 /** 064 * Constructor for the LateralCache object 065 * <p> 066 * @param cattr 067 * @param lateral 068 * @param monitor 069 */ 070 public LateralCache( ILateralCacheAttributes cattr, ICacheServiceNonLocal<K, V> lateral, LateralCacheMonitor monitor ) 071 { 072 this.cacheName = cattr.getCacheName(); 073 this.lateralCacheAttributes = cattr; 074 this.lateralCacheService = lateral; 075 this.monitor = monitor; 076 } 077 078 /** 079 * Constructor for the LateralCache object 080 * <p> 081 * @param cattr 082 */ 083 public LateralCache( ILateralCacheAttributes cattr ) 084 { 085 this.cacheName = cattr.getCacheName(); 086 this.lateralCacheAttributes = cattr; 087 } 088 089 /** 090 * Update lateral. 091 * <p> 092 * @param ce 093 * @throws IOException 094 */ 095 @Override 096 protected void processUpdate( ICacheElement<K, V> ce ) 097 throws IOException 098 { 099 try 100 { 101 if (ce != null) 102 { 103 if ( log.isDebugEnabled() ) 104 { 105 log.debug( "update: lateral = [" + lateralCacheService + "], " + "CacheInfo.listenerId = " 106 + CacheInfo.listenerId ); 107 } 108 lateralCacheService.update( ce, CacheInfo.listenerId ); 109 } 110 } 111 catch ( IOException ex ) 112 { 113 handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName() + "@" + lateralCacheAttributes ); 114 } 115 } 116 117 /** 118 * The performance costs are too great. It is not recommended that you enable lateral gets. 119 * <p> 120 * @param key 121 * @return ICacheElement<K, V> or null 122 * @throws IOException 123 */ 124 @Override 125 protected ICacheElement<K, V> processGet( K key ) 126 throws IOException 127 { 128 ICacheElement<K, V> obj = null; 129 130 if ( this.lateralCacheAttributes.getPutOnlyMode() ) 131 { 132 return null; 133 } 134 try 135 { 136 obj = lateralCacheService.get( cacheName, key ); 137 } 138 catch ( Exception e ) 139 { 140 log.error( e ); 141 handleException( e, "Failed to get [" + key + "] from " + lateralCacheAttributes.getCacheName() + "@" + lateralCacheAttributes ); 142 } 143 return obj; 144 } 145 146 /** 147 * @param pattern 148 * @return A map of K key to ICacheElement<K, V> element, or an empty map if there is no 149 * data in cache for any of these keys 150 * @throws IOException 151 */ 152 @Override 153 protected Map<K, ICacheElement<K, V>> processGetMatching( String pattern ) 154 throws IOException 155 { 156 if ( this.lateralCacheAttributes.getPutOnlyMode() ) 157 { 158 return Collections.emptyMap(); 159 } 160 try 161 { 162 return lateralCacheService.getMatching( cacheName, pattern ); 163 } 164 catch ( IOException e ) 165 { 166 log.error( e ); 167 handleException( e, "Failed to getMatching [" + pattern + "] from " + lateralCacheAttributes.getCacheName() + "@" + lateralCacheAttributes ); 168 return Collections.emptyMap(); 169 } 170 } 171 172 /** 173 * Gets multiple items from the cache based on the given set of keys. 174 * <p> 175 * @param keys 176 * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no 177 * data in cache for any of these keys 178 * @throws IOException 179 */ 180 @Override 181 protected Map<K, ICacheElement<K, V>> processGetMultiple( Set<K> keys ) 182 throws IOException 183 { 184 Map<K, ICacheElement<K, V>> elements = new HashMap<K, ICacheElement<K, V>>(); 185 186 if ( keys != null && !keys.isEmpty() ) 187 { 188 for (K key : keys) 189 { 190 ICacheElement<K, V> element = get( key ); 191 192 if ( element != null ) 193 { 194 elements.put( key, element ); 195 } 196 } 197 } 198 199 return elements; 200 } 201 202 /** 203 * Return the keys in this cache. 204 * <p> 205 * @see org.apache.commons.jcs.auxiliary.AuxiliaryCache#getKeySet() 206 */ 207 @Override 208 public Set<K> getKeySet() throws IOException 209 { 210 try 211 { 212 return lateralCacheService.getKeySet( cacheName ); 213 } 214 catch ( IOException ex ) 215 { 216 handleException( ex, "Failed to get key set from " + lateralCacheAttributes.getCacheName() + "@" 217 + lateralCacheAttributes ); 218 } 219 return Collections.emptySet(); 220 } 221 222 /** 223 * Synchronously remove from the remote cache; if failed, replace the remote handle with a 224 * zombie. 225 * <p> 226 * @param key 227 * @return false always 228 * @throws IOException 229 */ 230 @Override 231 protected boolean processRemove( K key ) 232 throws IOException 233 { 234 if ( log.isDebugEnabled() ) 235 { 236 log.debug( "removing key:" + key ); 237 } 238 239 try 240 { 241 lateralCacheService.remove( cacheName, key, CacheInfo.listenerId ); 242 } 243 catch ( IOException ex ) 244 { 245 handleException( ex, "Failed to remove " + key + " from " + lateralCacheAttributes.getCacheName() + "@" + lateralCacheAttributes ); 246 } 247 return false; 248 } 249 250 /** 251 * Synchronously removeAll from the remote cache; if failed, replace the remote handle with a 252 * zombie. 253 * <p> 254 * @throws IOException 255 */ 256 @Override 257 protected void processRemoveAll() 258 throws IOException 259 { 260 try 261 { 262 lateralCacheService.removeAll( cacheName, CacheInfo.listenerId ); 263 } 264 catch ( IOException ex ) 265 { 266 handleException( ex, "Failed to remove all from " + lateralCacheAttributes.getCacheName() + "@" + lateralCacheAttributes ); 267 } 268 } 269 270 /** 271 * Synchronously dispose the cache. Not sure we want this. 272 * <p> 273 * @throws IOException 274 */ 275 @Override 276 protected void processDispose() 277 throws IOException 278 { 279 log.debug( "Disposing of lateral cache" ); 280 281 ///* HELP: This section did nothing but generate compilation warnings. 282 // TODO: may limit this functionality. It is dangerous. 283 // asmuts -- Added functionality to help with warnings. I'm not getting 284 // any. 285 try 286 { 287 lateralCacheService.dispose( this.lateralCacheAttributes.getCacheName() ); 288 // Should remove connection 289 } 290 catch ( IOException ex ) 291 { 292 log.error( "Couldn't dispose", ex ); 293 handleException( ex, "Failed to dispose " + lateralCacheAttributes.getCacheName() ); 294 } 295 } 296 297 /** 298 * Returns the cache status. 299 * <p> 300 * @return The status value 301 */ 302 @Override 303 public CacheStatus getStatus() 304 { 305 return this.lateralCacheService instanceof IZombie ? CacheStatus.ERROR : CacheStatus.ALIVE; 306 } 307 308 /** 309 * Returns the current cache size. 310 * <p> 311 * @return The size value 312 */ 313 @Override 314 public int getSize() 315 { 316 return 0; 317 } 318 319 /** 320 * Gets the cacheType attribute of the LateralCache object 321 * <p> 322 * @return The cacheType value 323 */ 324 @Override 325 public CacheType getCacheType() 326 { 327 return CacheType.LATERAL_CACHE; 328 } 329 330 /** 331 * Gets the cacheName attribute of the LateralCache object 332 * <p> 333 * @return The cacheName value 334 */ 335 @Override 336 public String getCacheName() 337 { 338 return cacheName; 339 } 340 341 /** 342 * Not yet sure what to do here. 343 * <p> 344 * @param ex 345 * @param msg 346 * @throws IOException 347 */ 348 private void handleException( Exception ex, String msg ) 349 throws IOException 350 { 351 log.error( "Disabling lateral cache due to error " + msg, ex ); 352 353 lateralCacheService = new ZombieCacheServiceNonLocal<K, V>( lateralCacheAttributes.getZombieQueueMaxSize() ); 354 // may want to flush if region specifies 355 // Notify the cache monitor about the error, and kick off the recovery 356 // process. 357 monitor.notifyError(); 358 359 // could stop the net search if it is built and try to reconnect? 360 if ( ex instanceof IOException ) 361 { 362 throw (IOException) ex; 363 } 364 throw new IOException( ex.getMessage() ); 365 } 366 367 /** 368 * Replaces the current remote cache service handle with the given handle. 369 * <p> 370 * @param restoredLateral 371 */ 372 public void fixCache( ICacheServiceNonLocal<K, V> restoredLateral ) 373 { 374 if ( this.lateralCacheService != null && this.lateralCacheService instanceof ZombieCacheServiceNonLocal ) 375 { 376 ZombieCacheServiceNonLocal<K, V> zombie = (ZombieCacheServiceNonLocal<K, V>) this.lateralCacheService; 377 this.lateralCacheService = restoredLateral; 378 try 379 { 380 zombie.propagateEvents( restoredLateral ); 381 } 382 catch ( Exception e ) 383 { 384 try 385 { 386 handleException( e, "Problem propagating events from Zombie Queue to new Lateral Service." ); 387 } 388 catch ( IOException e1 ) 389 { 390 // swallow, since this is just expected kick back. Handle always throws 391 } 392 } 393 } 394 else 395 { 396 this.lateralCacheService = restoredLateral; 397 } 398 } 399 400 /** 401 * getStats 402 * <p> 403 * @return String 404 */ 405 @Override 406 public String getStats() 407 { 408 return ""; 409 } 410 411 /** 412 * @return Returns the AuxiliaryCacheAttributes. 413 */ 414 @Override 415 public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes() 416 { 417 return lateralCacheAttributes; 418 } 419 420 /** 421 * @return debugging data. 422 */ 423 @Override 424 public String toString() 425 { 426 StringBuilder buf = new StringBuilder(); 427 buf.append( "\n LateralCache " ); 428 buf.append( "\n Cache Name [" + lateralCacheAttributes.getCacheName() + "]" ); 429 buf.append( "\n cattr = [" + lateralCacheAttributes + "]" ); 430 return buf.toString(); 431 } 432 433 /** 434 * @return extra data. 435 */ 436 @Override 437 public String getEventLoggingExtraInfo() 438 { 439 return null; 440 } 441 442 /** 443 * The NoWait on top does not call out to here yet. 444 * <p> 445 * @return almost nothing 446 */ 447 @Override 448 public IStats getStatistics() 449 { 450 IStats stats = new Stats(); 451 stats.setTypeName( "LateralCache" ); 452 return stats; 453 } 454}