001 package org.apache.jcs.auxiliary.lateral.socket.tcp;
002
003 import java.io.Serializable;
004 import java.util.ArrayList;
005 import java.util.Collections;
006 import java.util.HashMap;
007 import java.util.HashSet;
008 import java.util.Map;
009 import java.util.Set;
010
011 import org.apache.commons.logging.Log;
012 import org.apache.commons.logging.LogFactory;
013 import org.apache.jcs.auxiliary.lateral.LateralCacheAttributes;
014 import org.apache.jcs.auxiliary.lateral.LateralCacheNoWait;
015 import org.apache.jcs.auxiliary.lateral.LateralCacheNoWaitFacade;
016 import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
017 import org.apache.jcs.engine.behavior.ICache;
018 import org.apache.jcs.engine.behavior.ICompositeCacheManager;
019 import org.apache.jcs.engine.behavior.IElementSerializer;
020 import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
021 import org.apache.jcs.utils.discovery.DiscoveredService;
022 import org.apache.jcs.utils.discovery.behavior.IDiscoveryListener;
023
024 /**
025 * This knows how to add and remove discovered services. It observes UDP discovery events.
026 * <p>
027 * We can have one listener per region, or one shared by all regions.
028 */
029 public class LateralTCPDiscoveryListener
030 implements IDiscoveryListener
031 {
032 /** The log factory */
033 private final static Log log = LogFactory.getLog( LateralTCPDiscoveryListener.class );
034
035 /**
036 * Map of no wait facades. these are used to determine which regions are locally configured to
037 * use laterals.
038 */
039 private final Map<String, LateralCacheNoWaitFacade<? extends Serializable, ? extends Serializable>> facades =
040 Collections.synchronizedMap( new HashMap<String, LateralCacheNoWaitFacade<? extends Serializable, ? extends Serializable>>() );
041
042 /**
043 * List of regions that are configured differently here than on another server. We keep track of
044 * this to limit the amount of info logging.
045 */
046 private final Set<String> knownDifferentlyConfiguredRegions =
047 Collections.synchronizedSet( new HashSet<String>() );
048
049 /** The cache manager. */
050 private final ICompositeCacheManager cacheMgr;
051
052 /** The event logger. */
053 protected ICacheEventLogger cacheEventLogger;
054
055 /** The serializer. */
056 protected IElementSerializer elementSerializer;
057
058 /**
059 * This plugs into the udp discovery system. It will receive add and remove events.
060 * <p>
061 * @param cacheMgr
062 * @param cacheEventLogger
063 * @param elementSerializer
064 */
065 protected LateralTCPDiscoveryListener( ICompositeCacheManager cacheMgr, ICacheEventLogger cacheEventLogger,
066 IElementSerializer elementSerializer )
067 {
068 this.cacheMgr = cacheMgr;
069 this.cacheEventLogger = cacheEventLogger;
070 this.elementSerializer = elementSerializer;
071 }
072
073 /**
074 * Adds a nowait facade under this cachename. If one already existed, it will be overridden.
075 * <p>
076 * This adds nowaits to a facade for the region name. If the region has no facade, then it is
077 * not configured to use the lateral cache, and no facade will be created.
078 * <p>
079 * @param cacheName - the region name
080 * @param facade - facade (for region) => multiple lateral clients.
081 * @return true if the facade was not already registered.
082 */
083 public synchronized boolean addNoWaitFacade( String cacheName, LateralCacheNoWaitFacade<? extends Serializable, ? extends Serializable> facade )
084 {
085 boolean isNew = !containsNoWaitFacade( cacheName );
086
087 // override or put anew, it doesn't matter
088 facades.put( cacheName, facade );
089 knownDifferentlyConfiguredRegions.remove( cacheName );
090
091 return isNew;
092 }
093
094 /**
095 * Allows us to see if the facade is present.
096 * <p>
097 * @param cacheName - facades are for a region
098 * @return do we contain the no wait. true if so
099 */
100 public boolean containsNoWaitFacade( String cacheName )
101 {
102 return facades.containsKey( cacheName );
103 }
104
105 /**
106 * Allows us to see if the facade is present and if it has the no wait.
107 * <p>
108 * @param cacheName - facades are for a region
109 * @param noWait - is this no wait in the facade
110 * @return do we contain the no wait. true if so
111 */
112 public <K extends Serializable, V extends Serializable> boolean containsNoWait( String cacheName, LateralCacheNoWait<K, V> noWait )
113 {
114 @SuppressWarnings("unchecked") // Need to cast because of common map for all facades
115 LateralCacheNoWaitFacade<K, V> facade = (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
116 if ( facade == null )
117 {
118 return false;
119 }
120
121 return facade.containsNoWait( noWait );
122 }
123
124 /**
125 * When a broadcast is received from the UDP Discovery receiver, for each cacheName in the
126 * message, the add no wait will be called here. To add a no wait, the facade is looked up for
127 * this cache name.
128 * <p>
129 * Each region has a facade. The facade contains a list of end points--the other tcp lateral
130 * services.
131 * <p>
132 * @param noWait
133 * @return true if we found the no wait and added it. False if the no wait was not present or it
134 * we already had it.
135 */
136 protected <K extends Serializable, V extends Serializable> boolean addNoWait( LateralCacheNoWait<K, V> noWait )
137 {
138 @SuppressWarnings("unchecked") // Need to cast because of common map for all facades
139 LateralCacheNoWaitFacade<K, V> facade = (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
140 if ( log.isDebugEnabled() )
141 {
142 log.debug( "addNoWait > Got facade for " + noWait.getCacheName() + " = " + facade );
143 }
144
145 if ( facade != null )
146 {
147 boolean isNew = facade.addNoWait( noWait );
148 if ( log.isDebugEnabled() )
149 {
150 log.debug( "Called addNoWait, isNew = " + isNew );
151 }
152 return isNew;
153 }
154 else
155 {
156 if ( !knownDifferentlyConfiguredRegions.contains( noWait.getCacheName() ) )
157 {
158 if ( log.isInfoEnabled() )
159 {
160 log.info( "addNoWait > Different nodes are configured differently or region ["
161 + noWait.getCacheName() + "] is not yet used on this side. " );
162 }
163 knownDifferentlyConfiguredRegions.add( noWait.getCacheName() );
164 }
165 return false;
166 }
167 }
168
169 /**
170 * Look up the facade for the name. If it doesn't exist, then the region is not configured for
171 * use with the lateral cache. If it is present, remove the item from the no wait list.
172 * <p>
173 * @param noWait
174 * @return true if we found the no wait and removed it. False if the no wait was not present.
175 */
176 protected <K extends Serializable, V extends Serializable> boolean removeNoWait( LateralCacheNoWait<K, V> noWait )
177 {
178 @SuppressWarnings("unchecked") // Need to cast because of common map for all facades
179 LateralCacheNoWaitFacade<K, V> facade = (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
180 if ( log.isDebugEnabled() )
181 {
182 log.debug( "removeNoWait > Got facade for " + noWait.getCacheName() + " = " + facade );
183 }
184
185 if ( facade != null )
186 {
187 boolean removed = facade.removeNoWait( noWait );
188 if ( log.isDebugEnabled() )
189 {
190 log.debug( "Called removeNoWait, removed " + removed );
191 }
192 return removed;
193 }
194 else
195 {
196 if ( !knownDifferentlyConfiguredRegions.contains( noWait.getCacheName() ) )
197 {
198 if ( log.isInfoEnabled() )
199 {
200 log.info( "removeNoWait > Different nodes are configured differently or region ["
201 + noWait.getCacheName() + "] is not yet used on this side. " );
202 }
203 knownDifferentlyConfiguredRegions.add( noWait.getCacheName() );
204 }
205 return false;
206 }
207 }
208
209 /**
210 * Creates the lateral cache if needed.
211 * <p>
212 * We could go to the composite cache manager and get the the cache for the region. This would
213 * force a full configuration of the region. One advantage of this would be that the creation of
214 * the later would go through the factory, which would add the item to the no wait list. But we
215 * don't want to do this. This would force this client to have all the regions as the other.
216 * This might not be desired. We don't want to send or receive for a region here that is either
217 * not used or not configured to use the lateral.
218 * <p>
219 * Right now, I'm afraid that the region will get puts if another instance has the region
220 * configured to use the lateral and our address is configured. This might be a bug, but it
221 * shouldn't happen with discovery.
222 * <p>
223 * @param service
224 */
225 public void addDiscoveredService( DiscoveredService service )
226 {
227 // get a cache and add it to the no waits
228 // the add method should not add the same.
229 // we need the listener port from the original config.
230 LateralTCPCacheManager lcm = findManagerForServiceEndPoint( service );
231
232 ArrayList<String> regions = service.getCacheNames();
233 if ( regions != null )
234 {
235 // for each region get the cache
236 for (String cacheName : regions)
237 {
238 try
239 {
240 ICache<? extends Serializable, ? extends Serializable> ic = lcm.getCache( cacheName );
241
242 if ( log.isDebugEnabled() )
243 {
244 log.debug( "Got cache, ic = " + ic );
245 }
246
247 // add this to the nowaits for this cachename
248 if ( ic != null )
249 {
250 addNoWait( (LateralCacheNoWait<? extends Serializable, ? extends Serializable>) ic );
251 if ( log.isDebugEnabled() )
252 {
253 log.debug( "Called addNoWait for cacheName [" + cacheName + "]" );
254 }
255 }
256 }
257 catch ( Exception e )
258 {
259 log.error( "Problem creating no wait", e );
260 }
261 }
262 }
263 else
264 {
265 log.warn( "No cache names found in message " + service );
266 }
267 }
268
269 /**
270 * Removes the lateral cache.
271 * <p>
272 * We need to tell the manager that this instance is bad, so it will reconnect the sender if it
273 * comes back.
274 * <p>
275 * @param service
276 */
277 public void removeDiscoveredService( DiscoveredService service )
278 {
279 // get a cache and add it to the no waits
280 // the add method should not add the same.
281 // we need the listener port from the original config.
282 LateralTCPCacheManager lcm = findManagerForServiceEndPoint( service );
283
284 ArrayList<String> regions = service.getCacheNames();
285 if ( regions != null )
286 {
287 // for each region get the cache
288 for (String cacheName : regions)
289 {
290 try
291 {
292 ICache<? extends Serializable, ? extends Serializable> ic = lcm.getCache( cacheName );
293
294 if ( log.isDebugEnabled() )
295 {
296 log.debug( "Got cache, ic = " + ic );
297 }
298
299 // remove this to the nowaits for this cachename
300 if ( ic != null )
301 {
302 removeNoWait( (LateralCacheNoWait<? extends Serializable, ? extends Serializable>) ic );
303 if ( log.isDebugEnabled() )
304 {
305 log.debug( "Called removeNoWait for cacheName [" + cacheName + "]" );
306 }
307 }
308 }
309 catch ( Exception e )
310 {
311 log.error( "Problem removing no wait", e );
312 }
313 }
314 }
315 else
316 {
317 log.warn( "No cache names found in message " + service );
318 }
319 }
320
321 /**
322 * Gets the appropriate manager.
323 * <p>
324 * @param service
325 * @return LateralTCPCacheManager configured for that end point.
326 */
327 private LateralTCPCacheManager findManagerForServiceEndPoint( DiscoveredService service )
328 {
329 ITCPLateralCacheAttributes lca = new TCPLateralCacheAttributes();
330 lca.setTransmissionType( LateralCacheAttributes.TCP );
331 lca.setTcpServer( service.getServiceAddress() + ":" + service.getServicePort() );
332 LateralTCPCacheManager lcm = LateralTCPCacheManager.getInstance( lca, cacheMgr, cacheEventLogger,
333 elementSerializer );
334 return lcm;
335 }
336 }