View Javadoc
1   package org.apache.commons.jcs3.utils.discovery;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.net.Inet6Address;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.NetworkInterface;
27  import java.net.StandardProtocolFamily;
28  import java.net.StandardSocketOptions;
29  import java.nio.ByteBuffer;
30  import java.nio.channels.DatagramChannel;
31  import java.nio.channels.MembershipKey;
32  import java.nio.channels.SelectionKey;
33  import java.nio.channels.Selector;
34  import java.util.Iterator;
35  import java.util.concurrent.ArrayBlockingQueue;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  import java.util.concurrent.atomic.AtomicInteger;
39  
40  import org.apache.commons.jcs3.engine.CacheInfo;
41  import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
42  import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
43  import org.apache.commons.jcs3.log.Log;
44  import org.apache.commons.jcs3.log.LogManager;
45  import org.apache.commons.jcs3.utils.net.HostNameUtil;
46  import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration;
47  import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration.WhenBlockedPolicy;
48  import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager;
49  
50  /** Receives UDP Discovery messages. */
51  public class UDPDiscoveryReceiver
52      implements Runnable, IShutdownObserver
53  {
54      /** The log factory */
55      private static final Log log = LogManager.getLog( UDPDiscoveryReceiver.class );
56  
57      /** The channel used for communication. */
58      private DatagramChannel multicastChannel;
59  
60      /** The group membership key. */
61      private MembershipKey multicastGroupKey;
62  
63      /** The selector. */
64      private Selector selector;
65  
66      /**
67       * TODO: Consider using the threadpool manager to get this thread pool. For now place a tight
68       * restriction on the pool size
69       */
70      private static final int maxPoolSize = 2;
71  
72      /** The processor */
73      private final ExecutorService pooledExecutor;
74  
75      /** number of messages received. For debugging and testing. */
76      private final AtomicInteger cnt = new AtomicInteger(0);
77  
78      /** Service to get cache names and handle request broadcasts */
79      private final UDPDiscoveryService service;
80  
81      /** Serializer */
82      private IElementSerializer serializer;
83  
84      /** Is it shutdown. */
85      private final AtomicBoolean shutdown = new AtomicBoolean(false);
86  
87      /**
88       * Constructor for the UDPDiscoveryReceiver object.
89       * <p>
90       * We determine our own host using InetAddress
91       *<p>
92       * @param service
93       * @param multicastInterfaceString
94       * @param multicastAddressString
95       * @param multicastPort
96       * @throws IOException
97       */
98      public UDPDiscoveryReceiver( final UDPDiscoveryService service,
99              final String multicastInterfaceString,
100             final String multicastAddressString,
101             final int multicastPort )
102         throws IOException
103     {
104         this(service, multicastInterfaceString,
105                 InetAddress.getByName( multicastAddressString ),
106                 multicastPort);
107     }
108 
109     /**
110      * Constructor for the UDPDiscoveryReceiver object.
111      * <p>
112      * @param service
113      * @param multicastInterfaceString
114      * @param multicastAddress
115      * @param multicastPort
116      * @throws IOException
117      * @since 3.1
118      */
119     public UDPDiscoveryReceiver( final UDPDiscoveryService service,
120             final String multicastInterfaceString,
121             final InetAddress multicastAddress,
122             final int multicastPort )
123         throws IOException
124     {
125         this.service = service;
126         if (service != null)
127         {
128             this.serializer = service.getSerializer();
129         }
130 
131         // create a small thread pool to handle a barrage
132         this.pooledExecutor = ThreadPoolManager.getInstance().createPool(
133                 new PoolConfiguration(false, 0, maxPoolSize, maxPoolSize, 0,
134                         WhenBlockedPolicy.DISCARDOLDEST, maxPoolSize),
135                 "JCS-UDPDiscoveryReceiver-", Thread.MIN_PRIORITY);
136 
137         log.info( "Constructing listener, [{0}:{1}]", multicastAddress, multicastPort );
138         createSocket( multicastInterfaceString, multicastAddress, multicastPort );
139     }
140 
141     /**
142      * Creates the socket for this class.
143      * <p>
144      * @param multicastInterfaceString
145      * @param multicastAddress
146      * @param multicastPort
147      * @throws IOException
148      */
149     private void createSocket( final String multicastInterfaceString, final InetAddress multicastAddress,
150             final int multicastPort )
151         throws IOException
152     {
153         try
154         {
155             // Use dedicated interface if specified
156             NetworkInterface multicastInterface = null;
157             if (multicastInterfaceString != null)
158             {
159                 multicastInterface = NetworkInterface.getByName(multicastInterfaceString);
160             }
161             else
162             {
163                 multicastInterface = HostNameUtil.getMulticastNetworkInterface();
164             }
165             if (multicastInterface != null)
166             {
167                 log.info("Using network interface {0}", multicastInterface::getDisplayName);
168             }
169 
170             multicastChannel = DatagramChannel.open(
171                     multicastAddress instanceof Inet6Address ?
172                             StandardProtocolFamily.INET6 : StandardProtocolFamily.INET)
173                     .setOption(StandardSocketOptions.SO_REUSEADDR, true)
174                     .setOption(StandardSocketOptions.IP_MULTICAST_IF, multicastInterface)
175                     .bind(new InetSocketAddress(multicastPort));
176             multicastChannel.configureBlocking(false);
177 
178             log.info("Joining Group: [{0}] on {1}", multicastAddress, multicastInterface);
179             multicastGroupKey = multicastChannel.join(multicastAddress, multicastInterface);
180 
181             selector = Selector.open();
182             multicastChannel.register(selector, SelectionKey.OP_READ);
183         }
184         catch ( final IOException e )
185         {
186             log.error( "Could not bind to multicast address [{0}:{1}]", multicastAddress,
187                     multicastPort, e );
188             throw e;
189         }
190     }
191 
192     private final ArrayBlockingQueue<UDPDiscoveryMessage> msgQueue =
193             new ArrayBlockingQueue<>(maxPoolSize);
194 
195     /**
196      * Wait for multicast message
197      *
198      * @return the object message
199      * @throws IOException
200      * @deprecated no longer used
201      */
202     @Deprecated
203     public Object waitForMessage()
204         throws IOException
205     {
206         try
207         {
208             return msgQueue.take();
209         }
210         catch (InterruptedException e)
211         {
212             throw new IOException("Interrupted waiting for message", e);
213         }
214     }
215 
216     /** Main processing method for the UDPDiscoveryReceiver object */
217     @Override
218     public void run()
219     {
220         try
221         {
222             log.debug( "Waiting for message." );
223 
224             while (!shutdown.get())
225             {
226                 int activeKeys = selector.select();
227                 if (activeKeys == 0)
228                 {
229                     continue;
230                 }
231 
232                 for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();)
233                 {
234                     if (shutdown.get())
235                     {
236                         break;
237                     }
238 
239                     SelectionKey key = i.next();
240 
241                     if (!key.isValid())
242                     {
243                         continue;
244                     }
245 
246                     if (key.isReadable())
247                     {
248                         cnt.incrementAndGet();
249                         log.debug( "{0} messages received.", this::getCnt );
250 
251                         DatagramChannel mc = (DatagramChannel) key.channel();
252 
253                         ByteBuffer byteBuffer = ByteBuffer.allocate(65536);
254                         InetSocketAddress sourceAddress =
255                                 (InetSocketAddress) mc.receive(byteBuffer);
256                         byteBuffer.flip();
257 
258                         try
259                         {
260                             log.debug("Received packet from address [{0}]", sourceAddress);
261                             byte[] bytes = new byte[byteBuffer.limit()];
262                             byteBuffer.get(bytes);
263                             Object obj = serializer.deSerialize(bytes, null);
264 
265                             if (obj instanceof UDPDiscoveryMessage)
266                             {
267                                 // Ensure that the address we're supposed to send to is, indeed, the address
268                                 // of the machine on the other end of this connection.  This guards against
269                                 // instances where we don't exactly get the right local host address
270                                 final UDPDiscoveryMessage msg = (UDPDiscoveryMessage) obj;
271                                 msg.setHost(sourceAddress.getHostString());
272 
273                                 log.debug( "Read object from address [{0}], object=[{1}]",
274                                         sourceAddress, obj );
275 
276                                 // Just to keep the functionality of the deprecated waitForMessage method
277                                 synchronized (msgQueue)
278                                 {
279                                     // Check if queue full already?
280                                     if (msgQueue.remainingCapacity() == 0)
281                                     {
282                                         // remove oldest element from queue
283                                         msgQueue.remove();
284                                     }
285 
286                                     msgQueue.add(msg);
287                                 }
288 
289                                 pooledExecutor.execute(() -> handleMessage(msg));
290                                 log.debug( "Passed handler to executor." );
291                             }
292                         }
293                         catch ( final IOException | ClassNotFoundException e )
294                         {
295                             log.error( "Error receiving multicast packet", e );
296                         }
297 
298                         i.remove();
299                     }
300                 }
301             } // end while
302         }
303         catch ( final IOException e )
304         {
305             log.error( "Unexpected exception in UDP receiver.", e );
306         }
307     }
308 
309     /**
310      * @param cnt The cnt to set.
311      */
312     public void setCnt( final int cnt )
313     {
314         this.cnt.set(cnt);
315     }
316 
317     /**
318      * @return Returns the cnt.
319      */
320     public int getCnt()
321     {
322         return cnt.get();
323     }
324 
325     /**
326      * For testing
327      *
328      * @param serializer the serializer to set
329      * @since 3.1
330      */
331     protected void setSerializer(IElementSerializer serializer)
332     {
333         this.serializer = serializer;
334     }
335 
336     /**
337      * Separate thread run when a command comes into the UDPDiscoveryReceiver.
338      * @deprecated No longer used
339      */
340     @Deprecated
341     public class MessageHandler
342         implements Runnable
343     {
344         /** The message to handle. Passed in during construction. */
345         private final UDPDiscoveryMessage message;
346 
347         /**
348          * @param message
349          */
350         public MessageHandler( final UDPDiscoveryMessage message )
351         {
352             this.message = message;
353         }
354 
355         /**
356          * Process the message.
357          */
358         @Override
359         public void run()
360         {
361             handleMessage(message);
362         }
363     }
364 
365     /**
366      * Separate thread run when a command comes into the UDPDiscoveryReceiver.
367      */
368     private void handleMessage(UDPDiscoveryMessage message)
369     {
370         // consider comparing ports here instead.
371         if ( message.getRequesterId() == CacheInfo.listenerId )
372         {
373             log.debug( "Ignoring message sent from self" );
374         }
375         else
376         {
377             log.debug( "Process message sent from another" );
378             log.debug( "Message = {0}", message );
379 
380             if ( message.getHost() == null || message.getCacheNames() == null || message.getCacheNames().isEmpty() )
381             {
382                 log.debug( "Ignoring invalid message: {0}", message );
383             }
384             else
385             {
386                 processMessage(message);
387             }
388         }
389     }
390 
391     /**
392      * Process the incoming message.
393      */
394     private void processMessage(UDPDiscoveryMessage message)
395     {
396         final DiscoveredService discoveredService = new DiscoveredService(message);
397 
398         switch (message.getMessageType())
399         {
400             case REMOVE:
401                 log.debug( "Removing service from set {0}", discoveredService );
402                 service.removeDiscoveredService( discoveredService );
403                 break;
404             case REQUEST:
405                 // if this is a request message, have the service handle it and
406                 // return
407                 log.debug( "Message is a Request Broadcast, will have the service handle it." );
408                 service.serviceRequestBroadcast();
409                 break;
410             case PASSIVE:
411             default:
412                 log.debug( "Adding or updating service to set {0}", discoveredService );
413                 service.addOrUpdateService( discoveredService );
414                 break;
415         }
416     }
417 
418     /** Shuts down the socket. */
419     @Override
420     public void shutdown()
421     {
422         if (shutdown.compareAndSet(false, true))
423         {
424             try
425             {
426                 selector.close();
427                 multicastGroupKey.drop();
428                 multicastChannel.close();
429             }
430             catch ( final IOException e )
431             {
432                 log.error( "Problem closing socket" );
433             }
434         }
435     }
436 }