View Javadoc

1   package org.apache.jcs.utils.discovery;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.net.UnknownHostException;
23  import java.util.ArrayList;
24  import java.util.HashSet;
25  import java.util.Set;
26  import java.util.concurrent.CopyOnWriteArraySet;
27  import java.util.concurrent.ScheduledExecutorService;
28  import java.util.concurrent.TimeUnit;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.jcs.engine.behavior.IRequireScheduler;
33  import org.apache.jcs.engine.behavior.IShutdownObserver;
34  import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
35  import org.apache.jcs.utils.discovery.behavior.IDiscoveryListener;
36  import org.apache.jcs.utils.net.HostNameUtil;
37  
38  /**
39   * This service creates a listener that can create lateral caches and add them to the no wait list.
40   * <p>
41   * It also creates a sender that periodically broadcasts its availability.
42   * <p>
43   * The sender also broadcasts a request for other caches to broadcast their addresses.
44   * <p>
45   * @author Aaron Smuts
46   */
47  public class UDPDiscoveryService
48      implements IShutdownObserver, IRequireScheduler
49  {
50      /** The logger */
51      private final static Log log = LogFactory.getLog( UDPDiscoveryService.class );
52  
53      /** thread that listens for messages */
54      private Thread udpReceiverThread;
55  
56      /** the runnable that the receiver thread runs */
57      private UDPDiscoveryReceiver receiver;
58  
59      /** the runnable that sends messages via the clock daemon */
60      private UDPDiscoverySenderThread sender = null;
61  
62      /** attributes */
63      private UDPDiscoveryAttributes udpDiscoveryAttributes = null;
64  
65      /** is this shut down? */
66      private boolean shutdown = false;
67  
68      /** This is a set of services that have been discovered. */
69      private Set<DiscoveredService> discoveredServices = new CopyOnWriteArraySet<DiscoveredService>();
70  
71      /** This a list of regions that are configured to use discovery. */
72      private final Set<String> cacheNames = new CopyOnWriteArraySet<String>();
73  
74      /** Set of listeners. */
75      private final Set<IDiscoveryListener> discoveryListeners = new CopyOnWriteArraySet<IDiscoveryListener>();
76  
77      /**
78       * @param attributes
79       * @param cacheEventLogger
80       */
81      public UDPDiscoveryService( UDPDiscoveryAttributes attributes, ICacheEventLogger cacheEventLogger )
82      {
83          udpDiscoveryAttributes = (UDPDiscoveryAttributes) attributes.clone();
84  
85          try
86          {
87              // todo, you should be able to set this
88              udpDiscoveryAttributes.setServiceAddress( HostNameUtil.getLocalHostAddress() );
89          }
90          catch ( UnknownHostException e1 )
91          {
92              log.error( "Couldn't get localhost address", e1 );
93          }
94  
95          try
96          {
97              // todo need some kind of recovery here.
98              receiver = new UDPDiscoveryReceiver( this, getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
99                                                   getUdpDiscoveryAttributes().getUdpDiscoveryPort() );
100             udpReceiverThread = new Thread( receiver );
101             udpReceiverThread.setDaemon( true );
102             // udpReceiverThread.setName( t.getName() + "--UDPReceiver" );
103             udpReceiverThread.start();
104         }
105         catch ( Exception e )
106         {
107             log.error( "Problem creating UDPDiscoveryReceiver, address ["
108                 + getUdpDiscoveryAttributes().getUdpDiscoveryAddr() + "] port ["
109                 + getUdpDiscoveryAttributes().getUdpDiscoveryPort() + "] we won't be able to find any other caches", e );
110         }
111 
112         // create a sender thread
113         sender = new UDPDiscoverySenderThread( getUdpDiscoveryAttributes(), getCacheNames() );
114     }
115 
116     /**
117      * @see org.apache.jcs.engine.behavior.IRequireScheduler#setScheduledExecutorService(java.util.concurrent.ScheduledExecutorService)
118      */
119     public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutor)
120     {
121         if (sender != null)
122         {
123             scheduledExecutor.scheduleAtFixedRate(sender, 0, 15, TimeUnit.SECONDS);
124         }
125 
126         /** removes things that have been idle for too long */
127         UDPCleanupRunner cleanup = new UDPCleanupRunner( this );
128         // I'm going to use this as both, but it could happen
129         // that something could hang around twice the time using this as the
130         // delay and the idle time.
131         scheduledExecutor.scheduleAtFixedRate(cleanup, 0, getUdpDiscoveryAttributes().getMaxIdleTimeSec(), TimeUnit.SECONDS);
132     }
133 
134     /**
135      * Send a passive broadcast in response to a request broadcast. Never send a request for a
136      * request. We can respond to our own requests, since a request broadcast is not intended as a
137      * connection request. We might want to only send messages, so we would send a request, but
138      * never a passive broadcast.
139      */
140     protected void serviceRequestBroadcast()
141     {
142         UDPDiscoverySender sender = null;
143         try
144         {
145             // create this connection each time.
146             // more robust
147             sender = new UDPDiscoverySender( getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
148                                              getUdpDiscoveryAttributes().getUdpDiscoveryPort() );
149 
150             sender.passiveBroadcast( getUdpDiscoveryAttributes().getServiceAddress(), getUdpDiscoveryAttributes()
151                 .getServicePort(), this.getCacheNames() );
152 
153             // todo we should consider sending a request broadcast every so
154             // often.
155 
156             if ( log.isDebugEnabled() )
157             {
158                 log.debug( "Called sender to issue a passive broadcast" );
159             }
160         }
161         catch ( Exception e )
162         {
163             log.error( "Problem calling the UDP Discovery Sender. address ["
164                 + getUdpDiscoveryAttributes().getUdpDiscoveryAddr() + "] port ["
165                 + getUdpDiscoveryAttributes().getUdpDiscoveryPort() + "]", e );
166         }
167         finally
168         {
169             try
170             {
171                 if ( sender != null )
172                 {
173                     sender.destroy();
174                 }
175             }
176             catch ( Exception e )
177             {
178                 log.error( "Problem closing Passive Broadcast sender, while servicing a request broadcast.", e );
179             }
180         }
181     }
182 
183     /**
184      * Adds a region to the list that is participating in discovery.
185      * <p>
186      * @param cacheName
187      */
188     public void addParticipatingCacheName( String cacheName )
189     {
190         cacheNames.add( cacheName );
191         sender.setCacheNames( getCacheNames() );
192     }
193 
194     /**
195      * Removes the discovered service from the list and calls the discovery listener.
196      * <p>
197      * @param service
198      */
199     public void removeDiscoveredService( DiscoveredService service )
200     {
201         boolean contained = getDiscoveredServices().remove( service );
202 
203         if ( contained )
204         {
205             if ( log.isInfoEnabled() )
206             {
207                 log.info( "Removing " + service );
208             }
209         }
210 
211         for (IDiscoveryListener listener : getDiscoveryListeners())
212         {
213             listener.removeDiscoveredService( service );
214         }
215     }
216 
217     /**
218      * Add a service to the list. Update the held copy if we already know about it.
219      * <p>
220      * @param discoveredService discovered service
221      */
222     protected void addOrUpdateService( DiscoveredService discoveredService )
223     {
224         synchronized ( getDiscoveredServices() )
225         {
226             // Since this is a set we can add it over an over.
227             // We want to replace the old one, since we may add info that is not part of the equals.
228             // The equals method on the object being added is intentionally restricted.
229             if ( !getDiscoveredServices().contains( discoveredService ) )
230             {
231                 if ( log.isInfoEnabled() )
232                 {
233                     log.info( "Set does not contain service. I discovered " + discoveredService );
234                 }
235                 if ( log.isDebugEnabled() )
236                 {
237                     log.debug( "Adding service in the set " + discoveredService );
238                 }
239                 getDiscoveredServices().add( discoveredService );
240             }
241             else
242             {
243                 if ( log.isDebugEnabled() )
244                 {
245                     log.debug( "Set contains service." );
246                 }
247                 if ( log.isDebugEnabled() )
248                 {
249                     log.debug( "Updating service in the set " + discoveredService );
250                 }
251 
252                 // Update the list of cache names if it has changed.
253                 DiscoveredService theOldServiceInformation = null;
254                 // need to update the time this sucks. add has no effect convert to a map
255                 for (DiscoveredService service1 : getDiscoveredServices())
256                 {
257                     if ( discoveredService.equals( service1 ) )
258                     {
259                         theOldServiceInformation = service1;
260                         break;
261                     }
262                 }
263                 if ( theOldServiceInformation != null )
264                 {
265                     if ( !theOldServiceInformation.getCacheNames().equals( discoveredService.getCacheNames() ) )
266                     {
267                         if ( log.isInfoEnabled() )
268                         {
269                             log.info( "List of cache names changed for service: " + discoveredService );
270                         }
271                     }
272                 }
273 
274                 // replace it, we want to reset the payload and the last heard from time.
275                 getDiscoveredServices().remove( discoveredService );
276                 getDiscoveredServices().add( discoveredService );
277             }
278         }
279         // Always Notify the listeners
280         // If we don't do this, then if a region using the default config is initialized after notification,
281         // it will never get the service in it's no wait list.
282         // Leave it to the listeners to decide what to do.
283         for (IDiscoveryListener listener : getDiscoveryListeners())
284         {
285             listener.addDiscoveredService( discoveredService );
286         }
287 
288     }
289 
290     /**
291      * Get all the cache names we have facades for.
292      * <p>
293      * @return ArrayList
294      */
295     protected ArrayList<String> getCacheNames()
296     {
297         ArrayList<String> names = new ArrayList<String>();
298         names.addAll( cacheNames );
299         return names;
300     }
301 
302     /**
303      * @param attr The UDPDiscoveryAttributes to set.
304      */
305     public void setUdpDiscoveryAttributes( UDPDiscoveryAttributes attr )
306     {
307         this.udpDiscoveryAttributes = attr;
308     }
309 
310     /**
311      * @return Returns the lca.
312      */
313     public UDPDiscoveryAttributes getUdpDiscoveryAttributes()
314     {
315         return this.udpDiscoveryAttributes;
316     }
317 
318     /**
319      * Shuts down the receiver.
320      */
321     public void shutdown()
322     {
323         if ( !shutdown )
324         {
325             shutdown = true;
326 
327             if ( log.isInfoEnabled() )
328             {
329                 log.info( "Shutting down UDP discovery service receiver." );
330             }
331 
332             try
333             {
334                 // no good way to do this right now.
335                 receiver.shutdown();
336                 udpReceiverThread.interrupt();
337             }
338             catch ( Exception e )
339             {
340                 log.error( "Problem interrupting UDP receiver thread." );
341             }
342 
343             if ( log.isInfoEnabled() )
344             {
345                 log.info( "Shutting down UDP discovery service sender." );
346             }
347 
348             // also call the shutdown on the sender thread itself, which
349             // will result in a remove command.
350             try
351             {
352                 sender.shutdown();
353             }
354             catch ( Exception e )
355             {
356                 log.error( "Problem issuing remove broadcast via UDP sender." );
357             }
358         }
359         else
360         {
361             if ( log.isDebugEnabled() )
362             {
363                 log.debug( "Shutdown already called." );
364             }
365         }
366     }
367 
368     /**
369      * Call shutdown to be safe.
370      * <p>
371      * @throws Throwable on error
372      */
373     @Override
374     protected void finalize()
375         throws Throwable
376     {
377         super.finalize();
378 
379         // TODO reconsider this, since it uses the logger
380         shutdown();
381     }
382 
383     /**
384      * @param discoveredServices The discoveredServices to set.
385      */
386     public synchronized void setDiscoveredServices( Set<DiscoveredService> discoveredServices )
387     {
388         this.discoveredServices = discoveredServices;
389     }
390 
391     /**
392      * @return Returns the discoveredServices.
393      */
394     public synchronized Set<DiscoveredService> getDiscoveredServices()
395     {
396         return discoveredServices;
397     }
398 
399     /**
400      * @return the discoveryListeners
401      */
402     private Set<IDiscoveryListener> getDiscoveryListeners()
403     {
404         return discoveryListeners;
405     }
406 
407     /**
408      * @return the discoveryListeners
409      */
410     public Set<IDiscoveryListener> getCopyOfDiscoveryListeners()
411     {
412         Set<IDiscoveryListener> copy = new HashSet<IDiscoveryListener>();
413         copy.addAll( getDiscoveryListeners() );
414         return copy;
415     }
416 
417     /**
418      * Adds a listener.
419      * <p>
420      * @param listener
421      * @return true if it wasn't already in the set
422      */
423     public boolean addDiscoveryListener( IDiscoveryListener listener )
424     {
425         return getDiscoveryListeners().add( listener );
426     }
427 
428     /**
429      * Removes a listener.
430      * <p>
431      * @param listener
432      * @return true if it was in the set
433      */
434     public boolean removeDiscoveryListener( IDiscoveryListener listener )
435     {
436         return getDiscoveryListeners().remove( listener );
437     }
438 }