1 package org.apache.jcs.auxiliary.lateral.socket.tcp.discovery;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.ByteArrayInputStream;
23 import java.io.IOException;
24 import java.io.ObjectInputStream;
25 import java.net.DatagramPacket;
26 import java.net.InetAddress;
27 import java.net.MulticastSocket;
28 import java.util.ArrayList;
29 import java.util.Iterator;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.jcs.auxiliary.lateral.LateralCacheAttributes;
34 import org.apache.jcs.auxiliary.lateral.LateralCacheInfo;
35 import org.apache.jcs.auxiliary.lateral.LateralCacheNoWait;
36 import org.apache.jcs.auxiliary.lateral.socket.tcp.LateralTCPCacheManager;
37 import org.apache.jcs.auxiliary.lateral.socket.tcp.TCPLateralCacheAttributes;
38 import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
39 import org.apache.jcs.engine.behavior.ICache;
40 import org.apache.jcs.engine.behavior.ICompositeCacheManager;
41 import org.apache.jcs.engine.behavior.IElementSerializer;
42 import org.apache.jcs.engine.behavior.IShutdownObserver;
43 import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
44
45 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
46 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
47 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
48
49 /*** Receives UDP Discovery messages. */
50 public class UDPDiscoveryReceiver
51 implements Runnable, IShutdownObserver
52 {
53 /*** The log factory */
54 private final static Log log = LogFactory.getLog( UDPDiscoveryReceiver.class );
55
56 /*** buffer */
57 private final byte[] m_buffer = new byte[65536];
58
59 /*** The socket used for communication. */
60 private MulticastSocket m_socket;
61
62 /***
63 * TODO: Consider using the threadpool manager to get this thread pool. For now place a tight
64 * restriction on the pool size
65 */
66 private static final int maxPoolSize = 10;
67
68 /*** The processor */
69 private PooledExecutor pooledExecutor = null;
70
71 /*** number of messages received. For debugging and testing. */
72 private int cnt = 0;
73
74 /*** Service to get cache names and hande request broadcasts */
75 protected UDPDiscoveryService service = null;
76
77 /*** Address */
78 private String multicastAddressString = "";
79
80 /*** The port */
81 private int multicastPort = 0;
82
83 /*** The cache manager. */
84 private ICompositeCacheManager cacheMgr;
85
86 /*** Is it shutdown. */
87 private boolean shutdown = false;
88
89 /*** The event logger. */
90 protected ICacheEventLogger cacheEventLogger;
91
92 /*** The serializer. */
93 protected IElementSerializer elementSerializer;
94
95 /***
96 * Constructor for the LateralUDPReceiver object.
97 * <p>
98 * We determine out own host using InetAddress
99 *<p>
100 * @param service
101 * @param multicastAddressString
102 * @param multicastPort
103 * @param cacheMgr
104 * @param cacheEventLogger
105 * @param elementSerializer
106 * @exception IOException
107 */
108 public UDPDiscoveryReceiver( UDPDiscoveryService service, String multicastAddressString, int multicastPort,
109 ICompositeCacheManager cacheMgr, ICacheEventLogger cacheEventLogger,
110 IElementSerializer elementSerializer )
111 throws IOException
112 {
113 this.service = service;
114 this.multicastAddressString = multicastAddressString;
115 this.multicastPort = multicastPort;
116 this.cacheMgr = cacheMgr;
117 this.cacheEventLogger = cacheEventLogger;
118 this.elementSerializer = elementSerializer;
119
120
121 pooledExecutor = new PooledExecutor( new BoundedBuffer( 100 ), maxPoolSize );
122 pooledExecutor.discardOldestWhenBlocked();
123
124 pooledExecutor.setThreadFactory( new MyThreadFactory() );
125
126 if ( log.isInfoEnabled() )
127 {
128 log.info( "constructing listener, [" + this.multicastAddressString + ":" + this.multicastPort + "]" );
129 }
130
131 try
132 {
133 createSocket( this.multicastAddressString, this.multicastPort );
134 }
135 catch ( IOException ioe )
136 {
137
138
139 throw ioe;
140 }
141 }
142
143 /***
144 * Creates the socket for this class.
145 * @param multicastAddressString
146 * @param multicastPort
147 * @throws IOException
148 */
149 private void createSocket( String multicastAddressString, int multicastPort )
150 throws IOException
151 {
152 try
153 {
154 m_socket = new MulticastSocket( multicastPort );
155 m_socket.joinGroup( InetAddress.getByName( multicastAddressString ) );
156 }
157 catch ( IOException e )
158 {
159 log.error( "Could not bind to multicast address [" + multicastAddressString + ":" + multicastPort + "]", e );
160 throw e;
161 }
162 }
163
164 /***
165 * Highly unreliable. If it is processing one message while another comes in , the second
166 * message is lost. This is for low concurency peppering.
167 * @return the object message
168 * @throws IOException
169 */
170 public Object waitForMessage()
171 throws IOException
172 {
173 final DatagramPacket packet = new DatagramPacket( m_buffer, m_buffer.length );
174
175 Object obj = null;
176 try
177 {
178 m_socket.receive( packet );
179
180 final ByteArrayInputStream byteStream = new ByteArrayInputStream( m_buffer, 0, packet.getLength() );
181
182 final ObjectInputStream objectStream = new ObjectInputStream( byteStream );
183
184 obj = objectStream.readObject();
185
186 }
187 catch ( Exception e )
188 {
189 log.error( "Error receving multicast packet", e );
190 }
191 return obj;
192 }
193
194 /*** Main processing method for the LateralUDPReceiver object */
195 public void run()
196 {
197 try
198 {
199 while ( !shutdown )
200 {
201 Object obj = waitForMessage();
202
203
204 cnt++;
205
206 if ( log.isDebugEnabled() )
207 {
208 log.debug( getCnt() + " messages received." );
209 }
210
211 UDPDiscoveryMessage message = null;
212
213 try
214 {
215 message = (UDPDiscoveryMessage) obj;
216
217 if ( message != null )
218 {
219 MessageHandler handler = new MessageHandler( message );
220
221 pooledExecutor.execute( handler );
222
223 if ( log.isDebugEnabled() )
224 {
225 log.debug( "Passed handler to executor." );
226 }
227 }
228 else
229 {
230 log.warn( "message is null" );
231 }
232 }
233 catch ( ClassCastException cce )
234 {
235 log.warn( "Received unknown message type " + cce.getMessage() );
236 }
237 }
238 }
239 catch ( Exception e )
240 {
241 log.error( "Unexpected exception in UDP receiver.", e );
242 try
243 {
244 Thread.sleep( 100 );
245
246
247 }
248 catch ( Exception e2 )
249 {
250 log.error( "Problem sleeping", e2 );
251 }
252 }
253 return;
254 }
255
256 /***
257 * @param cnt The cnt to set.
258 */
259 public void setCnt( int cnt )
260 {
261 this.cnt = cnt;
262 }
263
264 /***
265 * @return Returns the cnt.
266 */
267 public int getCnt()
268 {
269 return cnt;
270 }
271
272 /***
273 * Separate thread run when a command comes into the UDPDiscoveryReceiver.
274 */
275 public class MessageHandler
276 implements Runnable
277 {
278 /*** The message to handle. Passed in during construction. */
279 private UDPDiscoveryMessage message = null;
280
281 /***
282 * @param message
283 */
284 public MessageHandler( UDPDiscoveryMessage message )
285 {
286 this.message = message;
287 }
288
289 /***
290 * Process the message.
291 */
292 public void run()
293 {
294
295 if ( message.getRequesterId() == LateralCacheInfo.listenerId )
296 {
297 if ( log.isDebugEnabled() )
298 {
299 log.debug( "from self" );
300 }
301 }
302 else
303 {
304 if ( log.isDebugEnabled() )
305 {
306 log.debug( "from another" );
307 log.debug( "Message = " + message );
308 }
309
310
311
312 if ( message.getMessageType() == UDPDiscoveryMessage.REQUEST_BROADCAST )
313 {
314 if ( log.isDebugEnabled() )
315 {
316 log.debug( "Message is a Request Broadcase, will have the service handle it." );
317 }
318 service.serviceRequestBroadcast();
319 return;
320 }
321
322 try
323 {
324
325
326
327 ITCPLateralCacheAttributes lca = null;
328 if ( service.getTcpLateralCacheAttributes() != null )
329 {
330 lca = (ITCPLateralCacheAttributes) service.getTcpLateralCacheAttributes().copy();
331 }
332 else
333 {
334 lca = new TCPLateralCacheAttributes();
335 }
336 lca.setTransmissionType( LateralCacheAttributes.TCP );
337 lca.setTcpServer( message.getHost() + ":" + message.getPort() );
338 LateralTCPCacheManager lcm = LateralTCPCacheManager.getInstance( lca, cacheMgr, cacheEventLogger,
339 elementSerializer );
340
341 ArrayList regions = message.getCacheNames();
342 if ( regions != null )
343 {
344
345 Iterator it = regions.iterator();
346 while ( it.hasNext() )
347 {
348 String cacheName = (String) it.next();
349
350 try
351 {
352 ICache ic = lcm.getCache( cacheName );
353
354 if ( log.isDebugEnabled() )
355 {
356 log.debug( "Got cache, ic = " + ic );
357 }
358
359
360 if ( ic != null )
361 {
362 service.addNoWait( (LateralCacheNoWait) ic );
363 if ( log.isDebugEnabled() )
364 {
365 log.debug( "Called addNoWait for cacheName " + cacheName );
366 }
367 }
368 }
369 catch ( Exception e )
370 {
371 log.error( "Problem creating no wait", e );
372 }
373 }
374
375 }
376 else
377 {
378 log.warn( "No cache names found in message " + message );
379 }
380 }
381 catch ( Exception e )
382 {
383 log.error( "Problem getting lateral maanger", e );
384 }
385 }
386 }
387 }
388
389 /***
390 * Allows us to set the daemon status on the executor threads
391 */
392 class MyThreadFactory
393 implements ThreadFactory
394 {
395 /***
396 * Sets the thread to daemon.
397 * <p>
398 * @param runner
399 * @return a daemon thread
400 */
401 public Thread newThread( Runnable runner )
402 {
403 Thread t = new Thread( runner );
404 String oldName = t.getName();
405 t.setName( "JCS-UDPDiscoveryReceiver-" + oldName );
406 t.setDaemon( true );
407 t.setPriority( Thread.MIN_PRIORITY );
408 return t;
409 }
410 }
411
412 /*** Shuts down the socket. */
413 public void shutdown()
414 {
415 try
416 {
417 shutdown = true;
418 m_socket.close();
419 pooledExecutor.shutdownNow();
420 }
421 catch ( Exception e )
422 {
423 log.error( "Problem closing socket" );
424 }
425 }
426 }
427