View Javadoc
1   package org.apache.commons.jcs.utils.discovery;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.ByteArrayInputStream;
23  import java.io.IOException;
24  import java.io.ObjectInputStream;
25  import java.net.DatagramPacket;
26  import java.net.InetAddress;
27  import java.net.MulticastSocket;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.ThreadPoolExecutor;
30  
31  import org.apache.commons.jcs.engine.CacheInfo;
32  import org.apache.commons.jcs.engine.behavior.IShutdownObserver;
33  import org.apache.commons.jcs.io.ObjectInputStreamClassLoaderAware;
34  import org.apache.commons.jcs.utils.discovery.UDPDiscoveryMessage.BroadcastType;
35  import org.apache.commons.jcs.utils.threadpool.DaemonThreadFactory;
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  
39  /** Receives UDP Discovery messages. */
40  public class UDPDiscoveryReceiver
41      implements Runnable, IShutdownObserver
42  {
43      /** The log factory */
44      private static final Log log = LogFactory.getLog( UDPDiscoveryReceiver.class );
45  
46      /** buffer */
47      private final byte[] mBuffer = new byte[65536];
48  
49      /** The socket used for communication. */
50      private MulticastSocket mSocket;
51  
52      /**
53       * TODO: Consider using the threadpool manager to get this thread pool. For now place a tight
54       * restriction on the pool size
55       */
56      private static final int maxPoolSize = 2;
57  
58      /** The processor */
59      private ThreadPoolExecutor pooledExecutor = null;
60  
61      /** number of messages received. For debugging and testing. */
62      private int cnt = 0;
63  
64      /** Service to get cache names and handle request broadcasts */
65      private UDPDiscoveryService service = null;
66  
67      /** Address */
68      private String multicastAddressString = "";
69  
70      /** The port */
71      private int multicastPort = 0;
72  
73      /** Is it shutdown. */
74      private boolean shutdown = false;
75  
76      /**
77       * Constructor for the LateralUDPReceiver object.
78       * <p>
79       * We determine out own host using InetAddress
80       *<p>
81       * @param service
82       * @param multicastAddressString
83       * @param multicastPort
84       * @throws IOException
85       */
86      public UDPDiscoveryReceiver( UDPDiscoveryService service, String multicastAddressString, int multicastPort )
87          throws IOException
88      {
89          this.service = service;
90          this.multicastAddressString = multicastAddressString;
91          this.multicastPort = multicastPort;
92  
93          // create a small thread pool to handle a barrage
94          pooledExecutor = (ThreadPoolExecutor)Executors.newFixedThreadPool(maxPoolSize,
95                  new DaemonThreadFactory("JCS-UDPDiscoveryReceiver-", Thread.MIN_PRIORITY));
96          pooledExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
97          //pooledExecutor.setMinimumPoolSize(1);
98  
99          if ( log.isInfoEnabled() )
100         {
101             log.info( "Constructing listener, [" + this.multicastAddressString + ":" + this.multicastPort + "]" );
102         }
103 
104         try
105         {
106             createSocket( this.multicastAddressString, this.multicastPort );
107         }
108         catch ( IOException ioe )
109         {
110             // consider eating this so we can go on, or constructing the socket
111             // later
112             throw ioe;
113         }
114     }
115 
116     /**
117      * Creates the socket for this class.
118      * <p>
119      * @param multicastAddressString
120      * @param multicastPort
121      * @throws IOException
122      */
123     private void createSocket( String multicastAddressString, int multicastPort )
124         throws IOException
125     {
126         try
127         {
128             mSocket = new MulticastSocket( multicastPort );
129             if ( log.isInfoEnabled() )
130             {
131                 log.info( "Joining Group: [" + InetAddress.getByName( multicastAddressString ) + "]" );
132             }
133             mSocket.joinGroup( InetAddress.getByName( multicastAddressString ) );
134         }
135         catch ( IOException e )
136         {
137             log.error( "Could not bind to multicast address [" + InetAddress.getByName( multicastAddressString ) + ":" + multicastPort + "]", e );
138             throw e;
139         }
140     }
141 
142     /**
143      * Highly unreliable. If it is processing one message while another comes in, the second
144      * message is lost. This is for low concurrency peppering.
145      * <p>
146      * @return the object message
147      * @throws IOException
148      */
149     public Object waitForMessage()
150         throws IOException
151     {
152         final DatagramPacket packet = new DatagramPacket( mBuffer, mBuffer.length );
153         ObjectInputStream objectStream = null;
154         Object obj = null;
155         try
156         {
157             if ( log.isDebugEnabled() )
158             {
159                 log.debug( "Waiting for message." );
160             }
161 
162             mSocket.receive( packet );
163 
164             if ( log.isDebugEnabled() )
165             {
166                 log.debug( "Received packet from address [" + packet.getSocketAddress() + "]" );
167             }
168 
169             final ByteArrayInputStream byteStream = new ByteArrayInputStream( mBuffer, 0, packet.getLength() );
170             objectStream = new ObjectInputStreamClassLoaderAware( byteStream, null );
171             obj = objectStream.readObject();
172 
173             if ( obj instanceof UDPDiscoveryMessage )
174             {
175             	// Ensure that the address we're supposed to send to is, indeed, the address
176             	// of the machine on the other end of this connection.  This guards against
177             	// instances where we don't exactly get the right local host address
178             	UDPDiscoveryMessage msg = (UDPDiscoveryMessage) obj;
179             	msg.setHost(packet.getAddress().getHostAddress());
180 
181 	            if ( log.isDebugEnabled() )
182 	            {
183 	                log.debug( "Read object from address [" + packet.getSocketAddress() + "], object=[" + obj + "]" );
184 	            }
185             }
186         }
187         catch ( Exception e )
188         {
189             log.error( "Error receiving multicast packet", e );
190         }
191         finally
192         {
193             if (objectStream != null)
194             {
195                 try
196                 {
197                     objectStream.close();
198                 }
199                 catch (IOException e)
200                 {
201                     log.error( "Error closing object stream", e );
202                 }
203             }
204         }
205         return obj;
206     }
207 
208     /** Main processing method for the LateralUDPReceiver object */
209     @Override
210     public void run()
211     {
212         try
213         {
214             while ( !shutdown )
215             {
216                 Object obj = waitForMessage();
217 
218                 // not thread safe, but just for debugging
219                 cnt++;
220 
221                 if ( log.isDebugEnabled() )
222                 {
223                     log.debug( getCnt() + " messages received." );
224                 }
225 
226                 UDPDiscoveryMessage message = null;
227 
228                 try
229                 {
230                     message = (UDPDiscoveryMessage) obj;
231                     // check for null
232                     if ( message != null )
233                     {
234                         MessageHandler handler = new MessageHandler( message );
235 
236                         pooledExecutor.execute( handler );
237 
238                         if ( log.isDebugEnabled() )
239                         {
240                             log.debug( "Passed handler to executor." );
241                         }
242                     }
243                     else
244                     {
245                         log.warn( "message is null" );
246                     }
247                 }
248                 catch ( ClassCastException cce )
249                 {
250                     log.warn( "Received unknown message type " + cce.getMessage() );
251                 }
252             } // end while
253         }
254         catch ( Exception e )
255         {
256             log.error( "Unexpected exception in UDP receiver.", e );
257             try
258             {
259                 Thread.sleep( 100 );
260                 // TODO consider some failure count so we don't do this
261                 // forever.
262             }
263             catch ( Exception e2 )
264             {
265                 log.error( "Problem sleeping", e2 );
266             }
267         }
268     }
269 
270     /**
271      * @param cnt The cnt to set.
272      */
273     public void setCnt( int cnt )
274     {
275         this.cnt = cnt;
276     }
277 
278     /**
279      * @return Returns the cnt.
280      */
281     public int getCnt()
282     {
283         return cnt;
284     }
285 
286     /**
287      * Separate thread run when a command comes into the UDPDiscoveryReceiver.
288      */
289     public class MessageHandler
290         implements Runnable
291     {
292         /** The message to handle. Passed in during construction. */
293         private UDPDiscoveryMessage message = null;
294 
295         /**
296          * @param message
297          */
298         public MessageHandler( UDPDiscoveryMessage message )
299         {
300             this.message = message;
301         }
302 
303         /**
304          * Process the message.
305          */
306         @SuppressWarnings("synthetic-access")
307         @Override
308         public void run()
309         {
310             // consider comparing ports here instead.
311             if ( message.getRequesterId() == CacheInfo.listenerId )
312             {
313                 if ( log.isDebugEnabled() )
314                 {
315                     log.debug( "Ignoring message sent from self" );
316                 }
317             }
318             else
319             {
320                 if ( log.isDebugEnabled() )
321                 {
322                     log.debug( "Process message sent from another" );
323                     log.debug( "Message = " + message );
324                 }
325 
326                 if ( message.getHost() == null || message.getCacheNames() == null || message.getCacheNames().isEmpty() )
327                 {
328                     if ( log.isDebugEnabled() )
329                     {
330                         log.debug( "Ignoring invalid message: " + message );
331                     }
332                 }
333                 else
334                 {
335                     processMessage();
336                 }
337             }
338         }
339 
340         /**
341          * Process the incoming message.
342          */
343         @SuppressWarnings("synthetic-access")
344         private void processMessage()
345         {
346             DiscoveredService discoveredService = new DiscoveredService();
347             discoveredService.setServiceAddress( message.getHost() );
348             discoveredService.setCacheNames( message.getCacheNames() );
349             discoveredService.setServicePort( message.getPort() );
350             discoveredService.setLastHearFromTime( System.currentTimeMillis() );
351 
352             // if this is a request message, have the service handle it and
353             // return
354             if ( message.getMessageType() == BroadcastType.REQUEST )
355             {
356                 if ( log.isDebugEnabled() )
357                 {
358                     log.debug( "Message is a Request Broadcast, will have the service handle it." );
359                 }
360                 service.serviceRequestBroadcast();
361                 return;
362             }
363             else if ( message.getMessageType() == BroadcastType.REMOVE )
364             {
365                 if ( log.isDebugEnabled() )
366                 {
367                     log.debug( "Removing service from set " + discoveredService );
368                 }
369                 service.removeDiscoveredService( discoveredService );
370             }
371             else
372             {
373                 service.addOrUpdateService( discoveredService );
374             }
375         }
376     }
377 
378     /** Shuts down the socket. */
379     @Override
380     public void shutdown()
381     {
382         try
383         {
384             shutdown = true;
385             mSocket.leaveGroup( InetAddress.getByName( multicastAddressString ) );
386             mSocket.close();
387             pooledExecutor.shutdownNow();
388         }
389         catch ( IOException e )
390         {
391             log.error( "Problem closing socket" );
392         }
393     }
394 }