001package org.apache.commons.jcs3.utils.discovery;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.IOException;
023import java.net.Inet6Address;
024import java.net.InetAddress;
025import java.net.InetSocketAddress;
026import java.net.NetworkInterface;
027import java.net.StandardProtocolFamily;
028import java.net.StandardSocketOptions;
029import java.nio.ByteBuffer;
030import java.nio.channels.DatagramChannel;
031import java.nio.channels.MembershipKey;
032import java.nio.channels.SelectionKey;
033import java.nio.channels.Selector;
034import java.util.Iterator;
035import java.util.concurrent.ArrayBlockingQueue;
036import java.util.concurrent.ExecutorService;
037import java.util.concurrent.atomic.AtomicBoolean;
038import java.util.concurrent.atomic.AtomicInteger;
039
040import org.apache.commons.jcs3.engine.CacheInfo;
041import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
042import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
043import org.apache.commons.jcs3.log.Log;
044import org.apache.commons.jcs3.log.LogManager;
045import org.apache.commons.jcs3.utils.net.HostNameUtil;
046import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration;
047import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration.WhenBlockedPolicy;
048import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager;
049
050/** Receives UDP Discovery messages. */
051public class UDPDiscoveryReceiver
052    implements Runnable, IShutdownObserver
053{
054    /** The log factory */
055    private static final Log log = LogManager.getLog( UDPDiscoveryReceiver.class );
056
057    /** The channel used for communication. */
058    private DatagramChannel multicastChannel;
059
060    /** The group membership key. */
061    private MembershipKey multicastGroupKey;
062
063    /** The selector. */
064    private Selector selector;
065
066    /**
067     * TODO: Consider using the threadpool manager to get this thread pool. For now place a tight
068     * restriction on the pool size
069     */
070    private static final int maxPoolSize = 2;
071
072    /** The processor */
073    private final ExecutorService pooledExecutor;
074
075    /** number of messages received. For debugging and testing. */
076    private final AtomicInteger cnt = new AtomicInteger(0);
077
078    /** Service to get cache names and handle request broadcasts */
079    private final UDPDiscoveryService service;
080
081    /** Serializer */
082    private IElementSerializer serializer;
083
084    /** Is it shutdown. */
085    private final AtomicBoolean shutdown = new AtomicBoolean(false);
086
087    /**
088     * Constructor for the UDPDiscoveryReceiver object.
089     * <p>
090     * We determine our own host using InetAddress
091     *<p>
092     * @param service
093     * @param multicastInterfaceString
094     * @param multicastAddressString
095     * @param multicastPort
096     * @throws IOException
097     */
098    public UDPDiscoveryReceiver( final UDPDiscoveryService service,
099            final String multicastInterfaceString,
100            final String multicastAddressString,
101            final int multicastPort )
102        throws IOException
103    {
104        this(service, multicastInterfaceString,
105                InetAddress.getByName( multicastAddressString ),
106                multicastPort);
107    }
108
109    /**
110     * Constructor for the UDPDiscoveryReceiver object.
111     * <p>
112     * @param service
113     * @param multicastInterfaceString
114     * @param multicastAddress
115     * @param multicastPort
116     * @throws IOException
117     * @since 3.1
118     */
119    public UDPDiscoveryReceiver( final UDPDiscoveryService service,
120            final String multicastInterfaceString,
121            final InetAddress multicastAddress,
122            final int multicastPort )
123        throws IOException
124    {
125        this.service = service;
126        if (service != null)
127        {
128            this.serializer = service.getSerializer();
129        }
130
131        // create a small thread pool to handle a barrage
132        this.pooledExecutor = ThreadPoolManager.getInstance().createPool(
133                new PoolConfiguration(false, 0, maxPoolSize, maxPoolSize, 0,
134                        WhenBlockedPolicy.DISCARDOLDEST, maxPoolSize),
135                "JCS-UDPDiscoveryReceiver-", Thread.MIN_PRIORITY);
136
137        log.info( "Constructing listener, [{0}:{1}]", multicastAddress, multicastPort );
138        createSocket( multicastInterfaceString, multicastAddress, multicastPort );
139    }
140
141    /**
142     * Creates the socket for this class.
143     * <p>
144     * @param multicastInterfaceString
145     * @param multicastAddress
146     * @param multicastPort
147     * @throws IOException
148     */
149    private void createSocket( final String multicastInterfaceString, final InetAddress multicastAddress,
150            final int multicastPort )
151        throws IOException
152    {
153        try
154        {
155            // Use dedicated interface if specified
156            NetworkInterface multicastInterface = null;
157            if (multicastInterfaceString != null)
158            {
159                multicastInterface = NetworkInterface.getByName(multicastInterfaceString);
160            }
161            else
162            {
163                multicastInterface = HostNameUtil.getMulticastNetworkInterface();
164            }
165            if (multicastInterface != null)
166            {
167                log.info("Using network interface {0}", multicastInterface::getDisplayName);
168            }
169
170            multicastChannel = DatagramChannel.open(
171                    multicastAddress instanceof Inet6Address ?
172                            StandardProtocolFamily.INET6 : StandardProtocolFamily.INET)
173                    .setOption(StandardSocketOptions.SO_REUSEADDR, true)
174                    .setOption(StandardSocketOptions.IP_MULTICAST_IF, multicastInterface)
175                    .bind(new InetSocketAddress(multicastPort));
176            multicastChannel.configureBlocking(false);
177
178            log.info("Joining Group: [{0}] on {1}", multicastAddress, multicastInterface);
179            multicastGroupKey = multicastChannel.join(multicastAddress, multicastInterface);
180
181            selector = Selector.open();
182            multicastChannel.register(selector, SelectionKey.OP_READ);
183        }
184        catch ( final IOException e )
185        {
186            log.error( "Could not bind to multicast address [{0}:{1}]", multicastAddress,
187                    multicastPort, e );
188            throw e;
189        }
190    }
191
192    private final ArrayBlockingQueue<UDPDiscoveryMessage> msgQueue =
193            new ArrayBlockingQueue<>(maxPoolSize);
194
195    /**
196     * Wait for multicast message
197     *
198     * @return the object message
199     * @throws IOException
200     * @deprecated no longer used
201     */
202    @Deprecated
203    public Object waitForMessage()
204        throws IOException
205    {
206        try
207        {
208            return msgQueue.take();
209        }
210        catch (InterruptedException e)
211        {
212            throw new IOException("Interrupted waiting for message", e);
213        }
214    }
215
216    /** Main processing method for the UDPDiscoveryReceiver object */
217    @Override
218    public void run()
219    {
220        try
221        {
222            log.debug( "Waiting for message." );
223
224            while (!shutdown.get())
225            {
226                int activeKeys = selector.select();
227                if (activeKeys == 0)
228                {
229                    continue;
230                }
231
232                for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();)
233                {
234                    if (shutdown.get())
235                    {
236                        break;
237                    }
238
239                    SelectionKey key = i.next();
240
241                    if (!key.isValid())
242                    {
243                        continue;
244                    }
245
246                    if (key.isReadable())
247                    {
248                        cnt.incrementAndGet();
249                        log.debug( "{0} messages received.", this::getCnt );
250
251                        DatagramChannel mc = (DatagramChannel) key.channel();
252
253                        ByteBuffer byteBuffer = ByteBuffer.allocate(65536);
254                        InetSocketAddress sourceAddress =
255                                (InetSocketAddress) mc.receive(byteBuffer);
256                        byteBuffer.flip();
257
258                        try
259                        {
260                            log.debug("Received packet from address [{0}]", sourceAddress);
261                            byte[] bytes = new byte[byteBuffer.limit()];
262                            byteBuffer.get(bytes);
263                            Object obj = serializer.deSerialize(bytes, null);
264
265                            if (obj instanceof UDPDiscoveryMessage)
266                            {
267                                // Ensure that the address we're supposed to send to is, indeed, the address
268                                // of the machine on the other end of this connection.  This guards against
269                                // instances where we don't exactly get the right local host address
270                                final UDPDiscoveryMessage msg = (UDPDiscoveryMessage) obj;
271                                msg.setHost(sourceAddress.getHostString());
272
273                                log.debug( "Read object from address [{0}], object=[{1}]",
274                                        sourceAddress, obj );
275
276                                // Just to keep the functionality of the deprecated waitForMessage method
277                                synchronized (msgQueue)
278                                {
279                                    // Check if queue full already?
280                                    if (msgQueue.remainingCapacity() == 0)
281                                    {
282                                        // remove oldest element from queue
283                                        msgQueue.remove();
284                                    }
285
286                                    msgQueue.add(msg);
287                                }
288
289                                pooledExecutor.execute(() -> handleMessage(msg));
290                                log.debug( "Passed handler to executor." );
291                            }
292                        }
293                        catch ( final IOException | ClassNotFoundException e )
294                        {
295                            log.error( "Error receiving multicast packet", e );
296                        }
297
298                        i.remove();
299                    }
300                }
301            } // end while
302        }
303        catch ( final IOException e )
304        {
305            log.error( "Unexpected exception in UDP receiver.", e );
306        }
307    }
308
309    /**
310     * @param cnt The cnt to set.
311     */
312    public void setCnt( final int cnt )
313    {
314        this.cnt.set(cnt);
315    }
316
317    /**
318     * @return Returns the cnt.
319     */
320    public int getCnt()
321    {
322        return cnt.get();
323    }
324
325    /**
326     * For testing
327     *
328     * @param serializer the serializer to set
329     * @since 3.1
330     */
331    protected void setSerializer(IElementSerializer serializer)
332    {
333        this.serializer = serializer;
334    }
335
336    /**
337     * Separate thread run when a command comes into the UDPDiscoveryReceiver.
338     * @deprecated No longer used
339     */
340    @Deprecated
341    public class MessageHandler
342        implements Runnable
343    {
344        /** The message to handle. Passed in during construction. */
345        private final UDPDiscoveryMessage message;
346
347        /**
348         * @param message
349         */
350        public MessageHandler( final UDPDiscoveryMessage message )
351        {
352            this.message = message;
353        }
354
355        /**
356         * Process the message.
357         */
358        @Override
359        public void run()
360        {
361            handleMessage(message);
362        }
363    }
364
365    /**
366     * Separate thread run when a command comes into the UDPDiscoveryReceiver.
367     */
368    private void handleMessage(UDPDiscoveryMessage message)
369    {
370        // consider comparing ports here instead.
371        if ( message.getRequesterId() == CacheInfo.listenerId )
372        {
373            log.debug( "Ignoring message sent from self" );
374        }
375        else
376        {
377            log.debug( "Process message sent from another" );
378            log.debug( "Message = {0}", message );
379
380            if ( message.getHost() == null || message.getCacheNames() == null || message.getCacheNames().isEmpty() )
381            {
382                log.debug( "Ignoring invalid message: {0}", message );
383            }
384            else
385            {
386                processMessage(message);
387            }
388        }
389    }
390
391    /**
392     * Process the incoming message.
393     */
394    private void processMessage(UDPDiscoveryMessage message)
395    {
396        final DiscoveredService discoveredService = new DiscoveredService(message);
397
398        switch (message.getMessageType())
399        {
400            case REMOVE:
401                log.debug( "Removing service from set {0}", discoveredService );
402                service.removeDiscoveredService( discoveredService );
403                break;
404            case REQUEST:
405                // if this is a request message, have the service handle it and
406                // return
407                log.debug( "Message is a Request Broadcast, will have the service handle it." );
408                service.serviceRequestBroadcast();
409                break;
410            case PASSIVE:
411            default:
412                log.debug( "Adding or updating service to set {0}", discoveredService );
413                service.addOrUpdateService( discoveredService );
414                break;
415        }
416    }
417
418    /** Shuts down the socket. */
419    @Override
420    public void shutdown()
421    {
422        if (shutdown.compareAndSet(false, true))
423        {
424            try
425            {
426                selector.close();
427                multicastGroupKey.drop();
428                multicastChannel.close();
429            }
430            catch ( final IOException e )
431            {
432                log.error( "Problem closing socket" );
433            }
434        }
435    }
436}