001package org.apache.commons.jcs.auxiliary.remote; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import java.io.IOException; 023import java.rmi.Naming; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.ConcurrentMap; 026import java.util.concurrent.locks.ReentrantLock; 027 028import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes; 029import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheClient; 030import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheListener; 031import org.apache.commons.jcs.engine.CacheStatus; 032import org.apache.commons.jcs.engine.CacheWatchRepairable; 033import org.apache.commons.jcs.engine.ZombieCacheServiceNonLocal; 034import org.apache.commons.jcs.engine.ZombieCacheWatch; 035import org.apache.commons.jcs.engine.behavior.ICacheObserver; 036import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal; 037import org.apache.commons.jcs.engine.behavior.ICompositeCacheManager; 038import org.apache.commons.jcs.engine.behavior.IElementSerializer; 039import org.apache.commons.jcs.engine.logging.behavior.ICacheEventLogger; 040import org.apache.commons.logging.Log; 041import org.apache.commons.logging.LogFactory; 042 043/** 044 * An instance of RemoteCacheManager corresponds to one remote connection of a specific host and 045 * port. All RemoteCacheManager instances are monitored by the singleton RemoteCacheMonitor 046 * monitoring daemon for error detection and recovery. 047 * <p> 048 * Getting an instance of the remote cache has the effect of getting a handle on the remote server. 049 * Listeners are not registered with the server until a cache is requested from the manager. 050 */ 051public class RemoteCacheManager 052{ 053 /** The logger */ 054 private static final Log log = LogFactory.getLog( RemoteCacheManager.class ); 055 056 /** Contains instances of RemoteCacheNoWait managed by a RemoteCacheManager instance. */ 057 private final ConcurrentMap<String, RemoteCacheNoWait<?, ?>> caches = 058 new ConcurrentHashMap<String, RemoteCacheNoWait<?, ?>>(); 059 060 /** Lock for initialization of caches */ 061 private ReentrantLock cacheLock = new ReentrantLock(); 062 063 /** The event logger. */ 064 private final ICacheEventLogger cacheEventLogger; 065 066 /** The serializer. */ 067 private final IElementSerializer elementSerializer; 068 069 /** Handle to the remote cache service; or a zombie handle if failed to connect. */ 070 private ICacheServiceNonLocal<?, ?> remoteService; 071 072 /** 073 * Wrapper of the remote cache watch service; or wrapper of a zombie service if failed to 074 * connect. 075 */ 076 private CacheWatchRepairable remoteWatch; 077 078 /** The cache manager listeners will need to use to get a cache. */ 079 private final ICompositeCacheManager cacheMgr; 080 081 /** For error notification */ 082 private final RemoteCacheMonitor monitor; 083 084 /** The service found through lookup */ 085 private final String registry; 086 087 /** can it be restored */ 088 private boolean canFix = true; 089 090 /** 091 * Constructs an instance to with the given remote connection parameters. If the connection 092 * cannot be made, "zombie" services will be temporarily used until a successful re-connection 093 * is made by the monitoring daemon. 094 * <p> 095 * @param cattr cache attributes 096 * @param cacheMgr the cache hub 097 * @param monitor the cache monitor thread for error notifications 098 * @param cacheEventLogger 099 * @param elementSerializer 100 */ 101 protected RemoteCacheManager( IRemoteCacheAttributes cattr, ICompositeCacheManager cacheMgr, 102 RemoteCacheMonitor monitor, 103 ICacheEventLogger cacheEventLogger, IElementSerializer elementSerializer) 104 { 105 this.cacheMgr = cacheMgr; 106 this.monitor = monitor; 107 this.cacheEventLogger = cacheEventLogger; 108 this.elementSerializer = elementSerializer; 109 this.remoteWatch = new CacheWatchRepairable(); 110 111 this.registry = RemoteUtils.getNamingURL(cattr.getRemoteLocation(), cattr.getRemoteServiceName()); 112 113 try 114 { 115 lookupRemoteService(); 116 } 117 catch (IOException e) 118 { 119 log.error("Could not find server", e); 120 // Notify the cache monitor about the error, and kick off the 121 // recovery process. 122 monitor.notifyError(); 123 } 124 } 125 126 /** 127 * Lookup remote service from registry 128 * @throws IOException if the remote service could not be found 129 * 130 */ 131 protected void lookupRemoteService() throws IOException 132 { 133 if ( log.isInfoEnabled() ) 134 { 135 log.info( "Looking up server [" + registry + "]" ); 136 } 137 try 138 { 139 Object obj = Naming.lookup( registry ); 140 if ( log.isInfoEnabled() ) 141 { 142 log.info( "Server found: " + obj ); 143 } 144 145 // Successful connection to the remote server. 146 this.remoteService = (ICacheServiceNonLocal<?, ?>) obj; 147 if ( log.isDebugEnabled() ) 148 { 149 log.debug( "Remote Service = " + remoteService ); 150 } 151 remoteWatch.setCacheWatch( (ICacheObserver) remoteService ); 152 } 153 catch ( Exception ex ) 154 { 155 // Failed to connect to the remote server. 156 // Configure this RemoteCacheManager instance to use the "zombie" 157 // services. 158 this.remoteService = new ZombieCacheServiceNonLocal<String, String>(); 159 remoteWatch.setCacheWatch( new ZombieCacheWatch() ); 160 throw new IOException( "Problem finding server at [" + registry + "]", ex ); 161 } 162 } 163 164 /** 165 * Adds the remote cache listener to the underlying cache-watch service. 166 * <p> 167 * @param cattr The feature to be added to the RemoteCacheListener attribute 168 * @param listener The feature to be added to the RemoteCacheListener attribute 169 * @throws IOException 170 */ 171 public <K, V> void addRemoteCacheListener( IRemoteCacheAttributes cattr, IRemoteCacheListener<K, V> listener ) 172 throws IOException 173 { 174 if ( cattr.isReceive() ) 175 { 176 if ( log.isInfoEnabled() ) 177 { 178 log.info( "The remote cache is configured to receive events from the remote server. " 179 + "We will register a listener. remoteWatch = " + remoteWatch + " | IRemoteCacheListener = " 180 + listener + " | cacheName " + cattr.getCacheName() ); 181 } 182 183 remoteWatch.addCacheListener( cattr.getCacheName(), listener ); 184 } 185 else 186 { 187 if ( log.isInfoEnabled() ) 188 { 189 log.info( "The remote cache is configured to NOT receive events from the remote server. " 190 + "We will NOT register a listener." ); 191 } 192 } 193 } 194 195 /** 196 * Removes a listener. When the primary recovers the failover must deregister itself for a 197 * region. The failover runner will call this method to de-register. We do not want to deregister 198 * all listeners to a remote server, in case a failover is a primary of another region. Having 199 * one regions failover act as another servers primary is not currently supported. 200 * <p> 201 * @param cattr 202 * @throws IOException 203 */ 204 public void removeRemoteCacheListener( IRemoteCacheAttributes cattr ) 205 throws IOException 206 { 207 RemoteCacheNoWait<?, ?> cache = caches.get( cattr.getCacheName() ); 208 if ( cache != null ) 209 { 210 removeListenerFromCache(cache); 211 } 212 else 213 { 214 if ( cattr.isReceive() ) 215 { 216 log.warn( "Trying to deregister Cache Listener that was never registered." ); 217 } 218 else if ( log.isDebugEnabled() ) 219 { 220 log.debug( "Since the remote cache is configured to not receive, " 221 + "there is no listener to deregister." ); 222 } 223 } 224 } 225 226 // common helper method 227 private void removeListenerFromCache(RemoteCacheNoWait<?, ?> cache) throws IOException 228 { 229 IRemoteCacheClient<?, ?> rc = cache.getRemoteCache(); 230 if ( log.isDebugEnabled() ) 231 { 232 log.debug( "Found cache for [" + cache.getCacheName() + "], deregistering listener." ); 233 } 234 // could also store the listener for a server in the manager. 235 IRemoteCacheListener<?, ?> listener = rc.getListener(); 236 remoteWatch.removeCacheListener( cache.getCacheName(), listener ); 237 } 238 239 /** 240 * Gets a RemoteCacheNoWait from the RemoteCacheManager. The RemoteCacheNoWait objects are 241 * identified by the cache name value of the RemoteCacheAttributes object. 242 * <p> 243 * If the client is configured to register a listener, this call results on a listener being 244 * created if one isn't already registered with the remote cache for this region. 245 * <p> 246 * @param cattr 247 * @return The cache value 248 */ 249 @SuppressWarnings("unchecked") // Need to cast because of common map for all caches 250 public <K, V> RemoteCacheNoWait<K, V> getCache( IRemoteCacheAttributes cattr ) 251 { 252 RemoteCacheNoWait<K, V> remoteCacheNoWait = (RemoteCacheNoWait<K, V>) caches.get( cattr.getCacheName() ); 253 254 if ( remoteCacheNoWait == null ) 255 { 256 cacheLock.lock(); 257 258 try 259 { 260 remoteCacheNoWait = (RemoteCacheNoWait<K, V>) caches.get( cattr.getCacheName() ); 261 262 if (remoteCacheNoWait == null) 263 { 264 remoteCacheNoWait = newRemoteCacheNoWait(cattr); 265 caches.put( cattr.getCacheName(), remoteCacheNoWait ); 266 } 267 } 268 finally 269 { 270 cacheLock.unlock(); 271 } 272 } 273 274 // might want to do some listener sanity checking here. 275 276 return remoteCacheNoWait; 277 } 278 279 /** 280 * Create new RemoteCacheNoWait instance 281 * 282 * @param cattr the cache configuration 283 * @return the instance 284 */ 285 protected <K, V> RemoteCacheNoWait<K, V> newRemoteCacheNoWait(IRemoteCacheAttributes cattr) 286 { 287 RemoteCacheNoWait<K, V> remoteCacheNoWait; 288 // create a listener first and pass it to the remotecache 289 // sender. 290 RemoteCacheListener<K, V> listener = null; 291 try 292 { 293 listener = new RemoteCacheListener<K, V>( cattr, cacheMgr, elementSerializer ); 294 addRemoteCacheListener( cattr, listener ); 295 } 296 catch ( IOException ioe ) 297 { 298 log.error( "Problem adding listener. Message: " + ioe.getMessage() 299 + " | RemoteCacheListener = " + listener, ioe ); 300 } 301 catch ( Exception e ) 302 { 303 log.error( "Problem adding listener. Message: " + e.getMessage() + " | RemoteCacheListener = " 304 + listener, e ); 305 } 306 307 IRemoteCacheClient<K, V> remoteCacheClient = 308 new RemoteCache<K, V>( cattr, (ICacheServiceNonLocal<K, V>) remoteService, listener, monitor ); 309 remoteCacheClient.setCacheEventLogger( cacheEventLogger ); 310 remoteCacheClient.setElementSerializer( elementSerializer ); 311 312 remoteCacheNoWait = new RemoteCacheNoWait<K, V>( remoteCacheClient ); 313 remoteCacheNoWait.setCacheEventLogger( cacheEventLogger ); 314 remoteCacheNoWait.setElementSerializer( elementSerializer ); 315 316 return remoteCacheNoWait; 317 } 318 319 /** Shutdown all. */ 320 public void release() 321 { 322 cacheLock.lock(); 323 324 try 325 { 326 for (RemoteCacheNoWait<?, ?> c : caches.values()) 327 { 328 try 329 { 330 if ( log.isInfoEnabled() ) 331 { 332 log.info( "freeCache [" + c.getCacheName() + "]" ); 333 } 334 335 removeListenerFromCache(c); 336 c.dispose(); 337 } 338 catch ( IOException ex ) 339 { 340 log.error( "Problem releasing " + c.getCacheName(), ex ); 341 } 342 } 343 344 caches.clear(); 345 } 346 finally 347 { 348 cacheLock.unlock(); 349 } 350 } 351 352 /** 353 * Fixes up all the caches managed by this cache manager. 354 */ 355 public void fixCaches() 356 { 357 if ( !canFix ) 358 { 359 return; 360 } 361 362 if ( log.isInfoEnabled() ) 363 { 364 log.info( "Fixing caches. ICacheServiceNonLocal " + remoteService + " | IRemoteCacheObserver " + remoteWatch ); 365 } 366 367 for (RemoteCacheNoWait<?, ?> c : caches.values()) 368 { 369 if (c.getStatus() == CacheStatus.ERROR) 370 { 371 c.fixCache( remoteService ); 372 } 373 } 374 375 if ( log.isInfoEnabled() ) 376 { 377 String msg = "Remote connection to " + registry + " resumed."; 378 if ( cacheEventLogger != null ) 379 { 380 cacheEventLogger.logApplicationEvent( "RemoteCacheManager", "fix", msg ); 381 } 382 log.info( msg ); 383 } 384 } 385 386 /** 387 * Returns true if the connection to the remote host can be 388 * successfully re-established. 389 * <p> 390 * @return true if we found a failover server 391 */ 392 public boolean canFixCaches() 393 { 394 try 395 { 396 lookupRemoteService(); 397 } 398 catch (IOException e) 399 { 400 log.error("Could not find server", e); 401 canFix = false; 402 } 403 404 return canFix; 405 } 406}