001package org.apache.commons.jcs.auxiliary.lateral.socket.tcp; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import org.apache.commons.jcs.auxiliary.AuxiliaryCache; 023import org.apache.commons.jcs.auxiliary.AuxiliaryCacheAttributes; 024import org.apache.commons.jcs.auxiliary.lateral.LateralCacheAttributes; 025import org.apache.commons.jcs.auxiliary.lateral.LateralCacheNoWait; 026import org.apache.commons.jcs.auxiliary.lateral.LateralCacheNoWaitFacade; 027import org.apache.commons.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes; 028import org.apache.commons.jcs.engine.behavior.ICompositeCacheManager; 029import org.apache.commons.jcs.utils.discovery.DiscoveredService; 030import org.apache.commons.jcs.utils.discovery.behavior.IDiscoveryListener; 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033 034import java.util.ArrayList; 035import java.util.Collections; 036import java.util.HashMap; 037import java.util.HashSet; 038import java.util.Map; 039import java.util.Set; 040 041/** 042 * This knows how to add and remove discovered services. It observes UDP discovery events. 043 * <p> 044 * We can have one listener per region, or one shared by all regions. 045 */ 046public class LateralTCPDiscoveryListener 047 implements IDiscoveryListener 048{ 049 /** The log factory */ 050 private static final Log log = LogFactory.getLog( LateralTCPDiscoveryListener.class ); 051 052 /** 053 * Map of no wait facades. these are used to determine which regions are locally configured to 054 * use laterals. 055 */ 056 private final Map<String, LateralCacheNoWaitFacade<?, ?>> facades = 057 Collections.synchronizedMap( new HashMap<String, LateralCacheNoWaitFacade<?, ?>>() ); 058 059 /** 060 * List of regions that are configured differently here than on another server. We keep track of 061 * this to limit the amount of info logging. 062 */ 063 private final Set<String> knownDifferentlyConfiguredRegions = 064 Collections.synchronizedSet( new HashSet<String>() ); 065 066 /** The name of the cache factory */ 067 private String factoryName; 068 069 /** Reference to the cache manager for auxiliary cache access */ 070 private ICompositeCacheManager cacheManager; 071 072 /** 073 * This plugs into the udp discovery system. It will receive add and remove events. 074 * <p> 075 * @param factoryName the name of the related cache factory 076 * @param cacheManager the global cache manager 077 */ 078 protected LateralTCPDiscoveryListener( String factoryName, ICompositeCacheManager cacheManager ) 079 { 080 this.factoryName = factoryName; 081 this.cacheManager = cacheManager; 082 } 083 084 /** 085 * Adds a nowait facade under this cachename. If one already existed, it will be overridden. 086 * <p> 087 * This adds nowaits to a facade for the region name. If the region has no facade, then it is 088 * not configured to use the lateral cache, and no facade will be created. 089 * <p> 090 * @param cacheName - the region name 091 * @param facade - facade (for region) => multiple lateral clients. 092 * @return true if the facade was not already registered. 093 */ 094 public synchronized boolean addNoWaitFacade( String cacheName, LateralCacheNoWaitFacade<?, ?> facade ) 095 { 096 boolean isNew = !containsNoWaitFacade( cacheName ); 097 098 // override or put anew, it doesn't matter 099 facades.put( cacheName, facade ); 100 knownDifferentlyConfiguredRegions.remove( cacheName ); 101 102 return isNew; 103 } 104 105 /** 106 * Allows us to see if the facade is present. 107 * <p> 108 * @param cacheName - facades are for a region 109 * @return do we contain the no wait. true if so 110 */ 111 public boolean containsNoWaitFacade( String cacheName ) 112 { 113 return facades.containsKey( cacheName ); 114 } 115 116 /** 117 * Allows us to see if the facade is present and if it has the no wait. 118 * <p> 119 * @param cacheName - facades are for a region 120 * @param noWait - is this no wait in the facade 121 * @return do we contain the no wait. true if so 122 */ 123 public <K, V> boolean containsNoWait( String cacheName, LateralCacheNoWait<K, V> noWait ) 124 { 125 @SuppressWarnings("unchecked") // Need to cast because of common map for all facades 126 LateralCacheNoWaitFacade<K, V> facade = (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() ); 127 if ( facade == null ) 128 { 129 return false; 130 } 131 132 return facade.containsNoWait( noWait ); 133 } 134 135 /** 136 * When a broadcast is received from the UDP Discovery receiver, for each cacheName in the 137 * message, the add no wait will be called here. To add a no wait, the facade is looked up for 138 * this cache name. 139 * <p> 140 * Each region has a facade. The facade contains a list of end points--the other tcp lateral 141 * services. 142 * <p> 143 * @param noWait 144 * @return true if we found the no wait and added it. False if the no wait was not present or it 145 * we already had it. 146 */ 147 protected <K, V> boolean addNoWait( LateralCacheNoWait<K, V> noWait ) 148 { 149 @SuppressWarnings("unchecked") // Need to cast because of common map for all facades 150 LateralCacheNoWaitFacade<K, V> facade = (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() ); 151 if ( log.isDebugEnabled() ) 152 { 153 log.debug( "addNoWait > Got facade for " + noWait.getCacheName() + " = " + facade ); 154 } 155 156 if ( facade != null ) 157 { 158 boolean isNew = facade.addNoWait( noWait ); 159 if ( log.isDebugEnabled() ) 160 { 161 log.debug( "Called addNoWait, isNew = " + isNew ); 162 } 163 return isNew; 164 } 165 else 166 { 167 if ( !knownDifferentlyConfiguredRegions.contains( noWait.getCacheName() ) ) 168 { 169 if ( log.isInfoEnabled() ) 170 { 171 log.info( "addNoWait > Different nodes are configured differently or region [" 172 + noWait.getCacheName() + "] is not yet used on this side. " ); 173 } 174 knownDifferentlyConfiguredRegions.add( noWait.getCacheName() ); 175 } 176 return false; 177 } 178 } 179 180 /** 181 * Look up the facade for the name. If it doesn't exist, then the region is not configured for 182 * use with the lateral cache. If it is present, remove the item from the no wait list. 183 * <p> 184 * @param noWait 185 * @return true if we found the no wait and removed it. False if the no wait was not present. 186 */ 187 protected <K, V> boolean removeNoWait( LateralCacheNoWait<K, V> noWait ) 188 { 189 @SuppressWarnings("unchecked") // Need to cast because of common map for all facades 190 LateralCacheNoWaitFacade<K, V> facade = (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() ); 191 if ( log.isDebugEnabled() ) 192 { 193 log.debug( "removeNoWait > Got facade for " + noWait.getCacheName() + " = " + facade ); 194 } 195 196 if ( facade != null ) 197 { 198 boolean removed = facade.removeNoWait( noWait ); 199 if ( log.isDebugEnabled() ) 200 { 201 log.debug( "Called removeNoWait, removed " + removed ); 202 } 203 return removed; 204 } 205 else 206 { 207 if ( !knownDifferentlyConfiguredRegions.contains( noWait.getCacheName() ) ) 208 { 209 if ( log.isInfoEnabled() ) 210 { 211 log.info( "removeNoWait > Different nodes are configured differently or region [" 212 + noWait.getCacheName() + "] is not yet used on this side. " ); 213 } 214 knownDifferentlyConfiguredRegions.add( noWait.getCacheName() ); 215 } 216 return false; 217 } 218 } 219 220 /** 221 * Creates the lateral cache if needed. 222 * <p> 223 * We could go to the composite cache manager and get the the cache for the region. This would 224 * force a full configuration of the region. One advantage of this would be that the creation of 225 * the later would go through the factory, which would add the item to the no wait list. But we 226 * don't want to do this. This would force this client to have all the regions as the other. 227 * This might not be desired. We don't want to send or receive for a region here that is either 228 * not used or not configured to use the lateral. 229 * <p> 230 * Right now, I'm afraid that the region will get puts if another instance has the region 231 * configured to use the lateral and our address is configured. This might be a bug, but it 232 * shouldn't happen with discovery. 233 * <p> 234 * @param service 235 */ 236 @Override 237 public void addDiscoveredService( DiscoveredService service ) 238 { 239 // get a cache and add it to the no waits 240 // the add method should not add the same. 241 // we need the listener port from the original config. 242 ArrayList<String> regions = service.getCacheNames(); 243 String serverAndPort = service.getServiceAddress() + ":" + service.getServicePort(); 244 245 if ( regions != null ) 246 { 247 // for each region get the cache 248 for (String cacheName : regions) 249 { 250 AuxiliaryCache<?, ?> ic = cacheManager.getAuxiliaryCache(factoryName, cacheName); 251 252 if ( log.isDebugEnabled() ) 253 { 254 log.debug( "Got cache, ic = " + ic ); 255 } 256 257 // add this to the nowaits for this cachename 258 if ( ic != null ) 259 { 260 AuxiliaryCacheAttributes aca = ic.getAuxiliaryCacheAttributes(); 261 if (aca instanceof ITCPLateralCacheAttributes) 262 { 263 ITCPLateralCacheAttributes lca = (ITCPLateralCacheAttributes)aca; 264 if (lca.getTransmissionType() != LateralCacheAttributes.Type.TCP 265 || !serverAndPort.equals(lca.getTcpServer()) ) 266 { 267 // skip caches not belonging to this service 268 continue; 269 } 270 } 271 272 addNoWait( (LateralCacheNoWait<?, ?>) ic ); 273 if ( log.isDebugEnabled() ) 274 { 275 log.debug( "Called addNoWait for cacheName [" + cacheName + "]" ); 276 } 277 } 278 } 279 } 280 else 281 { 282 log.warn( "No cache names found in message " + service ); 283 } 284 } 285 286 /** 287 * Removes the lateral cache. 288 * <p> 289 * We need to tell the manager that this instance is bad, so it will reconnect the sender if it 290 * comes back. 291 * <p> 292 * @param service 293 */ 294 @Override 295 public void removeDiscoveredService( DiscoveredService service ) 296 { 297 // get a cache and add it to the no waits 298 // the add method should not add the same. 299 // we need the listener port from the original config. 300 ArrayList<String> regions = service.getCacheNames(); 301 String serverAndPort = service.getServiceAddress() + ":" + service.getServicePort(); 302 303 if ( regions != null ) 304 { 305 // for each region get the cache 306 for (String cacheName : regions) 307 { 308 AuxiliaryCache<?, ?> ic = cacheManager.getAuxiliaryCache(factoryName, cacheName); 309 310 if ( log.isDebugEnabled() ) 311 { 312 log.debug( "Got cache, ic = " + ic ); 313 } 314 315 // remove this to the nowaits for this cachename 316 if ( ic != null ) 317 { 318 AuxiliaryCacheAttributes aca = ic.getAuxiliaryCacheAttributes(); 319 if (aca instanceof ITCPLateralCacheAttributes) 320 { 321 ITCPLateralCacheAttributes lca = (ITCPLateralCacheAttributes)aca; 322 if (lca.getTransmissionType() != LateralCacheAttributes.Type.TCP 323 || !serverAndPort.equals(lca.getTcpServer()) ) 324 { 325 // skip caches not belonging to this service 326 continue; 327 } 328 } 329 330 removeNoWait( (LateralCacheNoWait<?, ?>) ic ); 331 if ( log.isDebugEnabled() ) 332 { 333 log.debug( "Called removeNoWait for cacheName [" + cacheName + "]" ); 334 } 335 } 336 } 337 } 338 else 339 { 340 log.warn( "No cache names found in message " + service ); 341 } 342 } 343}