View Javadoc
1   package org.apache.commons.jcs.utils.discovery;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.net.UnknownHostException;
24  import java.util.ArrayList;
25  import java.util.HashSet;
26  import java.util.Set;
27  import java.util.concurrent.CopyOnWriteArraySet;
28  import java.util.concurrent.ScheduledExecutorService;
29  import java.util.concurrent.TimeUnit;
30  
31  import org.apache.commons.jcs.engine.behavior.IRequireScheduler;
32  import org.apache.commons.jcs.engine.behavior.IShutdownObserver;
33  import org.apache.commons.jcs.utils.discovery.behavior.IDiscoveryListener;
34  import org.apache.commons.jcs.utils.net.HostNameUtil;
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  
38  /**
39   * This service creates a listener that can create lateral caches and add them to the no wait list.
40   * <p>
41   * It also creates a sender that periodically broadcasts its availability.
42   * <p>
43   * The sender also broadcasts a request for other caches to broadcast their addresses.
44   * <p>
45   * @author Aaron Smuts
46   */
47  public class UDPDiscoveryService
48      implements IShutdownObserver, IRequireScheduler
49  {
50      /** The logger */
51      private static final Log log = LogFactory.getLog( UDPDiscoveryService.class );
52  
53      /** thread that listens for messages */
54      private Thread udpReceiverThread;
55  
56      /** the runnable that the receiver thread runs */
57      private UDPDiscoveryReceiver receiver;
58  
59      /** the runnable that sends messages via the clock daemon */
60      private UDPDiscoverySenderThread sender = null;
61  
62      /** attributes */
63      private UDPDiscoveryAttributes udpDiscoveryAttributes = null;
64  
65      /** is this shut down? */
66      private boolean shutdown = false;
67  
68      /** This is a set of services that have been discovered. */
69      private Set<DiscoveredService> discoveredServices = new CopyOnWriteArraySet<DiscoveredService>();
70  
71      /** This a list of regions that are configured to use discovery. */
72      private final Set<String> cacheNames = new CopyOnWriteArraySet<String>();
73  
74      /** Set of listeners. */
75      private final Set<IDiscoveryListener> discoveryListeners = new CopyOnWriteArraySet<IDiscoveryListener>();
76  
77      /**
78       * @param attributes
79       */
80      public UDPDiscoveryService( UDPDiscoveryAttributes attributes)
81      {
82          udpDiscoveryAttributes = (UDPDiscoveryAttributes) attributes.clone();
83  
84          try
85          {
86              // todo, you should be able to set this
87              udpDiscoveryAttributes.setServiceAddress( HostNameUtil.getLocalHostAddress() );
88          }
89          catch ( UnknownHostException e )
90          {
91              log.error( "Couldn't get localhost address", e );
92          }
93  
94          try
95          {
96              // todo need some kind of recovery here.
97              receiver = new UDPDiscoveryReceiver( this, getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
98                                                   getUdpDiscoveryAttributes().getUdpDiscoveryPort() );
99          }
100         catch ( IOException e )
101         {
102             log.error( "Problem creating UDPDiscoveryReceiver, address ["
103                 + getUdpDiscoveryAttributes().getUdpDiscoveryAddr() + "] port ["
104                 + getUdpDiscoveryAttributes().getUdpDiscoveryPort() + "] we won't be able to find any other caches", e );
105         }
106 
107         // create a sender thread
108         sender = new UDPDiscoverySenderThread( getUdpDiscoveryAttributes(), getCacheNames() );
109     }
110 
111     /**
112      * @see org.apache.commons.jcs.engine.behavior.IRequireScheduler#setScheduledExecutorService(java.util.concurrent.ScheduledExecutorService)
113      */
114     @Override
115     public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutor)
116     {
117         if (sender != null)
118         {
119             scheduledExecutor.scheduleAtFixedRate(sender, 0, 15, TimeUnit.SECONDS);
120         }
121 
122         /** removes things that have been idle for too long */
123         UDPCleanupRunner cleanup = new UDPCleanupRunner( this );
124         // I'm going to use this as both, but it could happen
125         // that something could hang around twice the time using this as the
126         // delay and the idle time.
127         scheduledExecutor.scheduleAtFixedRate(cleanup, 0, getUdpDiscoveryAttributes().getMaxIdleTimeSec(), TimeUnit.SECONDS);
128     }
129 
130     /**
131      * Send a passive broadcast in response to a request broadcast. Never send a request for a
132      * request. We can respond to our own requests, since a request broadcast is not intended as a
133      * connection request. We might want to only send messages, so we would send a request, but
134      * never a passive broadcast.
135      */
136     protected void serviceRequestBroadcast()
137     {
138         UDPDiscoverySender sender1 = null;
139         try
140         {
141             // create this connection each time.
142             // more robust
143             sender1 = new UDPDiscoverySender( getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
144                                              getUdpDiscoveryAttributes().getUdpDiscoveryPort() );
145 
146             sender1.passiveBroadcast( getUdpDiscoveryAttributes().getServiceAddress(), getUdpDiscoveryAttributes()
147                 .getServicePort(), this.getCacheNames() );
148 
149             // todo we should consider sending a request broadcast every so
150             // often.
151 
152             if ( log.isDebugEnabled() )
153             {
154                 log.debug( "Called sender to issue a passive broadcast" );
155             }
156         }
157         catch ( Exception e )
158         {
159             log.error( "Problem calling the UDP Discovery Sender. address ["
160                 + getUdpDiscoveryAttributes().getUdpDiscoveryAddr() + "] port ["
161                 + getUdpDiscoveryAttributes().getUdpDiscoveryPort() + "]", e );
162         }
163         finally
164         {
165             try
166             {
167                 if ( sender1 != null )
168                 {
169                     sender1.destroy();
170                 }
171             }
172             catch ( Exception e )
173             {
174                 log.error( "Problem closing Passive Broadcast sender, while servicing a request broadcast.", e );
175             }
176         }
177     }
178 
179     /**
180      * Adds a region to the list that is participating in discovery.
181      * <p>
182      * @param cacheName
183      */
184     public void addParticipatingCacheName( String cacheName )
185     {
186         cacheNames.add( cacheName );
187         sender.setCacheNames( getCacheNames() );
188     }
189 
190     /**
191      * Removes the discovered service from the list and calls the discovery listener.
192      * <p>
193      * @param service
194      */
195     public void removeDiscoveredService( DiscoveredService service )
196     {
197         boolean contained = getDiscoveredServices().remove( service );
198 
199         if ( contained )
200         {
201             if ( log.isInfoEnabled() )
202             {
203                 log.info( "Removing " + service );
204             }
205         }
206 
207         for (IDiscoveryListener listener : getDiscoveryListeners())
208         {
209             listener.removeDiscoveredService( service );
210         }
211     }
212 
213     /**
214      * Add a service to the list. Update the held copy if we already know about it.
215      * <p>
216      * @param discoveredService discovered service
217      */
218     protected void addOrUpdateService( DiscoveredService discoveredService )
219     {
220         synchronized ( getDiscoveredServices() )
221         {
222             // Since this is a set we can add it over an over.
223             // We want to replace the old one, since we may add info that is not part of the equals.
224             // The equals method on the object being added is intentionally restricted.
225             if ( !getDiscoveredServices().contains( discoveredService ) )
226             {
227                 if ( log.isInfoEnabled() )
228                 {
229                     log.info( "Set does not contain service. I discovered " + discoveredService );
230                 }
231                 if ( log.isDebugEnabled() )
232                 {
233                     log.debug( "Adding service in the set " + discoveredService );
234                 }
235                 getDiscoveredServices().add( discoveredService );
236             }
237             else
238             {
239                 if ( log.isDebugEnabled() )
240                 {
241                     log.debug( "Set contains service." );
242                 }
243                 if ( log.isDebugEnabled() )
244                 {
245                     log.debug( "Updating service in the set " + discoveredService );
246                 }
247 
248                 // Update the list of cache names if it has changed.
249                 DiscoveredService theOldServiceInformation = null;
250                 // need to update the time this sucks. add has no effect convert to a map
251                 for (DiscoveredService service1 : getDiscoveredServices())
252                 {
253                     if ( discoveredService.equals( service1 ) )
254                     {
255                         theOldServiceInformation = service1;
256                         break;
257                     }
258                 }
259                 if ( theOldServiceInformation != null )
260                 {
261                     if ( !theOldServiceInformation.getCacheNames().equals( discoveredService.getCacheNames() ) )
262                     {
263                         if ( log.isInfoEnabled() )
264                         {
265                             log.info( "List of cache names changed for service: " + discoveredService );
266                         }
267                     }
268                 }
269 
270                 // replace it, we want to reset the payload and the last heard from time.
271                 getDiscoveredServices().remove( discoveredService );
272                 getDiscoveredServices().add( discoveredService );
273             }
274         }
275         // Always Notify the listeners
276         // If we don't do this, then if a region using the default config is initialized after notification,
277         // it will never get the service in it's no wait list.
278         // Leave it to the listeners to decide what to do.
279         for (IDiscoveryListener listener : getDiscoveryListeners())
280         {
281             listener.addDiscoveredService( discoveredService );
282         }
283 
284     }
285 
286     /**
287      * Get all the cache names we have facades for.
288      * <p>
289      * @return ArrayList
290      */
291     protected ArrayList<String> getCacheNames()
292     {
293         ArrayList<String> names = new ArrayList<String>();
294         names.addAll( cacheNames );
295         return names;
296     }
297 
298     /**
299      * @param attr The UDPDiscoveryAttributes to set.
300      */
301     public void setUdpDiscoveryAttributes( UDPDiscoveryAttributes attr )
302     {
303         this.udpDiscoveryAttributes = attr;
304     }
305 
306     /**
307      * @return Returns the lca.
308      */
309     public UDPDiscoveryAttributes getUdpDiscoveryAttributes()
310     {
311         return this.udpDiscoveryAttributes;
312     }
313 
314     /**
315      * Start necessary receiver thread
316      */
317     public void startup()
318     {
319         udpReceiverThread = new Thread( receiver );
320         udpReceiverThread.setDaemon( true );
321         // udpReceiverThread.setName( t.getName() + "--UDPReceiver" );
322         udpReceiverThread.start();
323     }
324 
325     /**
326      * Shuts down the receiver.
327      */
328     @Override
329     public void shutdown()
330     {
331         if ( !shutdown )
332         {
333             shutdown = true;
334 
335             if ( log.isInfoEnabled() )
336             {
337                 log.info( "Shutting down UDP discovery service receiver." );
338             }
339 
340             try
341             {
342                 // no good way to do this right now.
343                 receiver.shutdown();
344                 udpReceiverThread.interrupt();
345             }
346             catch ( Exception e )
347             {
348                 log.error( "Problem interrupting UDP receiver thread." );
349             }
350 
351             if ( log.isInfoEnabled() )
352             {
353                 log.info( "Shutting down UDP discovery service sender." );
354             }
355 
356             // also call the shutdown on the sender thread itself, which
357             // will result in a remove command.
358             try
359             {
360                 sender.shutdown();
361             }
362             catch ( Exception e )
363             {
364                 log.error( "Problem issuing remove broadcast via UDP sender." );
365             }
366         }
367         else
368         {
369             if ( log.isDebugEnabled() )
370             {
371                 log.debug( "Shutdown already called." );
372             }
373         }
374     }
375 
376     /**
377      * @return Returns the discoveredServices.
378      */
379     public synchronized Set<DiscoveredService> getDiscoveredServices()
380     {
381         return discoveredServices;
382     }
383 
384     /**
385      * @return the discoveryListeners
386      */
387     private Set<IDiscoveryListener> getDiscoveryListeners()
388     {
389         return discoveryListeners;
390     }
391 
392     /**
393      * @return the discoveryListeners
394      */
395     public Set<IDiscoveryListener> getCopyOfDiscoveryListeners()
396     {
397         Set<IDiscoveryListener> copy = new HashSet<IDiscoveryListener>();
398         copy.addAll( getDiscoveryListeners() );
399         return copy;
400     }
401 
402     /**
403      * Adds a listener.
404      * <p>
405      * @param listener
406      * @return true if it wasn't already in the set
407      */
408     public boolean addDiscoveryListener( IDiscoveryListener listener )
409     {
410         return getDiscoveryListeners().add( listener );
411     }
412 
413     /**
414      * Removes a listener.
415      * <p>
416      * @param listener
417      * @return true if it was in the set
418      */
419     public boolean removeDiscoveryListener( IDiscoveryListener listener )
420     {
421         return getDiscoveryListeners().remove( listener );
422     }
423 }