1 package org.apache.commons.jcs.utils.discovery;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.ByteArrayInputStream;
23 import java.io.IOException;
24 import java.io.ObjectInputStream;
25 import java.net.DatagramPacket;
26 import java.net.InetAddress;
27 import java.net.MulticastSocket;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.ThreadPoolExecutor;
30
31 import org.apache.commons.jcs.engine.CacheInfo;
32 import org.apache.commons.jcs.engine.behavior.IShutdownObserver;
33 import org.apache.commons.jcs.io.ObjectInputStreamClassLoaderAware;
34 import org.apache.commons.jcs.utils.discovery.UDPDiscoveryMessage.BroadcastType;
35 import org.apache.commons.jcs.utils.threadpool.DaemonThreadFactory;
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38
39
40 public class UDPDiscoveryReceiver
41 implements Runnable, IShutdownObserver
42 {
43
44 private static final Log log = LogFactory.getLog( UDPDiscoveryReceiver.class );
45
46
47 private final byte[] mBuffer = new byte[65536];
48
49
50 private MulticastSocket mSocket;
51
52
53
54
55
56 private static final int maxPoolSize = 2;
57
58
59 private ThreadPoolExecutor pooledExecutor = null;
60
61
62 private int cnt = 0;
63
64
65 private UDPDiscoveryService service = null;
66
67
68 private String multicastAddressString = "";
69
70
71 private int multicastPort = 0;
72
73
74 private boolean shutdown = false;
75
76
77
78
79
80
81
82
83
84
85
86 public UDPDiscoveryReceiver( UDPDiscoveryService service, String multicastAddressString, int multicastPort )
87 throws IOException
88 {
89 this.service = service;
90 this.multicastAddressString = multicastAddressString;
91 this.multicastPort = multicastPort;
92
93
94 pooledExecutor = (ThreadPoolExecutor)Executors.newFixedThreadPool(maxPoolSize,
95 new DaemonThreadFactory("JCS-UDPDiscoveryReceiver-", Thread.MIN_PRIORITY));
96 pooledExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
97
98
99 if ( log.isInfoEnabled() )
100 {
101 log.info( "Constructing listener, [" + this.multicastAddressString + ":" + this.multicastPort + "]" );
102 }
103
104 try
105 {
106 createSocket( this.multicastAddressString, this.multicastPort );
107 }
108 catch ( IOException ioe )
109 {
110
111
112 throw ioe;
113 }
114 }
115
116
117
118
119
120
121
122
123 private void createSocket( String multicastAddressString, int multicastPort )
124 throws IOException
125 {
126 try
127 {
128 mSocket = new MulticastSocket( multicastPort );
129 if ( log.isInfoEnabled() )
130 {
131 log.info( "Joining Group: [" + InetAddress.getByName( multicastAddressString ) + "]" );
132 }
133 mSocket.joinGroup( InetAddress.getByName( multicastAddressString ) );
134 }
135 catch ( IOException e )
136 {
137 log.error( "Could not bind to multicast address [" + InetAddress.getByName( multicastAddressString ) + ":" + multicastPort + "]", e );
138 throw e;
139 }
140 }
141
142
143
144
145
146
147
148
149 public Object waitForMessage()
150 throws IOException
151 {
152 final DatagramPacket packet = new DatagramPacket( mBuffer, mBuffer.length );
153 ObjectInputStream objectStream = null;
154 Object obj = null;
155 try
156 {
157 if ( log.isDebugEnabled() )
158 {
159 log.debug( "Waiting for message." );
160 }
161
162 mSocket.receive( packet );
163
164 if ( log.isDebugEnabled() )
165 {
166 log.debug( "Received packet from address [" + packet.getSocketAddress() + "]" );
167 }
168
169 final ByteArrayInputStream byteStream = new ByteArrayInputStream( mBuffer, 0, packet.getLength() );
170 objectStream = new ObjectInputStreamClassLoaderAware( byteStream, null );
171 obj = objectStream.readObject();
172
173 if ( obj instanceof UDPDiscoveryMessage )
174 {
175
176
177
178 UDPDiscoveryMessage msg = (UDPDiscoveryMessage) obj;
179 msg.setHost(packet.getAddress().getHostAddress());
180
181 if ( log.isDebugEnabled() )
182 {
183 log.debug( "Read object from address [" + packet.getSocketAddress() + "], object=[" + obj + "]" );
184 }
185 }
186 }
187 catch ( Exception e )
188 {
189 log.error( "Error receiving multicast packet", e );
190 }
191 finally
192 {
193 if (objectStream != null)
194 {
195 try
196 {
197 objectStream.close();
198 }
199 catch (IOException e)
200 {
201 log.error( "Error closing object stream", e );
202 }
203 }
204 }
205 return obj;
206 }
207
208
209 @Override
210 public void run()
211 {
212 try
213 {
214 while ( !shutdown )
215 {
216 Object obj = waitForMessage();
217
218
219 cnt++;
220
221 if ( log.isDebugEnabled() )
222 {
223 log.debug( getCnt() + " messages received." );
224 }
225
226 UDPDiscoveryMessage message = null;
227
228 try
229 {
230 message = (UDPDiscoveryMessage) obj;
231
232 if ( message != null )
233 {
234 MessageHandler handler = new MessageHandler( message );
235
236 pooledExecutor.execute( handler );
237
238 if ( log.isDebugEnabled() )
239 {
240 log.debug( "Passed handler to executor." );
241 }
242 }
243 else
244 {
245 log.warn( "message is null" );
246 }
247 }
248 catch ( ClassCastException cce )
249 {
250 log.warn( "Received unknown message type " + cce.getMessage() );
251 }
252 }
253 }
254 catch ( Exception e )
255 {
256 log.error( "Unexpected exception in UDP receiver.", e );
257 try
258 {
259 Thread.sleep( 100 );
260
261
262 }
263 catch ( Exception e2 )
264 {
265 log.error( "Problem sleeping", e2 );
266 }
267 }
268 }
269
270
271
272
273 public void setCnt( int cnt )
274 {
275 this.cnt = cnt;
276 }
277
278
279
280
281 public int getCnt()
282 {
283 return cnt;
284 }
285
286
287
288
289 public class MessageHandler
290 implements Runnable
291 {
292
293 private UDPDiscoveryMessage message = null;
294
295
296
297
298 public MessageHandler( UDPDiscoveryMessage message )
299 {
300 this.message = message;
301 }
302
303
304
305
306 @SuppressWarnings("synthetic-access")
307 @Override
308 public void run()
309 {
310
311 if ( message.getRequesterId() == CacheInfo.listenerId )
312 {
313 if ( log.isDebugEnabled() )
314 {
315 log.debug( "Ignoring message sent from self" );
316 }
317 }
318 else
319 {
320 if ( log.isDebugEnabled() )
321 {
322 log.debug( "Process message sent from another" );
323 log.debug( "Message = " + message );
324 }
325
326 if ( message.getHost() == null || message.getCacheNames() == null || message.getCacheNames().isEmpty() )
327 {
328 if ( log.isDebugEnabled() )
329 {
330 log.debug( "Ignoring invalid message: " + message );
331 }
332 }
333 else
334 {
335 processMessage();
336 }
337 }
338 }
339
340
341
342
343 @SuppressWarnings("synthetic-access")
344 private void processMessage()
345 {
346 DiscoveredService discoveredService = new DiscoveredService();
347 discoveredService.setServiceAddress( message.getHost() );
348 discoveredService.setCacheNames( message.getCacheNames() );
349 discoveredService.setServicePort( message.getPort() );
350 discoveredService.setLastHearFromTime( System.currentTimeMillis() );
351
352
353
354 if ( message.getMessageType() == BroadcastType.REQUEST )
355 {
356 if ( log.isDebugEnabled() )
357 {
358 log.debug( "Message is a Request Broadcast, will have the service handle it." );
359 }
360 service.serviceRequestBroadcast();
361 return;
362 }
363 else if ( message.getMessageType() == BroadcastType.REMOVE )
364 {
365 if ( log.isDebugEnabled() )
366 {
367 log.debug( "Removing service from set " + discoveredService );
368 }
369 service.removeDiscoveredService( discoveredService );
370 }
371 else
372 {
373 service.addOrUpdateService( discoveredService );
374 }
375 }
376 }
377
378
379 @Override
380 public void shutdown()
381 {
382 try
383 {
384 shutdown = true;
385 mSocket.leaveGroup( InetAddress.getByName( multicastAddressString ) );
386 mSocket.close();
387 pooledExecutor.shutdownNow();
388 }
389 catch ( IOException e )
390 {
391 log.error( "Problem closing socket" );
392 }
393 }
394 }