001package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.concurrent.ConcurrentHashMap; 025 026import org.apache.commons.jcs3.auxiliary.AbstractAuxiliaryCacheFactory; 027import org.apache.commons.jcs3.auxiliary.AuxiliaryCacheAttributes; 028import org.apache.commons.jcs3.auxiliary.lateral.LateralCache; 029import org.apache.commons.jcs3.auxiliary.lateral.LateralCacheMonitor; 030import org.apache.commons.jcs3.auxiliary.lateral.LateralCacheNoWait; 031import org.apache.commons.jcs3.auxiliary.lateral.LateralCacheNoWaitFacade; 032import org.apache.commons.jcs3.auxiliary.lateral.behavior.ILateralCacheListener; 033import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes; 034import org.apache.commons.jcs3.engine.CacheWatchRepairable; 035import org.apache.commons.jcs3.engine.ZombieCacheServiceNonLocal; 036import org.apache.commons.jcs3.engine.ZombieCacheWatch; 037import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal; 038import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager; 039import org.apache.commons.jcs3.engine.behavior.IElementSerializer; 040import org.apache.commons.jcs3.engine.behavior.IShutdownObserver; 041import org.apache.commons.jcs3.engine.control.CompositeCacheManager; 042import org.apache.commons.jcs3.engine.logging.behavior.ICacheEventLogger; 043import org.apache.commons.jcs3.log.Log; 044import org.apache.commons.jcs3.log.LogManager; 045import org.apache.commons.jcs3.utils.discovery.UDPDiscoveryManager; 046import org.apache.commons.jcs3.utils.discovery.UDPDiscoveryService; 047import org.apache.commons.jcs3.utils.serialization.StandardSerializer; 048 049/** 050 * Constructs a LateralCacheNoWaitFacade for the given configuration. Each lateral service / local 051 * relationship is managed by one manager. This manager can have multiple caches. The remote 052 * relationships are consolidated and restored via these managers. 053 * <p> 054 * The facade provides a front to the composite cache so the implementation is transparent. 055 */ 056public class LateralTCPCacheFactory 057 extends AbstractAuxiliaryCacheFactory 058{ 059 /** The logger */ 060 private static final Log log = LogManager.getLog( LateralTCPCacheFactory.class ); 061 062 /** Address to service map. */ 063 private ConcurrentHashMap<String, ICacheServiceNonLocal<?, ?>> csnlInstances; 064 065 /** Map of available discovery listener instances, keyed by port. */ 066 private ConcurrentHashMap<String, LateralTCPDiscoveryListener> lTCPDLInstances; 067 068 /** Monitor thread */ 069 private LateralCacheMonitor monitor; 070 071 /** 072 * Wrapper of the lateral cache watch service; or wrapper of a zombie 073 * service if failed to connect. 074 */ 075 private CacheWatchRepairable lateralWatch; 076 077 /** 078 * Creates a TCP lateral. 079 * <p> 080 * @param <K> cache key type 081 * @param <V> cache value type 082 * @param iaca the cache configuration object 083 * @param cacheMgr the cache manager 084 * @param cacheEventLogger the event logger 085 * @param elementSerializer the serializer to use when sending or receiving 086 * @return a LateralCacheNoWaitFacade 087 */ 088 @Override 089 public <K, V> LateralCacheNoWaitFacade<K, V> createCache( 090 final AuxiliaryCacheAttributes iaca, final ICompositeCacheManager cacheMgr, 091 final ICacheEventLogger cacheEventLogger, final IElementSerializer elementSerializer ) 092 { 093 final ITCPLateralCacheAttributes lac = (ITCPLateralCacheAttributes) iaca; 094 final ArrayList<LateralCacheNoWait<K, V>> noWaits = new ArrayList<>(); 095 096 // pairs up the tcp servers and set the tcpServer value and 097 // get the manager and then get the cache 098 // no servers are required. 099 if (lac.getTcpServers() != null && !lac.getTcpServers().isEmpty()) 100 { 101 final String servers[] = lac.getTcpServers().split("\\s*,\\s*"); 102 log.debug( "Configured for [{0}] servers.", servers.length ); 103 104 for (final String server : servers) 105 { 106 log.debug( "tcp server = {0}", server ); 107 final ITCPLateralCacheAttributes lacClone = (ITCPLateralCacheAttributes) lac.clone(); 108 lacClone.setTcpServer( server ); 109 110 final LateralCacheNoWait<K, V> lateralNoWait = createCacheNoWait(lacClone, cacheEventLogger, elementSerializer); 111 112 addListenerIfNeeded( lacClone, cacheMgr, elementSerializer ); 113 monitorCache(lateralNoWait); 114 noWaits.add( lateralNoWait ); 115 } 116 } 117 118 final ILateralCacheListener<K, V> listener = createListener( lac, cacheMgr, elementSerializer ); 119 120 // create the no wait facade. 121 final LateralCacheNoWaitFacade<K, V> lcnwf = 122 new LateralCacheNoWaitFacade<>(listener, noWaits, lac); 123 124 // create udp discovery if available. 125 createDiscoveryService( lac, lcnwf, cacheMgr, cacheEventLogger, elementSerializer ); 126 127 return lcnwf; 128 } 129 130 /** 131 * Create a LateralCacheNoWait for the server configured in lca 132 * 133 * @param <K> cache key type 134 * @param <V> cache value type 135 * @param lca the cache configuration object 136 * @param cacheEventLogger the event logger 137 * @param elementSerializer the serializer to use when sending or receiving 138 * @return a LateralCacheNoWait 139 */ 140 public <K, V> LateralCacheNoWait<K, V> createCacheNoWait( final ITCPLateralCacheAttributes lca, 141 final ICacheEventLogger cacheEventLogger, final IElementSerializer elementSerializer ) 142 { 143 final ICacheServiceNonLocal<K, V> lateralService = getCSNLInstance(lca, elementSerializer); 144 145 final LateralCache<K, V> cache = new LateralCache<>( lca, lateralService, this.monitor ); 146 cache.setCacheEventLogger( cacheEventLogger ); 147 cache.setElementSerializer( elementSerializer ); 148 149 log.debug( "Created cache for noWait, cache [{0}]", cache ); 150 151 final LateralCacheNoWait<K, V> lateralNoWait = new LateralCacheNoWait<>( cache ); 152 lateralNoWait.setIdentityKey(lca.getTcpServer()); 153 154 log.info( "Created LateralCacheNoWait for [{0}] LateralCacheNoWait = [{1}]", 155 lca, lateralNoWait ); 156 157 return lateralNoWait; 158 } 159 160 /** 161 * Initialize this factory 162 */ 163 @Override 164 public void initialize() 165 { 166 this.csnlInstances = new ConcurrentHashMap<>(); 167 this.lTCPDLInstances = new ConcurrentHashMap<>(); 168 169 // Create the monitoring daemon thread 170 this.monitor = new LateralCacheMonitor(this); 171 this.monitor.setDaemon( true ); 172 this.monitor.start(); 173 174 this.lateralWatch = new CacheWatchRepairable(); 175 this.lateralWatch.setCacheWatch( new ZombieCacheWatch() ); 176 } 177 178 /** 179 * Dispose of this factory, clean up shared resources 180 */ 181 @Override 182 public void dispose() 183 { 184 for (final ICacheServiceNonLocal<?, ?> service : this.csnlInstances.values()) 185 { 186 try 187 { 188 service.dispose(""); 189 } 190 catch (final IOException e) 191 { 192 log.error("Could not dispose service " + service, e); 193 } 194 } 195 196 this.csnlInstances.clear(); 197 198 // TODO: shut down discovery listeners 199 this.lTCPDLInstances.clear(); 200 201 if (this.monitor != null) 202 { 203 this.monitor.notifyShutdown(); 204 try 205 { 206 this.monitor.join(5000); 207 } 208 catch (final InterruptedException e) 209 { 210 // swallow 211 } 212 this.monitor = null; 213 } 214 } 215 216 /** 217 * Returns an instance of the cache service. 218 * <p> 219 * @param <K> cache key type 220 * @param <V> cache value type 221 * @param lca configuration for the creation of a new service instance 222 * 223 * @return ICacheServiceNonLocal<K, V> 224 * 225 * @deprecated Specify serializer 226 */ 227 @Deprecated 228 public <K, V> ICacheServiceNonLocal<K, V> getCSNLInstance( final ITCPLateralCacheAttributes lca ) 229 { 230 return getCSNLInstance(lca, new StandardSerializer()); 231 } 232 233 /** 234 * Returns an instance of the cache service. 235 * <p> 236 * @param <K> cache key type 237 * @param <V> cache value type 238 * @param lca configuration for the creation of a new service instance 239 * @param elementSerializer the serializer to use when sending or receiving 240 * 241 * @return ICacheServiceNonLocal<K, V> 242 * @since 3.1 243 */ 244 // Need to cast because of common map for all cache services 245 @SuppressWarnings("unchecked") 246 public <K, V> ICacheServiceNonLocal<K, V> getCSNLInstance(final ITCPLateralCacheAttributes lca, 247 final IElementSerializer elementSerializer) 248 { 249 final String key = lca.getTcpServer(); 250 251 return (ICacheServiceNonLocal<K, V>) csnlInstances.compute(key, (name, service) -> { 252 253 ICacheServiceNonLocal<?, ?> newService = service; 254 255 // If service creation did not succeed last time, force retry 256 if (service instanceof ZombieCacheServiceNonLocal) 257 { 258 log.info("Disposing of zombie service instance for [{0}]", name); 259 newService = null; 260 } 261 262 if (newService == null) 263 { 264 log.info( "Instance for [{0}] is null, creating", name ); 265 266 // Create the service 267 try 268 { 269 log.info( "Creating TCP service, lca = {0}", lca ); 270 271 newService = new LateralTCPService<>(lca, elementSerializer); 272 } 273 catch ( final IOException ex ) 274 { 275 // Failed to connect to the lateral server. 276 // Configure this LateralCacheManager instance to use the 277 // "zombie" services. 278 log.error( "Failure, lateral instance will use zombie service", ex ); 279 280 newService = new ZombieCacheServiceNonLocal<>(lca.getZombieQueueMaxSize()); 281 282 // Notify the cache monitor about the error, and kick off 283 // the recovery process. 284 monitor.notifyError(); 285 } 286 } 287 288 return newService; 289 }); 290 } 291 292 /** 293 * Add cache instance to monitor 294 * 295 * @param cache the cache instance 296 * @since 3.1 297 */ 298 public void monitorCache(final LateralCacheNoWait<?, ?> cache) 299 { 300 monitor.addCache(cache); 301 } 302 303 /** 304 * Gets the instance attribute of the LateralCacheTCPListener class. 305 * <p> 306 * @param ilca ITCPLateralCacheAttributes 307 * @param cacheManager a reference to the global cache manager 308 * @param cacheEventLogger Reference to the cache event logger for auxiliary cache creation 309 * @param elementSerializer Reference to the cache element serializer for auxiliary cache 310 * 311 * @return The instance value 312 */ 313 private LateralTCPDiscoveryListener getDiscoveryListener(final ITCPLateralCacheAttributes ilca, 314 final ICompositeCacheManager cacheManager, final ICacheEventLogger cacheEventLogger, 315 final IElementSerializer elementSerializer) 316 { 317 final String key = ilca.getUdpDiscoveryAddr() + ":" + ilca.getUdpDiscoveryPort(); 318 319 return lTCPDLInstances.computeIfAbsent(key, key1 -> { 320 log.info("Created new discovery listener for cacheName {0} and request {1}", 321 ilca.getCacheName(), key1); 322 return new LateralTCPDiscoveryListener( this.getName(), 323 (CompositeCacheManager) cacheManager, 324 cacheEventLogger, elementSerializer); 325 }); 326 } 327 328 /** 329 * Add listener for receivers 330 * <p> 331 * @param iaca cache configuration attributes 332 * @param cacheMgr the composite cache manager 333 * @param serializer the serializer to use when receiving 334 */ 335 private void addListenerIfNeeded( final ITCPLateralCacheAttributes iaca, final ICompositeCacheManager cacheMgr, final IElementSerializer elementSerializer ) 336 { 337 // don't create a listener if we are not receiving. 338 if ( iaca.isReceive() ) 339 { 340 try 341 { 342 addLateralCacheListener(iaca.getCacheName(), createListener(iaca, cacheMgr, elementSerializer)); 343 } 344 catch ( final IOException ioe ) 345 { 346 log.error("Problem creating lateral listener", ioe); 347 } 348 } 349 else 350 { 351 log.debug( "Not creating a listener since we are not receiving." ); 352 } 353 } 354 355 /** 356 * Adds the lateral cache listener to the underlying cache-watch service. 357 * <p> 358 * @param cacheName The feature to be added to the LateralCacheListener attribute 359 * @param listener The feature to be added to the LateralCacheListener attribute 360 * @throws IOException 361 */ 362 private <K, V> void addLateralCacheListener( final String cacheName, final ILateralCacheListener<K, V> listener ) 363 throws IOException 364 { 365 synchronized ( this.lateralWatch ) 366 { 367 lateralWatch.addCacheListener( cacheName, listener ); 368 } 369 } 370 371 /** 372 * Makes sure a listener gets created. It will get monitored as soon as it 373 * is used. 374 * <p> 375 * This should be called by create cache. 376 * <p> 377 * @param attr ITCPLateralCacheAttributes 378 * @param cacheMgr the composite cache manager 379 * @param serializer the serializer to use when receiving 380 * 381 * @return the listener if created, else null 382 */ 383 private static <K, V> ILateralCacheListener<K, V> createListener( final ITCPLateralCacheAttributes attr, 384 final ICompositeCacheManager cacheMgr, final IElementSerializer elementSerializer ) 385 { 386 ILateralCacheListener<K, V> listener = null; 387 388 // don't create a listener if we are not receiving. 389 if ( attr.isReceive() ) 390 { 391 log.info( "Getting listener for {0}", attr ); 392 393 // make a listener. if one doesn't exist 394 listener = LateralTCPListener.getInstance( attr, cacheMgr, elementSerializer ); 395 396 // register for shutdown notification 397 cacheMgr.registerShutdownObserver( (IShutdownObserver) listener ); 398 } 399 else 400 { 401 log.debug( "Not creating a listener since we are not receiving." ); 402 } 403 404 return listener; 405 } 406 407 /** 408 * Creates the discovery service. Only creates this for tcp laterals right now. 409 * <p> 410 * @param lac ITCPLateralCacheAttributes 411 * @param lcnwf the lateral facade 412 * @param cacheMgr a reference to the global cache manager 413 * @param cacheEventLogger Reference to the cache event logger for auxiliary cache creation 414 * @param elementSerializer Reference to the cache element serializer for auxiliary cache 415 */ 416 private synchronized <K, V> void createDiscoveryService( 417 final ITCPLateralCacheAttributes lac, 418 final LateralCacheNoWaitFacade<K, V> lcnwf, 419 final ICompositeCacheManager cacheMgr, 420 final ICacheEventLogger cacheEventLogger, 421 final IElementSerializer elementSerializer ) 422 { 423 UDPDiscoveryService discovery = null; 424 425 // create the UDP discovery for the TCP lateral 426 if ( lac.isUdpDiscoveryEnabled() ) 427 { 428 // One can be used for all regions 429 final LateralTCPDiscoveryListener discoveryListener = 430 getDiscoveryListener(lac, cacheMgr, cacheEventLogger, elementSerializer); 431 discoveryListener.addNoWaitFacade( lac.getCacheName(), lcnwf ); 432 433 // need a factory for this so it doesn't 434 // get dereferenced, also we don't want one for every region. 435 discovery = UDPDiscoveryManager.getInstance().getService( 436 lac.getUdpDiscoveryAddr(), lac.getUdpDiscoveryPort(), 437 lac.getTcpListenerHost(), lac.getTcpListenerPort(), lac.getUdpTTL(), 438 cacheMgr, elementSerializer); 439 440 discovery.addParticipatingCacheName( lac.getCacheName() ); 441 discovery.addDiscoveryListener( discoveryListener ); 442 443 log.info( "Registered TCP lateral cache [{0}] with UDPDiscoveryService.", 444 lac::getCacheName); 445 } 446 } 447}