001package org.apache.commons.jcs.engine; 002 003import java.io.IOException; 004import java.util.Map; 005import java.util.Set; 006import java.util.concurrent.ConcurrentHashMap; 007import java.util.concurrent.ConcurrentMap; 008import java.util.concurrent.CopyOnWriteArraySet; 009 010/* 011 * Licensed to the Apache Software Foundation (ASF) under one 012 * or more contributor license agreements. See the NOTICE file 013 * distributed with this work for additional information 014 * regarding copyright ownership. The ASF licenses this file 015 * to you under the Apache License, Version 2.0 (the 016 * "License"); you may not use this file except in compliance 017 * with the License. You may obtain a copy of the License at 018 * 019 * http://www.apache.org/licenses/LICENSE-2.0 020 * 021 * Unless required by applicable law or agreed to in writing, 022 * software distributed under the License is distributed on an 023 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 024 * KIND, either express or implied. See the License for the 025 * specific language governing permissions and limitations 026 * under the License. 027 */ 028 029import org.apache.commons.jcs.engine.behavior.ICacheListener; 030import org.apache.commons.jcs.engine.behavior.ICacheObserver; 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033 034/** 035 * Intercepts the requests to the underlying ICacheObserver object so that the listeners can be 036 * recorded locally for remote connection recovery purposes. (Durable subscription like those in JMS 037 * is not implemented at this stage for it can be too expensive.) 038 */ 039public class CacheWatchRepairable 040 implements ICacheObserver 041{ 042 /** The logger */ 043 private static final Log log = LogFactory.getLog( CacheWatchRepairable.class ); 044 045 /** the underlying ICacheObserver. */ 046 private ICacheObserver cacheWatch; 047 048 /** Map of cache regions. */ 049 private final ConcurrentMap<String, Set<ICacheListener<?, ?>>> cacheMap = 050 new ConcurrentHashMap<String, Set<ICacheListener<?, ?>>>(); 051 052 /** 053 * Replaces the underlying cache watch service and re-attaches all existing listeners to the new 054 * cache watch. 055 * <p> 056 * @param cacheWatch The new cacheWatch value 057 */ 058 public void setCacheWatch( ICacheObserver cacheWatch ) 059 { 060 this.cacheWatch = cacheWatch; 061 for (Map.Entry<String, Set<ICacheListener<?, ?>>> entry : cacheMap.entrySet()) 062 { 063 String cacheName = entry.getKey(); 064 for (ICacheListener<?, ?> listener : entry.getValue()) 065 { 066 try 067 { 068 if ( log.isInfoEnabled() ) 069 { 070 log.info( "Adding listener to cache watch. ICacheListener = " + listener 071 + " | ICacheObserver = " + cacheWatch ); 072 } 073 cacheWatch.addCacheListener( cacheName, listener ); 074 } 075 catch ( IOException ex ) 076 { 077 log.error( "Problem adding listener. ICacheListener = " + listener + " | ICacheObserver = " 078 + cacheWatch, ex ); 079 } 080 } 081 } 082 } 083 084 /** 085 * Adds a feature to the CacheListener attribute of the CacheWatchRepairable object 086 * <p> 087 * @param cacheName The feature to be added to the CacheListener attribute 088 * @param obj The feature to be added to the CacheListener attribute 089 * @throws IOException 090 */ 091 @Override 092 public <K, V> void addCacheListener( String cacheName, ICacheListener<K, V> obj ) 093 throws IOException 094 { 095 // Record the added cache listener locally, regardless of whether the 096 // remote add-listener operation succeeds or fails. 097 Set<ICacheListener<?, ?>> listenerSet = cacheMap.get( cacheName ); 098 if ( listenerSet == null ) 099 { 100 Set<ICacheListener<?, ?>> newListenerSet = new CopyOnWriteArraySet<ICacheListener<?, ?>>(); 101 listenerSet = cacheMap.putIfAbsent( cacheName, newListenerSet ); 102 103 if (listenerSet == null) 104 { 105 listenerSet = newListenerSet; 106 } 107 } 108 109 listenerSet.add( obj ); 110 111 if ( log.isInfoEnabled() ) 112 { 113 log.info( "Adding listener to cache watch. ICacheListener = " + obj 114 + " | ICacheObserver = " + cacheWatch + " | cacheName = " + cacheName ); 115 } 116 cacheWatch.addCacheListener( cacheName, obj ); 117 } 118 119 /** 120 * Adds a feature to the CacheListener attribute of the CacheWatchRepairable object 121 * <p> 122 * @param obj The feature to be added to the CacheListener attribute 123 * @throws IOException 124 */ 125 @Override 126 public <K, V> void addCacheListener( ICacheListener<K, V> obj ) 127 throws IOException 128 { 129 // Record the added cache listener locally, regardless of whether the 130 // remote add-listener operation succeeds or fails. 131 for (Set<ICacheListener<?, ?>> listenerSet : cacheMap.values()) 132 { 133 listenerSet.add( obj ); 134 } 135 136 if ( log.isInfoEnabled() ) 137 { 138 log.info( "Adding listener to cache watch. ICacheListener = " + obj 139 + " | ICacheObserver = " + cacheWatch ); 140 } 141 cacheWatch.addCacheListener( obj ); 142 } 143 144 /** 145 * Tell the server to release us. 146 * <p> 147 * @param cacheName 148 * @param obj 149 * @throws IOException 150 */ 151 @Override 152 public <K, V> void removeCacheListener( String cacheName, ICacheListener<K, V> obj ) 153 throws IOException 154 { 155 if ( log.isInfoEnabled() ) 156 { 157 log.info( "removeCacheListener, cacheName [" + cacheName + "]" ); 158 } 159 // Record the removal locally, regardless of whether the remote 160 // remove-listener operation succeeds or fails. 161 Set<ICacheListener<?, ?>> listenerSet = cacheMap.get( cacheName ); 162 if ( listenerSet != null ) 163 { 164 listenerSet.remove( obj ); 165 } 166 cacheWatch.removeCacheListener( cacheName, obj ); 167 } 168 169 /** 170 * @param obj 171 * @throws IOException 172 */ 173 @Override 174 public <K, V> void removeCacheListener( ICacheListener<K, V> obj ) 175 throws IOException 176 { 177 if ( log.isInfoEnabled() ) 178 { 179 log.info( "removeCacheListener, ICacheListener [" + obj + "]" ); 180 } 181 182 // Record the removal locally, regardless of whether the remote 183 // remove-listener operation succeeds or fails. 184 for (Set<ICacheListener<?, ?>> listenerSet : cacheMap.values()) 185 { 186 if ( log.isDebugEnabled() ) 187 { 188 log.debug( "Before removing [" + obj + "] the listenerSet = " + listenerSet ); 189 } 190 listenerSet.remove( obj ); 191 } 192 cacheWatch.removeCacheListener( obj ); 193 } 194}