001package org.apache.commons.jcs3.utils.discovery; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import java.io.IOException; 023import java.net.Inet6Address; 024import java.net.InetAddress; 025import java.net.InetSocketAddress; 026import java.net.NetworkInterface; 027import java.net.StandardProtocolFamily; 028import java.net.StandardSocketOptions; 029import java.nio.ByteBuffer; 030import java.nio.channels.DatagramChannel; 031import java.nio.channels.MembershipKey; 032import java.nio.channels.SelectionKey; 033import java.nio.channels.Selector; 034import java.util.Iterator; 035import java.util.concurrent.ArrayBlockingQueue; 036import java.util.concurrent.ExecutorService; 037import java.util.concurrent.atomic.AtomicBoolean; 038import java.util.concurrent.atomic.AtomicInteger; 039 040import org.apache.commons.jcs3.engine.CacheInfo; 041import org.apache.commons.jcs3.engine.behavior.IElementSerializer; 042import org.apache.commons.jcs3.engine.behavior.IShutdownObserver; 043import org.apache.commons.jcs3.log.Log; 044import org.apache.commons.jcs3.log.LogManager; 045import org.apache.commons.jcs3.utils.net.HostNameUtil; 046import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration; 047import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration.WhenBlockedPolicy; 048import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager; 049 050/** Receives UDP Discovery messages. */ 051public class UDPDiscoveryReceiver 052 implements Runnable, IShutdownObserver 053{ 054 /** The log factory */ 055 private static final Log log = LogManager.getLog( UDPDiscoveryReceiver.class ); 056 057 /** The channel used for communication. */ 058 private DatagramChannel multicastChannel; 059 060 /** The group membership key. */ 061 private MembershipKey multicastGroupKey; 062 063 /** The selector. */ 064 private Selector selector; 065 066 /** 067 * TODO: Consider using the threadpool manager to get this thread pool. For now place a tight 068 * restriction on the pool size 069 */ 070 private static final int maxPoolSize = 2; 071 072 /** The processor */ 073 private final ExecutorService pooledExecutor; 074 075 /** number of messages received. For debugging and testing. */ 076 private final AtomicInteger cnt = new AtomicInteger(0); 077 078 /** Service to get cache names and handle request broadcasts */ 079 private final UDPDiscoveryService service; 080 081 /** Serializer */ 082 private IElementSerializer serializer; 083 084 /** Is it shutdown. */ 085 private final AtomicBoolean shutdown = new AtomicBoolean(false); 086 087 /** 088 * Constructor for the UDPDiscoveryReceiver object. 089 * <p> 090 * We determine our own host using InetAddress 091 *<p> 092 * @param service 093 * @param multicastInterfaceString 094 * @param multicastAddressString 095 * @param multicastPort 096 * @throws IOException 097 */ 098 public UDPDiscoveryReceiver( final UDPDiscoveryService service, 099 final String multicastInterfaceString, 100 final String multicastAddressString, 101 final int multicastPort ) 102 throws IOException 103 { 104 this(service, multicastInterfaceString, 105 InetAddress.getByName( multicastAddressString ), 106 multicastPort); 107 } 108 109 /** 110 * Constructor for the UDPDiscoveryReceiver object. 111 * <p> 112 * @param service 113 * @param multicastInterfaceString 114 * @param multicastAddress 115 * @param multicastPort 116 * @throws IOException 117 * @since 3.1 118 */ 119 public UDPDiscoveryReceiver( final UDPDiscoveryService service, 120 final String multicastInterfaceString, 121 final InetAddress multicastAddress, 122 final int multicastPort ) 123 throws IOException 124 { 125 this.service = service; 126 if (service != null) 127 { 128 this.serializer = service.getSerializer(); 129 } 130 131 // create a small thread pool to handle a barrage 132 this.pooledExecutor = ThreadPoolManager.getInstance().createPool( 133 new PoolConfiguration(false, 0, maxPoolSize, maxPoolSize, 0, 134 WhenBlockedPolicy.DISCARDOLDEST, maxPoolSize), 135 "JCS-UDPDiscoveryReceiver-", Thread.MIN_PRIORITY); 136 137 log.info( "Constructing listener, [{0}:{1}]", multicastAddress, multicastPort ); 138 createSocket( multicastInterfaceString, multicastAddress, multicastPort ); 139 } 140 141 /** 142 * Creates the socket for this class. 143 * <p> 144 * @param multicastInterfaceString 145 * @param multicastAddress 146 * @param multicastPort 147 * @throws IOException 148 */ 149 private void createSocket( final String multicastInterfaceString, final InetAddress multicastAddress, 150 final int multicastPort ) 151 throws IOException 152 { 153 try 154 { 155 // Use dedicated interface if specified 156 NetworkInterface multicastInterface = null; 157 if (multicastInterfaceString != null) 158 { 159 multicastInterface = NetworkInterface.getByName(multicastInterfaceString); 160 } 161 else 162 { 163 multicastInterface = HostNameUtil.getMulticastNetworkInterface(); 164 } 165 if (multicastInterface != null) 166 { 167 log.info("Using network interface {0}", multicastInterface::getDisplayName); 168 } 169 170 multicastChannel = DatagramChannel.open( 171 multicastAddress instanceof Inet6Address ? 172 StandardProtocolFamily.INET6 : StandardProtocolFamily.INET) 173 .setOption(StandardSocketOptions.SO_REUSEADDR, true) 174 .setOption(StandardSocketOptions.IP_MULTICAST_IF, multicastInterface) 175 .bind(new InetSocketAddress(multicastPort)); 176 multicastChannel.configureBlocking(false); 177 178 log.info("Joining Group: [{0}] on {1}", multicastAddress, multicastInterface); 179 multicastGroupKey = multicastChannel.join(multicastAddress, multicastInterface); 180 181 selector = Selector.open(); 182 multicastChannel.register(selector, SelectionKey.OP_READ); 183 } 184 catch ( final IOException e ) 185 { 186 log.error( "Could not bind to multicast address [{0}:{1}]", multicastAddress, 187 multicastPort, e ); 188 throw e; 189 } 190 } 191 192 private final ArrayBlockingQueue<UDPDiscoveryMessage> msgQueue = 193 new ArrayBlockingQueue<>(maxPoolSize); 194 195 /** 196 * Wait for multicast message 197 * 198 * @return the object message 199 * @throws IOException 200 * @deprecated no longer used 201 */ 202 @Deprecated 203 public Object waitForMessage() 204 throws IOException 205 { 206 try 207 { 208 return msgQueue.take(); 209 } 210 catch (InterruptedException e) 211 { 212 throw new IOException("Interrupted waiting for message", e); 213 } 214 } 215 216 /** Main processing method for the UDPDiscoveryReceiver object */ 217 @Override 218 public void run() 219 { 220 try 221 { 222 log.debug( "Waiting for message." ); 223 224 while (!shutdown.get()) 225 { 226 int activeKeys = selector.select(); 227 if (activeKeys == 0) 228 { 229 continue; 230 } 231 232 for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) 233 { 234 if (shutdown.get()) 235 { 236 break; 237 } 238 239 SelectionKey key = i.next(); 240 241 if (!key.isValid()) 242 { 243 continue; 244 } 245 246 if (key.isReadable()) 247 { 248 cnt.incrementAndGet(); 249 log.debug( "{0} messages received.", this::getCnt ); 250 251 DatagramChannel mc = (DatagramChannel) key.channel(); 252 253 ByteBuffer byteBuffer = ByteBuffer.allocate(65536); 254 InetSocketAddress sourceAddress = 255 (InetSocketAddress) mc.receive(byteBuffer); 256 byteBuffer.flip(); 257 258 try 259 { 260 log.debug("Received packet from address [{0}]", sourceAddress); 261 byte[] bytes = new byte[byteBuffer.limit()]; 262 byteBuffer.get(bytes); 263 Object obj = serializer.deSerialize(bytes, null); 264 265 if (obj instanceof UDPDiscoveryMessage) 266 { 267 // Ensure that the address we're supposed to send to is, indeed, the address 268 // of the machine on the other end of this connection. This guards against 269 // instances where we don't exactly get the right local host address 270 final UDPDiscoveryMessage msg = (UDPDiscoveryMessage) obj; 271 msg.setHost(sourceAddress.getHostString()); 272 273 log.debug( "Read object from address [{0}], object=[{1}]", 274 sourceAddress, obj ); 275 276 // Just to keep the functionality of the deprecated waitForMessage method 277 synchronized (msgQueue) 278 { 279 // Check if queue full already? 280 if (msgQueue.remainingCapacity() == 0) 281 { 282 // remove oldest element from queue 283 msgQueue.remove(); 284 } 285 286 msgQueue.add(msg); 287 } 288 289 pooledExecutor.execute(() -> handleMessage(msg)); 290 log.debug( "Passed handler to executor." ); 291 } 292 } 293 catch ( final IOException | ClassNotFoundException e ) 294 { 295 log.error( "Error receiving multicast packet", e ); 296 } 297 298 i.remove(); 299 } 300 } 301 } // end while 302 } 303 catch ( final IOException e ) 304 { 305 log.error( "Unexpected exception in UDP receiver.", e ); 306 } 307 } 308 309 /** 310 * @param cnt The cnt to set. 311 */ 312 public void setCnt( final int cnt ) 313 { 314 this.cnt.set(cnt); 315 } 316 317 /** 318 * @return Returns the cnt. 319 */ 320 public int getCnt() 321 { 322 return cnt.get(); 323 } 324 325 /** 326 * For testing 327 * 328 * @param serializer the serializer to set 329 * @since 3.1 330 */ 331 protected void setSerializer(IElementSerializer serializer) 332 { 333 this.serializer = serializer; 334 } 335 336 /** 337 * Separate thread run when a command comes into the UDPDiscoveryReceiver. 338 * @deprecated No longer used 339 */ 340 @Deprecated 341 public class MessageHandler 342 implements Runnable 343 { 344 /** The message to handle. Passed in during construction. */ 345 private final UDPDiscoveryMessage message; 346 347 /** 348 * @param message 349 */ 350 public MessageHandler( final UDPDiscoveryMessage message ) 351 { 352 this.message = message; 353 } 354 355 /** 356 * Process the message. 357 */ 358 @Override 359 public void run() 360 { 361 handleMessage(message); 362 } 363 } 364 365 /** 366 * Separate thread run when a command comes into the UDPDiscoveryReceiver. 367 */ 368 private void handleMessage(UDPDiscoveryMessage message) 369 { 370 // consider comparing ports here instead. 371 if ( message.getRequesterId() == CacheInfo.listenerId ) 372 { 373 log.debug( "Ignoring message sent from self" ); 374 } 375 else 376 { 377 log.debug( "Process message sent from another" ); 378 log.debug( "Message = {0}", message ); 379 380 if ( message.getHost() == null || message.getCacheNames() == null || message.getCacheNames().isEmpty() ) 381 { 382 log.debug( "Ignoring invalid message: {0}", message ); 383 } 384 else 385 { 386 processMessage(message); 387 } 388 } 389 } 390 391 /** 392 * Process the incoming message. 393 */ 394 private void processMessage(UDPDiscoveryMessage message) 395 { 396 final DiscoveredService discoveredService = new DiscoveredService(message); 397 398 switch (message.getMessageType()) 399 { 400 case REMOVE: 401 log.debug( "Removing service from set {0}", discoveredService ); 402 service.removeDiscoveredService( discoveredService ); 403 break; 404 case REQUEST: 405 // if this is a request message, have the service handle it and 406 // return 407 log.debug( "Message is a Request Broadcast, will have the service handle it." ); 408 service.serviceRequestBroadcast(); 409 break; 410 case PASSIVE: 411 default: 412 log.debug( "Adding or updating service to set {0}", discoveredService ); 413 service.addOrUpdateService( discoveredService ); 414 break; 415 } 416 } 417 418 /** Shuts down the socket. */ 419 @Override 420 public void shutdown() 421 { 422 if (shutdown.compareAndSet(false, true)) 423 { 424 try 425 { 426 selector.close(); 427 multicastGroupKey.drop(); 428 multicastChannel.close(); 429 } 430 catch ( final IOException e ) 431 { 432 log.error( "Problem closing socket" ); 433 } 434 } 435 } 436}