1 package org.apache.commons.jcs3.utils.discovery;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.net.Inet6Address;
24 import java.net.InetAddress;
25 import java.net.InetSocketAddress;
26 import java.net.NetworkInterface;
27 import java.net.StandardProtocolFamily;
28 import java.net.StandardSocketOptions;
29 import java.nio.ByteBuffer;
30 import java.nio.channels.DatagramChannel;
31 import java.nio.channels.MembershipKey;
32 import java.nio.channels.SelectionKey;
33 import java.nio.channels.Selector;
34 import java.util.Iterator;
35 import java.util.concurrent.ArrayBlockingQueue;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicInteger;
39
40 import org.apache.commons.jcs3.engine.CacheInfo;
41 import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
42 import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
43 import org.apache.commons.jcs3.log.Log;
44 import org.apache.commons.jcs3.log.LogManager;
45 import org.apache.commons.jcs3.utils.net.HostNameUtil;
46 import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration;
47 import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration.WhenBlockedPolicy;
48 import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager;
49
50
51 public class UDPDiscoveryReceiver
52 implements Runnable, IShutdownObserver
53 {
54
55 private static final Log log = LogManager.getLog( UDPDiscoveryReceiver.class );
56
57
58 private DatagramChannel multicastChannel;
59
60
61 private MembershipKey multicastGroupKey;
62
63
64 private Selector selector;
65
66
67
68
69
70 private static final int maxPoolSize = 2;
71
72
73 private final ExecutorService pooledExecutor;
74
75
76 private final AtomicInteger cnt = new AtomicInteger(0);
77
78
79 private final UDPDiscoveryService service;
80
81
82 private IElementSerializer serializer;
83
84
85 private final AtomicBoolean shutdown = new AtomicBoolean(false);
86
87
88
89
90
91
92
93
94
95
96
97
98 public UDPDiscoveryReceiver( final UDPDiscoveryService service,
99 final String multicastInterfaceString,
100 final String multicastAddressString,
101 final int multicastPort )
102 throws IOException
103 {
104 this(service, multicastInterfaceString,
105 InetAddress.getByName( multicastAddressString ),
106 multicastPort);
107 }
108
109
110
111
112
113
114
115
116
117
118
119 public UDPDiscoveryReceiver( final UDPDiscoveryService service,
120 final String multicastInterfaceString,
121 final InetAddress multicastAddress,
122 final int multicastPort )
123 throws IOException
124 {
125 this.service = service;
126 if (service != null)
127 {
128 this.serializer = service.getSerializer();
129 }
130
131
132 this.pooledExecutor = ThreadPoolManager.getInstance().createPool(
133 new PoolConfiguration(false, 0, maxPoolSize, maxPoolSize, 0,
134 WhenBlockedPolicy.DISCARDOLDEST, maxPoolSize),
135 "JCS-UDPDiscoveryReceiver-", Thread.MIN_PRIORITY);
136
137 log.info( "Constructing listener, [{0}:{1}]", multicastAddress, multicastPort );
138 createSocket( multicastInterfaceString, multicastAddress, multicastPort );
139 }
140
141
142
143
144
145
146
147
148
149 private void createSocket( final String multicastInterfaceString, final InetAddress multicastAddress,
150 final int multicastPort )
151 throws IOException
152 {
153 try
154 {
155
156 NetworkInterface multicastInterface = null;
157 if (multicastInterfaceString != null)
158 {
159 multicastInterface = NetworkInterface.getByName(multicastInterfaceString);
160 }
161 else
162 {
163 multicastInterface = HostNameUtil.getMulticastNetworkInterface();
164 }
165 if (multicastInterface != null)
166 {
167 log.info("Using network interface {0}", multicastInterface::getDisplayName);
168 }
169
170 multicastChannel = DatagramChannel.open(
171 multicastAddress instanceof Inet6Address ?
172 StandardProtocolFamily.INET6 : StandardProtocolFamily.INET)
173 .setOption(StandardSocketOptions.SO_REUSEADDR, true)
174 .setOption(StandardSocketOptions.IP_MULTICAST_IF, multicastInterface)
175 .bind(new InetSocketAddress(multicastPort));
176 multicastChannel.configureBlocking(false);
177
178 log.info("Joining Group: [{0}] on {1}", multicastAddress, multicastInterface);
179 multicastGroupKey = multicastChannel.join(multicastAddress, multicastInterface);
180
181 selector = Selector.open();
182 multicastChannel.register(selector, SelectionKey.OP_READ);
183 }
184 catch ( final IOException e )
185 {
186 log.error( "Could not bind to multicast address [{0}:{1}]", multicastAddress,
187 multicastPort, e );
188 throw e;
189 }
190 }
191
192 private final ArrayBlockingQueue<UDPDiscoveryMessage> msgQueue =
193 new ArrayBlockingQueue<>(maxPoolSize);
194
195
196
197
198
199
200
201
202 @Deprecated
203 public Object waitForMessage()
204 throws IOException
205 {
206 try
207 {
208 return msgQueue.take();
209 }
210 catch (InterruptedException e)
211 {
212 throw new IOException("Interrupted waiting for message", e);
213 }
214 }
215
216
217 @Override
218 public void run()
219 {
220 try
221 {
222 log.debug( "Waiting for message." );
223
224 while (!shutdown.get())
225 {
226 int activeKeys = selector.select();
227 if (activeKeys == 0)
228 {
229 continue;
230 }
231
232 for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();)
233 {
234 if (shutdown.get())
235 {
236 break;
237 }
238
239 SelectionKey key = i.next();
240
241 if (!key.isValid())
242 {
243 continue;
244 }
245
246 if (key.isReadable())
247 {
248 cnt.incrementAndGet();
249 log.debug( "{0} messages received.", this::getCnt );
250
251 DatagramChannel mc = (DatagramChannel) key.channel();
252
253 ByteBuffer byteBuffer = ByteBuffer.allocate(65536);
254 InetSocketAddress sourceAddress =
255 (InetSocketAddress) mc.receive(byteBuffer);
256 byteBuffer.flip();
257
258 try
259 {
260 log.debug("Received packet from address [{0}]", sourceAddress);
261 byte[] bytes = new byte[byteBuffer.limit()];
262 byteBuffer.get(bytes);
263 Object obj = serializer.deSerialize(bytes, null);
264
265 if (obj instanceof UDPDiscoveryMessage)
266 {
267
268
269
270 final UDPDiscoveryMessage msg = (UDPDiscoveryMessage) obj;
271 msg.setHost(sourceAddress.getHostString());
272
273 log.debug( "Read object from address [{0}], object=[{1}]",
274 sourceAddress, obj );
275
276
277 synchronized (msgQueue)
278 {
279
280 if (msgQueue.remainingCapacity() == 0)
281 {
282
283 msgQueue.remove();
284 }
285
286 msgQueue.add(msg);
287 }
288
289 pooledExecutor.execute(() -> handleMessage(msg));
290 log.debug( "Passed handler to executor." );
291 }
292 }
293 catch ( final IOException | ClassNotFoundException e )
294 {
295 log.error( "Error receiving multicast packet", e );
296 }
297
298 i.remove();
299 }
300 }
301 }
302 }
303 catch ( final IOException e )
304 {
305 log.error( "Unexpected exception in UDP receiver.", e );
306 }
307 }
308
309
310
311
312 public void setCnt( final int cnt )
313 {
314 this.cnt.set(cnt);
315 }
316
317
318
319
320 public int getCnt()
321 {
322 return cnt.get();
323 }
324
325
326
327
328
329
330
331 protected void setSerializer(IElementSerializer serializer)
332 {
333 this.serializer = serializer;
334 }
335
336
337
338
339
340 @Deprecated
341 public class MessageHandler
342 implements Runnable
343 {
344
345 private final UDPDiscoveryMessage message;
346
347
348
349
350 public MessageHandler( final UDPDiscoveryMessage message )
351 {
352 this.message = message;
353 }
354
355
356
357
358 @Override
359 public void run()
360 {
361 handleMessage(message);
362 }
363 }
364
365
366
367
368 private void handleMessage(UDPDiscoveryMessage message)
369 {
370
371 if ( message.getRequesterId() == CacheInfo.listenerId )
372 {
373 log.debug( "Ignoring message sent from self" );
374 }
375 else
376 {
377 log.debug( "Process message sent from another" );
378 log.debug( "Message = {0}", message );
379
380 if ( message.getHost() == null || message.getCacheNames() == null || message.getCacheNames().isEmpty() )
381 {
382 log.debug( "Ignoring invalid message: {0}", message );
383 }
384 else
385 {
386 processMessage(message);
387 }
388 }
389 }
390
391
392
393
394 private void processMessage(UDPDiscoveryMessage message)
395 {
396 final DiscoveredService discoveredService = new DiscoveredService(message);
397
398 switch (message.getMessageType())
399 {
400 case REMOVE:
401 log.debug( "Removing service from set {0}", discoveredService );
402 service.removeDiscoveredService( discoveredService );
403 break;
404 case REQUEST:
405
406
407 log.debug( "Message is a Request Broadcast, will have the service handle it." );
408 service.serviceRequestBroadcast();
409 break;
410 case PASSIVE:
411 default:
412 log.debug( "Adding or updating service to set {0}", discoveredService );
413 service.addOrUpdateService( discoveredService );
414 break;
415 }
416 }
417
418
419 @Override
420 public void shutdown()
421 {
422 if (shutdown.compareAndSet(false, true))
423 {
424 try
425 {
426 selector.close();
427 multicastGroupKey.drop();
428 multicastChannel.close();
429 }
430 catch ( final IOException e )
431 {
432 log.error( "Problem closing socket" );
433 }
434 }
435 }
436 }