001package org.apache.commons.jcs.utils.discovery;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.ByteArrayInputStream;
023import java.io.IOException;
024import java.io.ObjectInputStream;
025import java.net.DatagramPacket;
026import java.net.InetAddress;
027import java.net.MulticastSocket;
028import java.util.concurrent.Executors;
029import java.util.concurrent.ThreadPoolExecutor;
030
031import org.apache.commons.jcs.engine.CacheInfo;
032import org.apache.commons.jcs.engine.behavior.IShutdownObserver;
033import org.apache.commons.jcs.io.ObjectInputStreamClassLoaderAware;
034import org.apache.commons.jcs.utils.discovery.UDPDiscoveryMessage.BroadcastType;
035import org.apache.commons.jcs.utils.threadpool.DaemonThreadFactory;
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038
039/** Receives UDP Discovery messages. */
040public class UDPDiscoveryReceiver
041    implements Runnable, IShutdownObserver
042{
043    /** The log factory */
044    private static final Log log = LogFactory.getLog( UDPDiscoveryReceiver.class );
045
046    /** buffer */
047    private final byte[] mBuffer = new byte[65536];
048
049    /** The socket used for communication. */
050    private MulticastSocket mSocket;
051
052    /**
053     * TODO: Consider using the threadpool manager to get this thread pool. For now place a tight
054     * restriction on the pool size
055     */
056    private static final int maxPoolSize = 2;
057
058    /** The processor */
059    private ThreadPoolExecutor pooledExecutor = null;
060
061    /** number of messages received. For debugging and testing. */
062    private int cnt = 0;
063
064    /** Service to get cache names and handle request broadcasts */
065    private UDPDiscoveryService service = null;
066
067    /** Address */
068    private String multicastAddressString = "";
069
070    /** The port */
071    private int multicastPort = 0;
072
073    /** Is it shutdown. */
074    private boolean shutdown = false;
075
076    /**
077     * Constructor for the LateralUDPReceiver object.
078     * <p>
079     * We determine out own host using InetAddress
080     *<p>
081     * @param service
082     * @param multicastAddressString
083     * @param multicastPort
084     * @throws IOException
085     */
086    public UDPDiscoveryReceiver( UDPDiscoveryService service, String multicastAddressString, int multicastPort )
087        throws IOException
088    {
089        this.service = service;
090        this.multicastAddressString = multicastAddressString;
091        this.multicastPort = multicastPort;
092
093        // create a small thread pool to handle a barrage
094        pooledExecutor = (ThreadPoolExecutor)Executors.newFixedThreadPool(maxPoolSize,
095                new DaemonThreadFactory("JCS-UDPDiscoveryReceiver-", Thread.MIN_PRIORITY));
096        pooledExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
097        //pooledExecutor.setMinimumPoolSize(1);
098
099        if ( log.isInfoEnabled() )
100        {
101            log.info( "Constructing listener, [" + this.multicastAddressString + ":" + this.multicastPort + "]" );
102        }
103
104        try
105        {
106            createSocket( this.multicastAddressString, this.multicastPort );
107        }
108        catch ( IOException ioe )
109        {
110            // consider eating this so we can go on, or constructing the socket
111            // later
112            throw ioe;
113        }
114    }
115
116    /**
117     * Creates the socket for this class.
118     * <p>
119     * @param multicastAddressString
120     * @param multicastPort
121     * @throws IOException
122     */
123    private void createSocket( String multicastAddressString, int multicastPort )
124        throws IOException
125    {
126        try
127        {
128            mSocket = new MulticastSocket( multicastPort );
129            if ( log.isInfoEnabled() )
130            {
131                log.info( "Joining Group: [" + InetAddress.getByName( multicastAddressString ) + "]" );
132            }
133            mSocket.joinGroup( InetAddress.getByName( multicastAddressString ) );
134        }
135        catch ( IOException e )
136        {
137            log.error( "Could not bind to multicast address [" + InetAddress.getByName( multicastAddressString ) + ":" + multicastPort + "]", e );
138            throw e;
139        }
140    }
141
142    /**
143     * Highly unreliable. If it is processing one message while another comes in, the second
144     * message is lost. This is for low concurrency peppering.
145     * <p>
146     * @return the object message
147     * @throws IOException
148     */
149    public Object waitForMessage()
150        throws IOException
151    {
152        final DatagramPacket packet = new DatagramPacket( mBuffer, mBuffer.length );
153        ObjectInputStream objectStream = null;
154        Object obj = null;
155        try
156        {
157            if ( log.isDebugEnabled() )
158            {
159                log.debug( "Waiting for message." );
160            }
161
162            mSocket.receive( packet );
163
164            if ( log.isDebugEnabled() )
165            {
166                log.debug( "Received packet from address [" + packet.getSocketAddress() + "]" );
167            }
168
169            final ByteArrayInputStream byteStream = new ByteArrayInputStream( mBuffer, 0, packet.getLength() );
170            objectStream = new ObjectInputStreamClassLoaderAware( byteStream, null );
171            obj = objectStream.readObject();
172
173            if ( obj instanceof UDPDiscoveryMessage )
174            {
175                // Ensure that the address we're supposed to send to is, indeed, the address
176                // of the machine on the other end of this connection.  This guards against
177                // instances where we don't exactly get the right local host address
178                UDPDiscoveryMessage msg = (UDPDiscoveryMessage) obj;
179                msg.setHost(packet.getAddress().getHostAddress());
180
181                    if ( log.isDebugEnabled() )
182                    {
183                        log.debug( "Read object from address [" + packet.getSocketAddress() + "], object=[" + obj + "]" );
184                    }
185            }
186        }
187        catch ( Exception e )
188        {
189            log.error( "Error receiving multicast packet", e );
190        }
191        finally
192        {
193            if (objectStream != null)
194            {
195                try
196                {
197                    objectStream.close();
198                }
199                catch (IOException e)
200                {
201                    log.error( "Error closing object stream", e );
202                }
203            }
204        }
205        return obj;
206    }
207
208    /** Main processing method for the LateralUDPReceiver object */
209    @Override
210    public void run()
211    {
212        try
213        {
214            while ( !shutdown )
215            {
216                Object obj = waitForMessage();
217
218                // not thread safe, but just for debugging
219                cnt++;
220
221                if ( log.isDebugEnabled() )
222                {
223                    log.debug( getCnt() + " messages received." );
224                }
225
226                UDPDiscoveryMessage message = null;
227
228                try
229                {
230                    message = (UDPDiscoveryMessage) obj;
231                    // check for null
232                    if ( message != null )
233                    {
234                        MessageHandler handler = new MessageHandler( message );
235
236                        pooledExecutor.execute( handler );
237
238                        if ( log.isDebugEnabled() )
239                        {
240                            log.debug( "Passed handler to executor." );
241                        }
242                    }
243                    else
244                    {
245                        log.warn( "message is null" );
246                    }
247                }
248                catch ( ClassCastException cce )
249                {
250                    log.warn( "Received unknown message type " + cce.getMessage() );
251                }
252            } // end while
253        }
254        catch ( Exception e )
255        {
256            log.error( "Unexpected exception in UDP receiver.", e );
257            try
258            {
259                Thread.sleep( 100 );
260                // TODO consider some failure count so we don't do this
261                // forever.
262            }
263            catch ( Exception e2 )
264            {
265                log.error( "Problem sleeping", e2 );
266            }
267        }
268    }
269
270    /**
271     * @param cnt The cnt to set.
272     */
273    public void setCnt( int cnt )
274    {
275        this.cnt = cnt;
276    }
277
278    /**
279     * @return Returns the cnt.
280     */
281    public int getCnt()
282    {
283        return cnt;
284    }
285
286    /**
287     * Separate thread run when a command comes into the UDPDiscoveryReceiver.
288     */
289    public class MessageHandler
290        implements Runnable
291    {
292        /** The message to handle. Passed in during construction. */
293        private UDPDiscoveryMessage message = null;
294
295        /**
296         * @param message
297         */
298        public MessageHandler( UDPDiscoveryMessage message )
299        {
300            this.message = message;
301        }
302
303        /**
304         * Process the message.
305         */
306        @SuppressWarnings("synthetic-access")
307        @Override
308        public void run()
309        {
310            // consider comparing ports here instead.
311            if ( message.getRequesterId() == CacheInfo.listenerId )
312            {
313                if ( log.isDebugEnabled() )
314                {
315                    log.debug( "Ignoring message sent from self" );
316                }
317            }
318            else
319            {
320                if ( log.isDebugEnabled() )
321                {
322                    log.debug( "Process message sent from another" );
323                    log.debug( "Message = " + message );
324                }
325
326                if ( message.getHost() == null || message.getCacheNames() == null || message.getCacheNames().isEmpty() )
327                {
328                    if ( log.isDebugEnabled() )
329                    {
330                        log.debug( "Ignoring invalid message: " + message );
331                    }
332                }
333                else
334                {
335                    processMessage();
336                }
337            }
338        }
339
340        /**
341         * Process the incoming message.
342         */
343        @SuppressWarnings("synthetic-access")
344        private void processMessage()
345        {
346            DiscoveredService discoveredService = new DiscoveredService();
347            discoveredService.setServiceAddress( message.getHost() );
348            discoveredService.setCacheNames( message.getCacheNames() );
349            discoveredService.setServicePort( message.getPort() );
350            discoveredService.setLastHearFromTime( System.currentTimeMillis() );
351
352            // if this is a request message, have the service handle it and
353            // return
354            if ( message.getMessageType() == BroadcastType.REQUEST )
355            {
356                if ( log.isDebugEnabled() )
357                {
358                    log.debug( "Message is a Request Broadcast, will have the service handle it." );
359                }
360                service.serviceRequestBroadcast();
361                return;
362            }
363            else if ( message.getMessageType() == BroadcastType.REMOVE )
364            {
365                if ( log.isDebugEnabled() )
366                {
367                    log.debug( "Removing service from set " + discoveredService );
368                }
369                service.removeDiscoveredService( discoveredService );
370            }
371            else
372            {
373                service.addOrUpdateService( discoveredService );
374            }
375        }
376    }
377
378    /** Shuts down the socket. */
379    @Override
380    public void shutdown()
381    {
382        try
383        {
384            shutdown = true;
385            mSocket.leaveGroup( InetAddress.getByName( multicastAddressString ) );
386            mSocket.close();
387            pooledExecutor.shutdownNow();
388        }
389        catch ( IOException e )
390        {
391            log.error( "Problem closing socket" );
392        }
393    }
394}