001package org.apache.commons.jcs.utils.discovery; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import java.io.IOException; 023import java.net.UnknownHostException; 024import java.util.ArrayList; 025import java.util.HashSet; 026import java.util.Set; 027import java.util.concurrent.CopyOnWriteArraySet; 028import java.util.concurrent.ScheduledExecutorService; 029import java.util.concurrent.TimeUnit; 030 031import org.apache.commons.jcs.engine.behavior.IRequireScheduler; 032import org.apache.commons.jcs.engine.behavior.IShutdownObserver; 033import org.apache.commons.jcs.utils.discovery.behavior.IDiscoveryListener; 034import org.apache.commons.jcs.utils.net.HostNameUtil; 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037 038/** 039 * This service creates a listener that can create lateral caches and add them to the no wait list. 040 * <p> 041 * It also creates a sender that periodically broadcasts its availability. 042 * <p> 043 * The sender also broadcasts a request for other caches to broadcast their addresses. 044 * <p> 045 * @author Aaron Smuts 046 */ 047public class UDPDiscoveryService 048 implements IShutdownObserver, IRequireScheduler 049{ 050 /** The logger */ 051 private static final Log log = LogFactory.getLog( UDPDiscoveryService.class ); 052 053 /** thread that listens for messages */ 054 private Thread udpReceiverThread; 055 056 /** the runnable that the receiver thread runs */ 057 private UDPDiscoveryReceiver receiver; 058 059 /** the runnable that sends messages via the clock daemon */ 060 private UDPDiscoverySenderThread sender = null; 061 062 /** attributes */ 063 private UDPDiscoveryAttributes udpDiscoveryAttributes = null; 064 065 /** is this shut down? */ 066 private boolean shutdown = false; 067 068 /** This is a set of services that have been discovered. */ 069 private Set<DiscoveredService> discoveredServices = new CopyOnWriteArraySet<DiscoveredService>(); 070 071 /** This a list of regions that are configured to use discovery. */ 072 private final Set<String> cacheNames = new CopyOnWriteArraySet<String>(); 073 074 /** Set of listeners. */ 075 private final Set<IDiscoveryListener> discoveryListeners = new CopyOnWriteArraySet<IDiscoveryListener>(); 076 077 /** 078 * @param attributes 079 */ 080 public UDPDiscoveryService( UDPDiscoveryAttributes attributes) 081 { 082 udpDiscoveryAttributes = (UDPDiscoveryAttributes) attributes.clone(); 083 084 try 085 { 086 // todo, you should be able to set this 087 udpDiscoveryAttributes.setServiceAddress( HostNameUtil.getLocalHostAddress() ); 088 } 089 catch ( UnknownHostException e ) 090 { 091 log.error( "Couldn't get localhost address", e ); 092 } 093 094 try 095 { 096 // todo need some kind of recovery here. 097 receiver = new UDPDiscoveryReceiver( this, getUdpDiscoveryAttributes().getUdpDiscoveryAddr(), 098 getUdpDiscoveryAttributes().getUdpDiscoveryPort() ); 099 } 100 catch ( IOException e ) 101 { 102 log.error( "Problem creating UDPDiscoveryReceiver, address [" 103 + getUdpDiscoveryAttributes().getUdpDiscoveryAddr() + "] port [" 104 + getUdpDiscoveryAttributes().getUdpDiscoveryPort() + "] we won't be able to find any other caches", e ); 105 } 106 107 // create a sender thread 108 sender = new UDPDiscoverySenderThread( getUdpDiscoveryAttributes(), getCacheNames() ); 109 } 110 111 /** 112 * @see org.apache.commons.jcs.engine.behavior.IRequireScheduler#setScheduledExecutorService(java.util.concurrent.ScheduledExecutorService) 113 */ 114 @Override 115 public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutor) 116 { 117 if (sender != null) 118 { 119 scheduledExecutor.scheduleAtFixedRate(sender, 0, 15, TimeUnit.SECONDS); 120 } 121 122 /** removes things that have been idle for too long */ 123 UDPCleanupRunner cleanup = new UDPCleanupRunner( this ); 124 // I'm going to use this as both, but it could happen 125 // that something could hang around twice the time using this as the 126 // delay and the idle time. 127 scheduledExecutor.scheduleAtFixedRate(cleanup, 0, getUdpDiscoveryAttributes().getMaxIdleTimeSec(), TimeUnit.SECONDS); 128 } 129 130 /** 131 * Send a passive broadcast in response to a request broadcast. Never send a request for a 132 * request. We can respond to our own requests, since a request broadcast is not intended as a 133 * connection request. We might want to only send messages, so we would send a request, but 134 * never a passive broadcast. 135 */ 136 protected void serviceRequestBroadcast() 137 { 138 UDPDiscoverySender sender1 = null; 139 try 140 { 141 // create this connection each time. 142 // more robust 143 sender1 = new UDPDiscoverySender( getUdpDiscoveryAttributes().getUdpDiscoveryAddr(), 144 getUdpDiscoveryAttributes().getUdpDiscoveryPort() ); 145 146 sender1.passiveBroadcast( getUdpDiscoveryAttributes().getServiceAddress(), getUdpDiscoveryAttributes() 147 .getServicePort(), this.getCacheNames() ); 148 149 // todo we should consider sending a request broadcast every so 150 // often. 151 152 if ( log.isDebugEnabled() ) 153 { 154 log.debug( "Called sender to issue a passive broadcast" ); 155 } 156 } 157 catch ( Exception e ) 158 { 159 log.error( "Problem calling the UDP Discovery Sender. address [" 160 + getUdpDiscoveryAttributes().getUdpDiscoveryAddr() + "] port [" 161 + getUdpDiscoveryAttributes().getUdpDiscoveryPort() + "]", e ); 162 } 163 finally 164 { 165 try 166 { 167 if ( sender1 != null ) 168 { 169 sender1.destroy(); 170 } 171 } 172 catch ( Exception e ) 173 { 174 log.error( "Problem closing Passive Broadcast sender, while servicing a request broadcast.", e ); 175 } 176 } 177 } 178 179 /** 180 * Adds a region to the list that is participating in discovery. 181 * <p> 182 * @param cacheName 183 */ 184 public void addParticipatingCacheName( String cacheName ) 185 { 186 cacheNames.add( cacheName ); 187 sender.setCacheNames( getCacheNames() ); 188 } 189 190 /** 191 * Removes the discovered service from the list and calls the discovery listener. 192 * <p> 193 * @param service 194 */ 195 public void removeDiscoveredService( DiscoveredService service ) 196 { 197 boolean contained = getDiscoveredServices().remove( service ); 198 199 if ( contained ) 200 { 201 if ( log.isInfoEnabled() ) 202 { 203 log.info( "Removing " + service ); 204 } 205 } 206 207 for (IDiscoveryListener listener : getDiscoveryListeners()) 208 { 209 listener.removeDiscoveredService( service ); 210 } 211 } 212 213 /** 214 * Add a service to the list. Update the held copy if we already know about it. 215 * <p> 216 * @param discoveredService discovered service 217 */ 218 protected void addOrUpdateService( DiscoveredService discoveredService ) 219 { 220 synchronized ( getDiscoveredServices() ) 221 { 222 // Since this is a set we can add it over an over. 223 // We want to replace the old one, since we may add info that is not part of the equals. 224 // The equals method on the object being added is intentionally restricted. 225 if ( !getDiscoveredServices().contains( discoveredService ) ) 226 { 227 if ( log.isInfoEnabled() ) 228 { 229 log.info( "Set does not contain service. I discovered " + discoveredService ); 230 } 231 if ( log.isDebugEnabled() ) 232 { 233 log.debug( "Adding service in the set " + discoveredService ); 234 } 235 getDiscoveredServices().add( discoveredService ); 236 } 237 else 238 { 239 if ( log.isDebugEnabled() ) 240 { 241 log.debug( "Set contains service." ); 242 } 243 if ( log.isDebugEnabled() ) 244 { 245 log.debug( "Updating service in the set " + discoveredService ); 246 } 247 248 // Update the list of cache names if it has changed. 249 DiscoveredService theOldServiceInformation = null; 250 // need to update the time this sucks. add has no effect convert to a map 251 for (DiscoveredService service1 : getDiscoveredServices()) 252 { 253 if ( discoveredService.equals( service1 ) ) 254 { 255 theOldServiceInformation = service1; 256 break; 257 } 258 } 259 if ( theOldServiceInformation != null ) 260 { 261 if ( !theOldServiceInformation.getCacheNames().equals( discoveredService.getCacheNames() ) ) 262 { 263 if ( log.isInfoEnabled() ) 264 { 265 log.info( "List of cache names changed for service: " + discoveredService ); 266 } 267 } 268 } 269 270 // replace it, we want to reset the payload and the last heard from time. 271 getDiscoveredServices().remove( discoveredService ); 272 getDiscoveredServices().add( discoveredService ); 273 } 274 } 275 // Always Notify the listeners 276 // If we don't do this, then if a region using the default config is initialized after notification, 277 // it will never get the service in it's no wait list. 278 // Leave it to the listeners to decide what to do. 279 for (IDiscoveryListener listener : getDiscoveryListeners()) 280 { 281 listener.addDiscoveredService( discoveredService ); 282 } 283 284 } 285 286 /** 287 * Get all the cache names we have facades for. 288 * <p> 289 * @return ArrayList 290 */ 291 protected ArrayList<String> getCacheNames() 292 { 293 ArrayList<String> names = new ArrayList<String>(); 294 names.addAll( cacheNames ); 295 return names; 296 } 297 298 /** 299 * @param attr The UDPDiscoveryAttributes to set. 300 */ 301 public void setUdpDiscoveryAttributes( UDPDiscoveryAttributes attr ) 302 { 303 this.udpDiscoveryAttributes = attr; 304 } 305 306 /** 307 * @return Returns the lca. 308 */ 309 public UDPDiscoveryAttributes getUdpDiscoveryAttributes() 310 { 311 return this.udpDiscoveryAttributes; 312 } 313 314 /** 315 * Start necessary receiver thread 316 */ 317 public void startup() 318 { 319 udpReceiverThread = new Thread( receiver ); 320 udpReceiverThread.setDaemon( true ); 321 // udpReceiverThread.setName( t.getName() + "--UDPReceiver" ); 322 udpReceiverThread.start(); 323 } 324 325 /** 326 * Shuts down the receiver. 327 */ 328 @Override 329 public void shutdown() 330 { 331 if ( !shutdown ) 332 { 333 shutdown = true; 334 335 if ( log.isInfoEnabled() ) 336 { 337 log.info( "Shutting down UDP discovery service receiver." ); 338 } 339 340 try 341 { 342 // no good way to do this right now. 343 receiver.shutdown(); 344 udpReceiverThread.interrupt(); 345 } 346 catch ( Exception e ) 347 { 348 log.error( "Problem interrupting UDP receiver thread." ); 349 } 350 351 if ( log.isInfoEnabled() ) 352 { 353 log.info( "Shutting down UDP discovery service sender." ); 354 } 355 356 // also call the shutdown on the sender thread itself, which 357 // will result in a remove command. 358 try 359 { 360 sender.shutdown(); 361 } 362 catch ( Exception e ) 363 { 364 log.error( "Problem issuing remove broadcast via UDP sender." ); 365 } 366 } 367 else 368 { 369 if ( log.isDebugEnabled() ) 370 { 371 log.debug( "Shutdown already called." ); 372 } 373 } 374 } 375 376 /** 377 * @return Returns the discoveredServices. 378 */ 379 public synchronized Set<DiscoveredService> getDiscoveredServices() 380 { 381 return discoveredServices; 382 } 383 384 /** 385 * @return the discoveryListeners 386 */ 387 private Set<IDiscoveryListener> getDiscoveryListeners() 388 { 389 return discoveryListeners; 390 } 391 392 /** 393 * @return the discoveryListeners 394 */ 395 public Set<IDiscoveryListener> getCopyOfDiscoveryListeners() 396 { 397 Set<IDiscoveryListener> copy = new HashSet<IDiscoveryListener>(); 398 copy.addAll( getDiscoveryListeners() ); 399 return copy; 400 } 401 402 /** 403 * Adds a listener. 404 * <p> 405 * @param listener 406 * @return true if it wasn't already in the set 407 */ 408 public boolean addDiscoveryListener( IDiscoveryListener listener ) 409 { 410 return getDiscoveryListeners().add( listener ); 411 } 412 413 /** 414 * Removes a listener. 415 * <p> 416 * @param listener 417 * @return true if it was in the set 418 */ 419 public boolean removeDiscoveryListener( IDiscoveryListener listener ) 420 { 421 return getDiscoveryListeners().remove( listener ); 422 } 423}