View Javadoc
1   package org.apache.commons.jcs3.utils.discovery;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.net.Inet4Address;
24  import java.net.Inet6Address;
25  import java.net.InetAddress;
26  import java.net.NetworkInterface;
27  import java.net.UnknownHostException;
28  import java.util.ArrayList;
29  import java.util.Enumeration;
30  import java.util.HashSet;
31  import java.util.Set;
32  import java.util.concurrent.ConcurrentHashMap;
33  import java.util.concurrent.ConcurrentMap;
34  import java.util.concurrent.CopyOnWriteArraySet;
35  import java.util.concurrent.ScheduledExecutorService;
36  import java.util.concurrent.ScheduledFuture;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  
40  import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
41  import org.apache.commons.jcs3.engine.behavior.IRequireScheduler;
42  import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
43  import org.apache.commons.jcs3.log.Log;
44  import org.apache.commons.jcs3.log.LogManager;
45  import org.apache.commons.jcs3.utils.discovery.behavior.IDiscoveryListener;
46  import org.apache.commons.jcs3.utils.net.HostNameUtil;
47  import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
48  
49  /**
50   * This service creates a listener that can create lateral caches and add them to the no wait list.
51   * <p>
52   * It also creates a sender that periodically broadcasts its availability.
53   * </p>
54   * <p>
55   * The sender also broadcasts a request for other caches to broadcast their addresses.
56   * </p>
57   */
58  public class UDPDiscoveryService
59      implements IShutdownObserver, IRequireScheduler
60  {
61      /** The logger */
62      private static final Log log = LogManager.getLog( UDPDiscoveryService.class );
63  
64      /** thread that listens for messages */
65      private Thread udpReceiverThread;
66  
67      /** the runnable that the receiver thread runs */
68      private UDPDiscoveryReceiver receiver;
69  
70      /** attributes */
71      private UDPDiscoveryAttributes udpDiscoveryAttributes;
72  
73      /** Used to serialize messages */
74      private final IElementSerializer serializer;
75  
76      /** is this shut down? */
77      private final AtomicBoolean shutdown = new AtomicBoolean(false);
78  
79      /** This is a set of services that have been discovered. */
80      private final ConcurrentMap<Integer, DiscoveredService> discoveredServices =
81              new ConcurrentHashMap<>();
82  
83      /** This a list of regions that are configured to use discovery. */
84      private final Set<String> cacheNames = new CopyOnWriteArraySet<>();
85  
86      /** Set of listeners. */
87      private final Set<IDiscoveryListener> discoveryListeners = new CopyOnWriteArraySet<>();
88  
89      /** Handle to cancel the scheduled broadcast task */
90      private ScheduledFuture<?> broadcastTaskFuture;
91  
92      /** Handle to cancel the scheduled cleanup task */
93      private ScheduledFuture<?> cleanupTaskFuture;
94  
95      /**
96       * Constructor
97       *
98       * @param attributes settings of the service
99       * @deprecated Specify serializer implementation explicitly
100      */
101     @Deprecated
102     public UDPDiscoveryService(final UDPDiscoveryAttributes attributes)
103     {
104         this(attributes, new StandardSerializer());
105     }
106 
107     /**
108      * Constructor
109      *
110      * @param attributes settings of service
111      * @param serializer the serializer to use to send and receive messages
112      * @since 3.1
113      */
114     public UDPDiscoveryService(final UDPDiscoveryAttributes attributes, IElementSerializer serializer)
115     {
116         this.udpDiscoveryAttributes = attributes.clone();
117         this.serializer = serializer;
118 
119         try
120         {
121             InetAddress multicastAddress = InetAddress.getByName(
122                     getUdpDiscoveryAttributes().getUdpDiscoveryAddr());
123 
124             // Set service address if still empty
125             if (getUdpDiscoveryAttributes().getServiceAddress() == null ||
126                     getUdpDiscoveryAttributes().getServiceAddress().isEmpty())
127             {
128                 // Use same interface as for multicast
129                 NetworkInterface serviceInterface = null;
130                 if (getUdpDiscoveryAttributes().getUdpDiscoveryInterface() != null)
131                 {
132                     serviceInterface = NetworkInterface.getByName(
133                             getUdpDiscoveryAttributes().getUdpDiscoveryInterface());
134                 }
135                 else
136                 {
137                     serviceInterface = HostNameUtil.getMulticastNetworkInterface();
138                 }
139 
140                 try
141                 {
142                     InetAddress serviceAddress = null;
143 
144                     for (Enumeration<InetAddress> addresses = serviceInterface.getInetAddresses();
145                             addresses.hasMoreElements();)
146                     {
147                         serviceAddress = addresses.nextElement();
148 
149                         if (multicastAddress instanceof Inet6Address)
150                         {
151                             if (serviceAddress instanceof Inet6Address &&
152                                 !serviceAddress.isLoopbackAddress() &&
153                                 !serviceAddress.isMulticastAddress() &&
154                                 serviceAddress.isLinkLocalAddress())
155                             {
156                                 // if Multicast uses IPv6, try to publish our IPv6 address
157                                 break;
158                             }
159                         }
160                         else
161                         {
162                             if (serviceAddress instanceof Inet4Address &&
163                                 !serviceAddress.isLoopbackAddress() &&
164                                 !serviceAddress.isMulticastAddress() &&
165                                 serviceAddress.isSiteLocalAddress())
166                             {
167                                 // if Multicast uses IPv4, try to publish our IPv4 address
168                                 break;
169                             }
170                         }
171                     }
172 
173                     if (serviceAddress == null)
174                     {
175                         // Nothing found for given interface, fall back
176                         serviceAddress = HostNameUtil.getLocalHostLANAddress();
177                     }
178 
179                     getUdpDiscoveryAttributes().setServiceAddress(serviceAddress.getHostAddress());
180                 }
181                 catch ( final UnknownHostException e )
182                 {
183                     log.error( "Couldn't get local host address", e );
184                 }
185             }
186 
187             // todo need some kind of recovery here.
188             receiver = new UDPDiscoveryReceiver( this,
189                     getUdpDiscoveryAttributes().getUdpDiscoveryInterface(),
190                     multicastAddress,
191                     getUdpDiscoveryAttributes().getUdpDiscoveryPort() );
192         }
193         catch ( final IOException e )
194         {
195             log.error( "Problem creating UDPDiscoveryReceiver, address [{0}] "
196                     + "port [{1}] we won't be able to find any other caches",
197                     getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
198                     getUdpDiscoveryAttributes().getUdpDiscoveryPort(), e );
199         }
200 
201         // initiate sender broadcast
202         initiateBroadcast();
203     }
204 
205     /**
206      * @see org.apache.commons.jcs3.engine.behavior.IRequireScheduler#setScheduledExecutorService(java.util.concurrent.ScheduledExecutorService)
207      */
208     @Override
209     public void setScheduledExecutorService(final ScheduledExecutorService scheduledExecutor)
210     {
211         this.broadcastTaskFuture = scheduledExecutor.scheduleAtFixedRate(
212                 this::serviceRequestBroadcast, 0, 15, TimeUnit.SECONDS);
213 
214         /** removes things that have been idle for too long */
215         // I'm going to use this as both, but it could happen
216         // that something could hang around twice the time using this as the
217         // delay and the idle time.
218         this.cleanupTaskFuture = scheduledExecutor.scheduleAtFixedRate(
219                 this::cleanup, 0,
220                 getUdpDiscoveryAttributes().getMaxIdleTimeSec(), TimeUnit.SECONDS);
221     }
222 
223     /**
224      * This goes through the list of services and removes those that we haven't heard from in longer
225      * than the max idle time.
226      *
227      * @since 3.1
228      */
229     protected void cleanup()
230     {
231         final long now = System.currentTimeMillis();
232 
233         // the listeners need to be notified.
234         getDiscoveredServices().stream()
235             .filter(service -> {
236                 if (now - service.getLastHearFromTime() > getUdpDiscoveryAttributes().getMaxIdleTimeSec() * 1000)
237                 {
238                     log.info( "Removing service, since we haven't heard from it in "
239                             + "{0} seconds. service = {1}",
240                             getUdpDiscoveryAttributes().getMaxIdleTimeSec(), service );
241                     return true;
242                 }
243 
244                 return false;
245             })
246             // remove the bad ones
247             // call this so the listeners get notified
248             .forEach(this::removeDiscoveredService);
249     }
250 
251     /**
252      * Initial request that the other caches let it know their addresses.
253      *
254      * @since 3.1
255      */
256     public void initiateBroadcast()
257     {
258         log.debug( "Creating sender for discoveryAddress = [{0}] and "
259                 + "discoveryPort = [{1}] myHostName = [{2}] and port = [{3}]",
260                 () -> getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
261                 () -> getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
262                 () -> getUdpDiscoveryAttributes().getServiceAddress(),
263                 () -> getUdpDiscoveryAttributes().getServicePort() );
264 
265         try (UDPDiscoverySender sender = new UDPDiscoverySender(
266                 getUdpDiscoveryAttributes(), getSerializer()))
267         {
268             sender.requestBroadcast();
269 
270             log.debug( "Sent a request broadcast to the group" );
271         }
272         catch ( final IOException e )
273         {
274             log.error( "Problem sending a Request Broadcast", e );
275         }
276     }
277 
278     /**
279      * Send a passive broadcast in response to a request broadcast. Never send a request for a
280      * request. We can respond to our own requests, since a request broadcast is not intended as a
281      * connection request. We might want to only send messages, so we would send a request, but
282      * never a passive broadcast.
283      */
284     protected void serviceRequestBroadcast()
285     {
286         // create this connection each time.
287         // more robust
288         try (UDPDiscoverySender sender = new UDPDiscoverySender(
289                 getUdpDiscoveryAttributes(), getSerializer()))
290         {
291             sender.passiveBroadcast(
292                     getUdpDiscoveryAttributes().getServiceAddress(),
293                     getUdpDiscoveryAttributes().getServicePort(),
294                     this.getCacheNames() );
295 
296             log.debug( "Called sender to issue a passive broadcast" );
297         }
298         catch ( final IOException e )
299         {
300             log.error( "Problem calling the UDP Discovery Sender, address [{0}] "
301                     + "port [{1}]",
302                     getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
303                     getUdpDiscoveryAttributes().getUdpDiscoveryPort(), e );
304         }
305     }
306 
307     /**
308      * Issues a remove broadcast to the others.
309      *
310      * @since 3.1
311      */
312     protected void shutdownBroadcast()
313     {
314         // create this connection each time.
315         // more robust
316         try (UDPDiscoverySender sender = new UDPDiscoverySender(
317                 getUdpDiscoveryAttributes(), getSerializer()))
318         {
319             sender.removeBroadcast(
320                     getUdpDiscoveryAttributes().getServiceAddress(),
321                     getUdpDiscoveryAttributes().getServicePort(),
322                     this.getCacheNames() );
323 
324             log.debug( "Called sender to issue a remove broadcast in shutdown." );
325         }
326         catch ( final IOException e )
327         {
328             log.error( "Problem calling the UDP Discovery Sender", e );
329         }
330     }
331 
332     /**
333      * Adds a region to the list that is participating in discovery.
334      * <p>
335      * @param cacheName
336      */
337     public void addParticipatingCacheName( final String cacheName )
338     {
339         cacheNames.add( cacheName );
340     }
341 
342     /**
343      * Removes the discovered service from the list and calls the discovery listener.
344      * <p>
345      * @param service
346      */
347     public void removeDiscoveredService( final DiscoveredService service )
348     {
349         if (discoveredServices.remove(service.hashCode()) != null)
350         {
351             log.info( "Removing {0}", service );
352         }
353 
354         getDiscoveryListeners().forEach(listener -> listener.removeDiscoveredService(service));
355     }
356 
357     /**
358      * Add a service to the list. Update the held copy if we already know about it.
359      * <p>
360      * @param discoveredService discovered service
361      */
362     protected void addOrUpdateService( final DiscoveredService discoveredService )
363     {
364         // We want to replace the old one, since we may add info that is not part of the equals.
365         // The equals method on the object being added is intentionally restricted.
366         discoveredServices.merge(discoveredService.hashCode(), discoveredService, (oldService, newService) -> {
367             log.debug( "Set contains service." );
368             log.debug( "Updating service in the set {0}", newService );
369 
370             // Update the list of cache names if it has changed.
371             // need to update the time this sucks. add has no effect convert to a map
372             if (!oldService.getCacheNames().equals(newService.getCacheNames()))
373             {
374                 log.info( "List of cache names changed for service: {0}", newService );
375 
376                 // replace it, we want to reset the payload and the last heard from time.
377                 return newService;
378             }
379 
380             if (oldService.getLastHearFromTime() != newService.getLastHearFromTime())
381             {
382                 return newService;
383             }
384 
385             return oldService;
386         });
387 
388         // Always Notify the listeners
389         // If we don't do this, then if a region using the default config is initialized after notification,
390         // it will never get the service in it's no wait list.
391         // Leave it to the listeners to decide what to do.
392         getDiscoveryListeners().forEach(listener -> listener.addDiscoveredService(discoveredService));
393     }
394 
395     /**
396      * Get all the cache names we have facades for.
397      * <p>
398      * @return ArrayList
399      */
400     protected ArrayList<String> getCacheNames()
401     {
402         return new ArrayList<>(cacheNames);
403     }
404 
405     /**
406      * @param attr The UDPDiscoveryAttributes to set.
407      */
408     public void setUdpDiscoveryAttributes( final UDPDiscoveryAttributes attr )
409     {
410         this.udpDiscoveryAttributes = attr;
411     }
412 
413     /**
414      * @return Returns the lca.
415      */
416     public UDPDiscoveryAttributes getUdpDiscoveryAttributes()
417     {
418         return this.udpDiscoveryAttributes;
419     }
420 
421     /**
422      * Return the serializer implementation
423      *
424      * @return the serializer
425      * @since 3.1
426      */
427     public IElementSerializer getSerializer()
428     {
429         return serializer;
430     }
431 
432     /**
433      * Start necessary receiver thread
434      */
435     public void startup()
436     {
437         udpReceiverThread = new Thread(receiver);
438         udpReceiverThread.setDaemon(true);
439         // udpReceiverThread.setName( t.getName() + "--UDPReceiver" );
440         udpReceiverThread.start();
441     }
442 
443     /**
444      * Shuts down the receiver.
445      */
446     @Override
447     public void shutdown()
448     {
449         if (shutdown.compareAndSet(false, true))
450         {
451             // Stop the scheduled tasks
452             if (broadcastTaskFuture != null)
453             {
454                 broadcastTaskFuture.cancel(false);
455             }
456             if (cleanupTaskFuture != null)
457             {
458                 cleanupTaskFuture.cancel(false);
459             }
460 
461             if (receiver != null)
462             {
463                 log.info( "Shutting down UDP discovery service receiver." );
464                 receiver.shutdown();
465             }
466 
467             log.info( "Shutting down UDP discovery service sender." );
468             // also call the shutdown on the sender thread itself, which
469             // will result in a remove command.
470             shutdownBroadcast();
471         }
472         else
473         {
474             log.debug( "Shutdown already called." );
475         }
476     }
477 
478     /**
479      * @return Returns the discoveredServices.
480      */
481     public Set<DiscoveredService> getDiscoveredServices()
482     {
483         return new HashSet<>(discoveredServices.values());
484     }
485 
486     /**
487      * @return the discoveryListeners
488      */
489     private Set<IDiscoveryListener> getDiscoveryListeners()
490     {
491         return discoveryListeners;
492     }
493 
494     /**
495      * @return the discoveryListeners
496      */
497     public Set<IDiscoveryListener> getCopyOfDiscoveryListeners()
498     {
499         return new HashSet<>(getDiscoveryListeners());
500     }
501 
502     /**
503      * Adds a listener.
504      * <p>
505      * @param listener
506      * @return true if it wasn't already in the set
507      */
508     public boolean addDiscoveryListener( final IDiscoveryListener listener )
509     {
510         return getDiscoveryListeners().add( listener );
511     }
512 
513     /**
514      * Removes a listener.
515      * <p>
516      * @param listener
517      * @return true if it was in the set
518      */
519     public boolean removeDiscoveryListener( final IDiscoveryListener listener )
520     {
521         return getDiscoveryListeners().remove( listener );
522     }
523 }