001package org.apache.commons.jcs.auxiliary.lateral.socket.tcp; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.StringTokenizer; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.locks.ReentrantLock; 027 028import org.apache.commons.jcs.auxiliary.AbstractAuxiliaryCacheFactory; 029import org.apache.commons.jcs.auxiliary.AuxiliaryCacheAttributes; 030import org.apache.commons.jcs.auxiliary.lateral.LateralCache; 031import org.apache.commons.jcs.auxiliary.lateral.LateralCacheMonitor; 032import org.apache.commons.jcs.auxiliary.lateral.LateralCacheNoWait; 033import org.apache.commons.jcs.auxiliary.lateral.LateralCacheNoWaitFacade; 034import org.apache.commons.jcs.auxiliary.lateral.behavior.ILateralCacheListener; 035import org.apache.commons.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes; 036import org.apache.commons.jcs.engine.CacheWatchRepairable; 037import org.apache.commons.jcs.engine.ZombieCacheServiceNonLocal; 038import org.apache.commons.jcs.engine.ZombieCacheWatch; 039import org.apache.commons.jcs.engine.behavior.ICache; 040import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal; 041import org.apache.commons.jcs.engine.behavior.ICompositeCacheManager; 042import org.apache.commons.jcs.engine.behavior.IElementSerializer; 043import org.apache.commons.jcs.engine.behavior.IShutdownObserver; 044import org.apache.commons.jcs.engine.logging.behavior.ICacheEventLogger; 045import org.apache.commons.jcs.utils.discovery.UDPDiscoveryManager; 046import org.apache.commons.jcs.utils.discovery.UDPDiscoveryService; 047import org.apache.commons.logging.Log; 048import org.apache.commons.logging.LogFactory; 049 050/** 051 * Constructs a LateralCacheNoWaitFacade for the given configuration. Each lateral service / local 052 * relationship is managed by one manager. This manager can have multiple caches. The remote 053 * relationships are consolidated and restored via these managers. 054 * <p> 055 * The facade provides a front to the composite cache so the implementation is transparent. 056 */ 057public class LateralTCPCacheFactory 058 extends AbstractAuxiliaryCacheFactory 059{ 060 /** The logger */ 061 private static final Log log = LogFactory.getLog( LateralTCPCacheFactory.class ); 062 063 /** Address to service map. */ 064 private ConcurrentHashMap<String, ICacheServiceNonLocal<?, ?>> csnlInstances; 065 066 /** Lock for initialization of address to service map */ 067 private ReentrantLock csnlLock; 068 069 /** Map of available discovery listener instances, keyed by port. */ 070 private ConcurrentHashMap<String, LateralTCPDiscoveryListener> lTCPDLInstances; 071 072 /** Monitor thread */ 073 private LateralCacheMonitor monitor; 074 075 /** 076 * Wrapper of the lateral cache watch service; or wrapper of a zombie 077 * service if failed to connect. 078 */ 079 private CacheWatchRepairable lateralWatch; 080 081 /** 082 * Creates a TCP lateral. 083 * <p> 084 * @param iaca 085 * @param cacheMgr 086 * @param cacheEventLogger 087 * @param elementSerializer 088 * @return LateralCacheNoWaitFacade 089 */ 090 @Override 091 public <K, V> LateralCacheNoWaitFacade<K, V> createCache( 092 AuxiliaryCacheAttributes iaca, ICompositeCacheManager cacheMgr, 093 ICacheEventLogger cacheEventLogger, IElementSerializer elementSerializer ) 094 { 095 ITCPLateralCacheAttributes lac = (ITCPLateralCacheAttributes) iaca; 096 ArrayList<ICache<K, V>> noWaits = new ArrayList<ICache<K, V>>(); 097 098 // pairs up the tcp servers and set the tcpServer value and 099 // get the manager and then get the cache 100 // no servers are required. 101 if ( lac.getTcpServers() != null ) 102 { 103 StringTokenizer it = new StringTokenizer( lac.getTcpServers(), "," ); 104 if ( log.isDebugEnabled() ) 105 { 106 log.debug( "Configured for [" + it.countTokens() + "] servers." ); 107 } 108 while ( it.hasMoreElements() ) 109 { 110 String server = (String) it.nextElement(); 111 if ( log.isDebugEnabled() ) 112 { 113 log.debug( "tcp server = " + server ); 114 } 115 ITCPLateralCacheAttributes lacC = (ITCPLateralCacheAttributes) lac.clone(); 116 lacC.setTcpServer( server ); 117 118 LateralCacheNoWait<K, V> lateralNoWait = createCacheNoWait(lacC, cacheEventLogger, elementSerializer); 119 120 addListenerIfNeeded( lacC, cacheMgr ); 121 monitor.addCache(lateralNoWait); 122 noWaits.add( lateralNoWait ); 123 } 124 } 125 126 ILateralCacheListener<K, V> listener = createListener( lac, cacheMgr ); 127 128 // create the no wait facade. 129 @SuppressWarnings("unchecked") // No generic arrays in java 130 LateralCacheNoWait<K, V>[] lcnwArray = noWaits.toArray( new LateralCacheNoWait[0] ); 131 LateralCacheNoWaitFacade<K, V> lcnwf = 132 new LateralCacheNoWaitFacade<K, V>(listener, lcnwArray, lac ); 133 134 // create udp discovery if available. 135 createDiscoveryService( lac, lcnwf, cacheMgr, cacheEventLogger, elementSerializer ); 136 137 return lcnwf; 138 } 139 140 protected <K, V> LateralCacheNoWait<K, V> createCacheNoWait( ITCPLateralCacheAttributes lca, 141 ICacheEventLogger cacheEventLogger, IElementSerializer elementSerializer ) 142 { 143 ICacheServiceNonLocal<K, V> lateralService = getCSNLInstance(lca); 144 145 LateralCache<K, V> cache = new LateralCache<K, V>( lca, lateralService, this.monitor ); 146 cache.setCacheEventLogger( cacheEventLogger ); 147 cache.setElementSerializer( elementSerializer ); 148 149 if ( log.isDebugEnabled() ) 150 { 151 log.debug( "Created cache for noWait, cache [" + cache + "]" ); 152 } 153 154 LateralCacheNoWait<K, V> lateralNoWait = new LateralCacheNoWait<K, V>( cache ); 155 lateralNoWait.setCacheEventLogger( cacheEventLogger ); 156 lateralNoWait.setElementSerializer( elementSerializer ); 157 158 if ( log.isInfoEnabled() ) 159 { 160 log.info( "Created LateralCacheNoWait for [" + lca + "] LateralCacheNoWait = [" + lateralNoWait 161 + "]" ); 162 } 163 164 return lateralNoWait; 165 } 166 167 /** 168 * Initialize this factory 169 */ 170 @Override 171 public void initialize() 172 { 173 this.csnlInstances = new ConcurrentHashMap<String, ICacheServiceNonLocal<?, ?>>(); 174 this.csnlLock = new ReentrantLock(); 175 this.lTCPDLInstances = new ConcurrentHashMap<String, LateralTCPDiscoveryListener>(); 176 177 // Create the monitoring daemon thread 178 this.monitor = new LateralCacheMonitor(this); 179 this.monitor.setDaemon( true ); 180 this.monitor.start(); 181 182 this.lateralWatch = new CacheWatchRepairable(); 183 this.lateralWatch.setCacheWatch( new ZombieCacheWatch() ); 184 } 185 186 /** 187 * Dispose of this factory, clean up shared resources 188 */ 189 @Override 190 public void dispose() 191 { 192 for (ICacheServiceNonLocal<?, ?> service : this.csnlInstances.values()) 193 { 194 try 195 { 196 service.dispose(""); 197 } 198 catch (IOException e) 199 { 200 log.error("Could not dispose service " + service, e); 201 } 202 } 203 204 this.csnlInstances.clear(); 205 206 // TODO: shut down discovery listeners 207 this.lTCPDLInstances.clear(); 208 209 if (this.monitor != null) 210 { 211 this.monitor.notifyShutdown(); 212 try 213 { 214 this.monitor.join(5000); 215 } 216 catch (InterruptedException e) 217 { 218 // swallow 219 } 220 this.monitor = null; 221 } 222 } 223 224 /** 225 * Returns an instance of the cache service. 226 * <p> 227 * @param lca configuration for the creation of a new service instance 228 * 229 * @return ICacheServiceNonLocal<K, V> 230 */ 231 // Need to cast because of common map for all cache services 232 @SuppressWarnings("unchecked") 233 public <K, V> ICacheServiceNonLocal<K, V> getCSNLInstance( ITCPLateralCacheAttributes lca ) 234 { 235 String key = lca.getTcpServer(); 236 237 ICacheServiceNonLocal<K, V> service = (ICacheServiceNonLocal<K, V>)csnlInstances.get( key ); 238 239 if ( service == null || service instanceof ZombieCacheServiceNonLocal ) 240 { 241 csnlLock.lock(); 242 243 try 244 { 245 // double check 246 service = (ICacheServiceNonLocal<K, V>)csnlInstances.get( key ); 247 248 // If service creation did not succeed last time, force retry 249 if ( service instanceof ZombieCacheServiceNonLocal) 250 { 251 service = null; 252 log.info("Disposing of zombie service instance for [" + key + "]"); 253 } 254 255 if ( service == null ) 256 { 257 log.info( "Instance for [" + key + "] is null, creating" ); 258 259 // Create the service 260 try 261 { 262 if ( log.isInfoEnabled() ) 263 { 264 log.info( "Creating TCP service, lca = " + lca ); 265 } 266 267 service = new LateralTCPService<K, V>( lca ); 268 } 269 catch ( IOException ex ) 270 { 271 // Failed to connect to the lateral server. 272 // Configure this LateralCacheManager instance to use the 273 // "zombie" services. 274 log.error( "Failure, lateral instance will use zombie service", ex ); 275 276 service = new ZombieCacheServiceNonLocal<K, V>( lca.getZombieQueueMaxSize() ); 277 278 // Notify the cache monitor about the error, and kick off 279 // the recovery process. 280 monitor.notifyError(); 281 } 282 283 csnlInstances.put( key, service ); 284 } 285 } 286 finally 287 { 288 csnlLock.unlock(); 289 } 290 } 291 292 return service; 293 } 294 295 /** 296 * Gets the instance attribute of the LateralCacheTCPListener class. 297 * <p> 298 * @param ilca ITCPLateralCacheAttributes 299 * @param cacheManager a reference to the global cache manager 300 * 301 * @return The instance value 302 */ 303 private LateralTCPDiscoveryListener getDiscoveryListener( ITCPLateralCacheAttributes ilca, ICompositeCacheManager cacheManager ) 304 { 305 String key = ilca.getUdpDiscoveryAddr() + ":" + ilca.getUdpDiscoveryPort(); 306 LateralTCPDiscoveryListener ins = null; 307 308 LateralTCPDiscoveryListener newListener = new LateralTCPDiscoveryListener( this.getName(), cacheManager); 309 ins = lTCPDLInstances.putIfAbsent(key, newListener ); 310 311 if ( ins == null ) 312 { 313 ins = newListener; 314 315 if ( log.isInfoEnabled() ) 316 { 317 log.info( "Created new discovery listener for " + key + " cacheName for request " + ilca.getCacheName() ); 318 } 319 } 320 321 return ins; 322 } 323 324 /** 325 * Add listener for receivers 326 * <p> 327 * @param iaca cache configuration attributes 328 * @param cacheMgr the composite cache manager 329 */ 330 private void addListenerIfNeeded( ITCPLateralCacheAttributes iaca, ICompositeCacheManager cacheMgr ) 331 { 332 // don't create a listener if we are not receiving. 333 if ( iaca.isReceive() ) 334 { 335 try 336 { 337 addLateralCacheListener( iaca.getCacheName(), LateralTCPListener.getInstance( iaca, cacheMgr ) ); 338 } 339 catch ( IOException ioe ) 340 { 341 log.error( "Problem creating lateral listener", ioe ); 342 } 343 } 344 else 345 { 346 if ( log.isDebugEnabled() ) 347 { 348 log.debug( "Not creating a listener since we are not receiving." ); 349 } 350 } 351 } 352 353 /** 354 * Adds the lateral cache listener to the underlying cache-watch service. 355 * <p> 356 * @param cacheName The feature to be added to the LateralCacheListener attribute 357 * @param listener The feature to be added to the LateralCacheListener attribute 358 * @throws IOException 359 */ 360 private <K, V> void addLateralCacheListener( String cacheName, ILateralCacheListener<K, V> listener ) 361 throws IOException 362 { 363 synchronized ( this.lateralWatch ) 364 { 365 lateralWatch.addCacheListener( cacheName, listener ); 366 } 367 } 368 369 /** 370 * Makes sure a listener gets created. It will get monitored as soon as it 371 * is used. 372 * <p> 373 * This should be called by create cache. 374 * <p> 375 * @param attr ITCPLateralCacheAttributes 376 * @param cacheMgr 377 * 378 * @return the listener if created, else null 379 */ 380 private <K, V> ILateralCacheListener<K, V> createListener( ITCPLateralCacheAttributes attr, 381 ICompositeCacheManager cacheMgr ) 382 { 383 ILateralCacheListener<K, V> listener = null; 384 385 // don't create a listener if we are not receiving. 386 if ( attr.isReceive() ) 387 { 388 if ( log.isInfoEnabled() ) 389 { 390 log.info( "Getting listener for " + attr ); 391 } 392 393 // make a listener. if one doesn't exist 394 listener = LateralTCPListener.getInstance( attr, cacheMgr ); 395 396 // register for shutdown notification 397 cacheMgr.registerShutdownObserver( (IShutdownObserver) listener ); 398 } 399 else 400 { 401 if ( log.isDebugEnabled() ) 402 { 403 log.debug( "Not creating a listener since we are not receiving." ); 404 } 405 } 406 407 return listener; 408 } 409 410 /** 411 * Creates the discovery service. Only creates this for tcp laterals right now. 412 * <p> 413 * @param lac ITCPLateralCacheAttributes 414 * @param lcnwf 415 * @param cacheMgr 416 * @param cacheEventLogger 417 * @param elementSerializer 418 * @return null if none is created. 419 */ 420 private synchronized <K, V> UDPDiscoveryService createDiscoveryService( 421 ITCPLateralCacheAttributes lac, 422 LateralCacheNoWaitFacade<K, V> lcnwf, 423 ICompositeCacheManager cacheMgr, 424 ICacheEventLogger cacheEventLogger, 425 IElementSerializer elementSerializer ) 426 { 427 UDPDiscoveryService discovery = null; 428 429 // create the UDP discovery for the TCP lateral 430 if ( lac.isUdpDiscoveryEnabled() ) 431 { 432 // One can be used for all regions 433 LateralTCPDiscoveryListener discoveryListener = getDiscoveryListener( lac, cacheMgr ); 434 discoveryListener.addNoWaitFacade( lac.getCacheName(), lcnwf ); 435 436 // need a factory for this so it doesn't 437 // get dereferenced, also we don't want one for every region. 438 discovery = UDPDiscoveryManager.getInstance().getService( lac.getUdpDiscoveryAddr(), 439 lac.getUdpDiscoveryPort(), 440 lac.getTcpListenerPort(), cacheMgr); 441 442 discovery.addParticipatingCacheName( lac.getCacheName() ); 443 discovery.addDiscoveryListener( discoveryListener ); 444 445 if ( log.isInfoEnabled() ) 446 { 447 log.info( "Registered TCP lateral cache [" + lac.getCacheName() + "] with UDPDiscoveryService." ); 448 } 449 } 450 return discovery; 451 } 452}