001package org.apache.commons.jcs.utils.discovery; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import java.io.ByteArrayInputStream; 023import java.io.IOException; 024import java.io.ObjectInputStream; 025import java.net.DatagramPacket; 026import java.net.InetAddress; 027import java.net.MulticastSocket; 028import java.util.concurrent.Executors; 029import java.util.concurrent.ThreadPoolExecutor; 030 031import org.apache.commons.jcs.engine.CacheInfo; 032import org.apache.commons.jcs.engine.behavior.IShutdownObserver; 033import org.apache.commons.jcs.io.ObjectInputStreamClassLoaderAware; 034import org.apache.commons.jcs.utils.discovery.UDPDiscoveryMessage.BroadcastType; 035import org.apache.commons.jcs.utils.threadpool.DaemonThreadFactory; 036import org.apache.commons.logging.Log; 037import org.apache.commons.logging.LogFactory; 038 039/** Receives UDP Discovery messages. */ 040public class UDPDiscoveryReceiver 041 implements Runnable, IShutdownObserver 042{ 043 /** The log factory */ 044 private static final Log log = LogFactory.getLog( UDPDiscoveryReceiver.class ); 045 046 /** buffer */ 047 private final byte[] mBuffer = new byte[65536]; 048 049 /** The socket used for communication. */ 050 private MulticastSocket mSocket; 051 052 /** 053 * TODO: Consider using the threadpool manager to get this thread pool. For now place a tight 054 * restriction on the pool size 055 */ 056 private static final int maxPoolSize = 2; 057 058 /** The processor */ 059 private ThreadPoolExecutor pooledExecutor = null; 060 061 /** number of messages received. For debugging and testing. */ 062 private int cnt = 0; 063 064 /** Service to get cache names and handle request broadcasts */ 065 private UDPDiscoveryService service = null; 066 067 /** Address */ 068 private String multicastAddressString = ""; 069 070 /** The port */ 071 private int multicastPort = 0; 072 073 /** Is it shutdown. */ 074 private boolean shutdown = false; 075 076 /** 077 * Constructor for the LateralUDPReceiver object. 078 * <p> 079 * We determine out own host using InetAddress 080 *<p> 081 * @param service 082 * @param multicastAddressString 083 * @param multicastPort 084 * @throws IOException 085 */ 086 public UDPDiscoveryReceiver( UDPDiscoveryService service, String multicastAddressString, int multicastPort ) 087 throws IOException 088 { 089 this.service = service; 090 this.multicastAddressString = multicastAddressString; 091 this.multicastPort = multicastPort; 092 093 // create a small thread pool to handle a barrage 094 pooledExecutor = (ThreadPoolExecutor)Executors.newFixedThreadPool(maxPoolSize, 095 new DaemonThreadFactory("JCS-UDPDiscoveryReceiver-", Thread.MIN_PRIORITY)); 096 pooledExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); 097 //pooledExecutor.setMinimumPoolSize(1); 098 099 if ( log.isInfoEnabled() ) 100 { 101 log.info( "Constructing listener, [" + this.multicastAddressString + ":" + this.multicastPort + "]" ); 102 } 103 104 try 105 { 106 createSocket( this.multicastAddressString, this.multicastPort ); 107 } 108 catch ( IOException ioe ) 109 { 110 // consider eating this so we can go on, or constructing the socket 111 // later 112 throw ioe; 113 } 114 } 115 116 /** 117 * Creates the socket for this class. 118 * <p> 119 * @param multicastAddressString 120 * @param multicastPort 121 * @throws IOException 122 */ 123 private void createSocket( String multicastAddressString, int multicastPort ) 124 throws IOException 125 { 126 try 127 { 128 mSocket = new MulticastSocket( multicastPort ); 129 if ( log.isInfoEnabled() ) 130 { 131 log.info( "Joining Group: [" + InetAddress.getByName( multicastAddressString ) + "]" ); 132 } 133 mSocket.joinGroup( InetAddress.getByName( multicastAddressString ) ); 134 } 135 catch ( IOException e ) 136 { 137 log.error( "Could not bind to multicast address [" + InetAddress.getByName( multicastAddressString ) + ":" + multicastPort + "]", e ); 138 throw e; 139 } 140 } 141 142 /** 143 * Highly unreliable. If it is processing one message while another comes in, the second 144 * message is lost. This is for low concurrency peppering. 145 * <p> 146 * @return the object message 147 * @throws IOException 148 */ 149 public Object waitForMessage() 150 throws IOException 151 { 152 final DatagramPacket packet = new DatagramPacket( mBuffer, mBuffer.length ); 153 ObjectInputStream objectStream = null; 154 Object obj = null; 155 try 156 { 157 if ( log.isDebugEnabled() ) 158 { 159 log.debug( "Waiting for message." ); 160 } 161 162 mSocket.receive( packet ); 163 164 if ( log.isDebugEnabled() ) 165 { 166 log.debug( "Received packet from address [" + packet.getSocketAddress() + "]" ); 167 } 168 169 final ByteArrayInputStream byteStream = new ByteArrayInputStream( mBuffer, 0, packet.getLength() ); 170 objectStream = new ObjectInputStreamClassLoaderAware( byteStream, null ); 171 obj = objectStream.readObject(); 172 173 if ( obj instanceof UDPDiscoveryMessage ) 174 { 175 // Ensure that the address we're supposed to send to is, indeed, the address 176 // of the machine on the other end of this connection. This guards against 177 // instances where we don't exactly get the right local host address 178 UDPDiscoveryMessage msg = (UDPDiscoveryMessage) obj; 179 msg.setHost(packet.getAddress().getHostAddress()); 180 181 if ( log.isDebugEnabled() ) 182 { 183 log.debug( "Read object from address [" + packet.getSocketAddress() + "], object=[" + obj + "]" ); 184 } 185 } 186 } 187 catch ( Exception e ) 188 { 189 log.error( "Error receiving multicast packet", e ); 190 } 191 finally 192 { 193 if (objectStream != null) 194 { 195 try 196 { 197 objectStream.close(); 198 } 199 catch (IOException e) 200 { 201 log.error( "Error closing object stream", e ); 202 } 203 } 204 } 205 return obj; 206 } 207 208 /** Main processing method for the LateralUDPReceiver object */ 209 @Override 210 public void run() 211 { 212 try 213 { 214 while ( !shutdown ) 215 { 216 Object obj = waitForMessage(); 217 218 // not thread safe, but just for debugging 219 cnt++; 220 221 if ( log.isDebugEnabled() ) 222 { 223 log.debug( getCnt() + " messages received." ); 224 } 225 226 UDPDiscoveryMessage message = null; 227 228 try 229 { 230 message = (UDPDiscoveryMessage) obj; 231 // check for null 232 if ( message != null ) 233 { 234 MessageHandler handler = new MessageHandler( message ); 235 236 pooledExecutor.execute( handler ); 237 238 if ( log.isDebugEnabled() ) 239 { 240 log.debug( "Passed handler to executor." ); 241 } 242 } 243 else 244 { 245 log.warn( "message is null" ); 246 } 247 } 248 catch ( ClassCastException cce ) 249 { 250 log.warn( "Received unknown message type " + cce.getMessage() ); 251 } 252 } // end while 253 } 254 catch ( Exception e ) 255 { 256 log.error( "Unexpected exception in UDP receiver.", e ); 257 try 258 { 259 Thread.sleep( 100 ); 260 // TODO consider some failure count so we don't do this 261 // forever. 262 } 263 catch ( Exception e2 ) 264 { 265 log.error( "Problem sleeping", e2 ); 266 } 267 } 268 } 269 270 /** 271 * @param cnt The cnt to set. 272 */ 273 public void setCnt( int cnt ) 274 { 275 this.cnt = cnt; 276 } 277 278 /** 279 * @return Returns the cnt. 280 */ 281 public int getCnt() 282 { 283 return cnt; 284 } 285 286 /** 287 * Separate thread run when a command comes into the UDPDiscoveryReceiver. 288 */ 289 public class MessageHandler 290 implements Runnable 291 { 292 /** The message to handle. Passed in during construction. */ 293 private UDPDiscoveryMessage message = null; 294 295 /** 296 * @param message 297 */ 298 public MessageHandler( UDPDiscoveryMessage message ) 299 { 300 this.message = message; 301 } 302 303 /** 304 * Process the message. 305 */ 306 @SuppressWarnings("synthetic-access") 307 @Override 308 public void run() 309 { 310 // consider comparing ports here instead. 311 if ( message.getRequesterId() == CacheInfo.listenerId ) 312 { 313 if ( log.isDebugEnabled() ) 314 { 315 log.debug( "Ignoring message sent from self" ); 316 } 317 } 318 else 319 { 320 if ( log.isDebugEnabled() ) 321 { 322 log.debug( "Process message sent from another" ); 323 log.debug( "Message = " + message ); 324 } 325 326 if ( message.getHost() == null || message.getCacheNames() == null || message.getCacheNames().isEmpty() ) 327 { 328 if ( log.isDebugEnabled() ) 329 { 330 log.debug( "Ignoring invalid message: " + message ); 331 } 332 } 333 else 334 { 335 processMessage(); 336 } 337 } 338 } 339 340 /** 341 * Process the incoming message. 342 */ 343 @SuppressWarnings("synthetic-access") 344 private void processMessage() 345 { 346 DiscoveredService discoveredService = new DiscoveredService(); 347 discoveredService.setServiceAddress( message.getHost() ); 348 discoveredService.setCacheNames( message.getCacheNames() ); 349 discoveredService.setServicePort( message.getPort() ); 350 discoveredService.setLastHearFromTime( System.currentTimeMillis() ); 351 352 // if this is a request message, have the service handle it and 353 // return 354 if ( message.getMessageType() == BroadcastType.REQUEST ) 355 { 356 if ( log.isDebugEnabled() ) 357 { 358 log.debug( "Message is a Request Broadcast, will have the service handle it." ); 359 } 360 service.serviceRequestBroadcast(); 361 return; 362 } 363 else if ( message.getMessageType() == BroadcastType.REMOVE ) 364 { 365 if ( log.isDebugEnabled() ) 366 { 367 log.debug( "Removing service from set " + discoveredService ); 368 } 369 service.removeDiscoveredService( discoveredService ); 370 } 371 else 372 { 373 service.addOrUpdateService( discoveredService ); 374 } 375 } 376 } 377 378 /** Shuts down the socket. */ 379 @Override 380 public void shutdown() 381 { 382 try 383 { 384 shutdown = true; 385 mSocket.leaveGroup( InetAddress.getByName( multicastAddressString ) ); 386 mSocket.close(); 387 pooledExecutor.shutdownNow(); 388 } 389 catch ( IOException e ) 390 { 391 log.error( "Problem closing socket" ); 392 } 393 } 394}