View Javadoc

1   package org.apache.jcs.auxiliary.lateral.socket.tcp.discovery;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.ByteArrayInputStream;
23  import java.io.IOException;
24  import java.io.ObjectInputStream;
25  import java.net.DatagramPacket;
26  import java.net.InetAddress;
27  import java.net.MulticastSocket;
28  import java.util.ArrayList;
29  import java.util.Iterator;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.jcs.auxiliary.lateral.LateralCacheAttributes;
34  import org.apache.jcs.auxiliary.lateral.LateralCacheInfo;
35  import org.apache.jcs.auxiliary.lateral.LateralCacheNoWait;
36  import org.apache.jcs.auxiliary.lateral.socket.tcp.LateralTCPCacheManager;
37  import org.apache.jcs.auxiliary.lateral.socket.tcp.TCPLateralCacheAttributes;
38  import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
39  import org.apache.jcs.engine.behavior.ICache;
40  import org.apache.jcs.engine.behavior.ICompositeCacheManager;
41  import org.apache.jcs.engine.behavior.IElementSerializer;
42  import org.apache.jcs.engine.behavior.IShutdownObserver;
43  import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
44  
45  import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
46  import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
47  import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
48  
49  /*** Receives UDP Discovery messages. */
50  public class UDPDiscoveryReceiver
51      implements Runnable, IShutdownObserver
52  {
53      /*** The log factory */
54      private final static Log log = LogFactory.getLog( UDPDiscoveryReceiver.class );
55  
56      /*** buffer */
57      private final byte[] m_buffer = new byte[65536];
58  
59      /*** The socket used for communication. */
60      private MulticastSocket m_socket;
61  
62      /***
63       * TODO: Consider using the threadpool manager to get this thread pool. For now place a tight
64       * restriction on the pool size
65       */
66      private static final int maxPoolSize = 10;
67  
68      /*** The processor */
69      private PooledExecutor pooledExecutor = null;
70  
71      /*** number of messages received. For debugging and testing. */
72      private int cnt = 0;
73  
74      /*** Service to get cache names and hande request broadcasts */
75      protected UDPDiscoveryService service = null;
76  
77      /*** Address */
78      private String multicastAddressString = "";
79  
80      /*** The port */
81      private int multicastPort = 0;
82  
83      /*** The cache manager. */
84      private ICompositeCacheManager cacheMgr;
85  
86      /*** Is it shutdown. */
87      private boolean shutdown = false;
88  
89      /*** The event logger. */
90      protected ICacheEventLogger cacheEventLogger;
91  
92      /*** The serializer. */
93      protected IElementSerializer elementSerializer;
94  
95      /***
96       * Constructor for the LateralUDPReceiver object.
97       * <p>
98       * We determine out own host using InetAddress
99       *<p>
100      * @param service
101      * @param multicastAddressString
102      * @param multicastPort
103      * @param cacheMgr
104      * @param cacheEventLogger
105      * @param elementSerializer
106      * @exception IOException
107      */
108     public UDPDiscoveryReceiver( UDPDiscoveryService service, String multicastAddressString, int multicastPort,
109                                  ICompositeCacheManager cacheMgr, ICacheEventLogger cacheEventLogger,
110                                  IElementSerializer elementSerializer )
111         throws IOException
112     {
113         this.service = service;
114         this.multicastAddressString = multicastAddressString;
115         this.multicastPort = multicastPort;
116         this.cacheMgr = cacheMgr;
117         this.cacheEventLogger = cacheEventLogger;
118         this.elementSerializer = elementSerializer;
119 
120         // create a small thread pool to handle a barage
121         pooledExecutor = new PooledExecutor( new BoundedBuffer( 100 ), maxPoolSize );
122         pooledExecutor.discardOldestWhenBlocked();
123         //pooledExecutor.setMinimumPoolSize(1);
124         pooledExecutor.setThreadFactory( new MyThreadFactory() );
125 
126         if ( log.isInfoEnabled() )
127         {
128             log.info( "constructing listener, [" + this.multicastAddressString + ":" + this.multicastPort + "]" );
129         }
130 
131         try
132         {
133             createSocket( this.multicastAddressString, this.multicastPort );
134         }
135         catch ( IOException ioe )
136         {
137             // consider eatign this so we can go on, or constructing the socket
138             // later
139             throw ioe;
140         }
141     }
142 
143     /***
144      * Creates the socket for this class.
145      * @param multicastAddressString
146      * @param multicastPort
147      * @throws IOException
148      */
149     private void createSocket( String multicastAddressString, int multicastPort )
150         throws IOException
151     {
152         try
153         {
154             m_socket = new MulticastSocket( multicastPort );
155             m_socket.joinGroup( InetAddress.getByName( multicastAddressString ) );
156         }
157         catch ( IOException e )
158         {
159             log.error( "Could not bind to multicast address [" + multicastAddressString + ":" + multicastPort + "]", e );
160             throw e;
161         }
162     }
163 
164     /***
165      * Highly unreliable. If it is processing one message while another comes in , the second
166      * message is lost. This is for low concurency peppering.
167      * @return the object message
168      * @throws IOException
169      */
170     public Object waitForMessage()
171         throws IOException
172     {
173         final DatagramPacket packet = new DatagramPacket( m_buffer, m_buffer.length );
174 
175         Object obj = null;
176         try
177         {
178             m_socket.receive( packet );
179 
180             final ByteArrayInputStream byteStream = new ByteArrayInputStream( m_buffer, 0, packet.getLength() );
181 
182             final ObjectInputStream objectStream = new ObjectInputStream( byteStream );
183 
184             obj = objectStream.readObject();
185 
186         }
187         catch ( Exception e )
188         {
189             log.error( "Error receving multicast packet", e );
190         }
191         return obj;
192     }
193 
194     /*** Main processing method for the LateralUDPReceiver object */
195     public void run()
196     {
197         try
198         {
199             while ( !shutdown )
200             {
201                 Object obj = waitForMessage();
202 
203                 // not thread safe, but just for debugging
204                 cnt++;
205 
206                 if ( log.isDebugEnabled() )
207                 {
208                     log.debug( getCnt() + " messages received." );
209                 }
210 
211                 UDPDiscoveryMessage message = null;
212 
213                 try
214                 {
215                     message = (UDPDiscoveryMessage) obj;
216                     // check for null
217                     if ( message != null )
218                     {
219                         MessageHandler handler = new MessageHandler( message );
220 
221                         pooledExecutor.execute( handler );
222 
223                         if ( log.isDebugEnabled() )
224                         {
225                             log.debug( "Passed handler to executor." );
226                         }
227                     }
228                     else
229                     {
230                         log.warn( "message is null" );
231                     }
232                 }
233                 catch ( ClassCastException cce )
234                 {
235                     log.warn( "Received unknown message type " + cce.getMessage() );
236                 }
237             } // end while
238         }
239         catch ( Exception e )
240         {
241             log.error( "Unexpected exception in UDP receiver.", e );
242             try
243             {
244                 Thread.sleep( 100 );
245                 // TODO consider some failure count so we don't do this
246                 // forever.
247             }
248             catch ( Exception e2 )
249             {
250                 log.error( "Problem sleeping", e2 );
251             }
252         }
253         return;
254     }
255 
256     /***
257      * @param cnt The cnt to set.
258      */
259     public void setCnt( int cnt )
260     {
261         this.cnt = cnt;
262     }
263 
264     /***
265      * @return Returns the cnt.
266      */
267     public int getCnt()
268     {
269         return cnt;
270     }
271 
272     /***
273      * Separate thread run when a command comes into the UDPDiscoveryReceiver.
274      */
275     public class MessageHandler
276         implements Runnable
277     {
278         /*** The message to handle.  Passed in during construction. */
279         private UDPDiscoveryMessage message = null;
280 
281         /***
282          * @param message
283          */
284         public MessageHandler( UDPDiscoveryMessage message )
285         {
286             this.message = message;
287         }
288 
289         /***
290          * Process the message.
291          */
292         public void run()
293         {
294             // consider comparing ports here instead.
295             if ( message.getRequesterId() == LateralCacheInfo.listenerId )
296             {
297                 if ( log.isDebugEnabled() )
298                 {
299                     log.debug( "from self" );
300                 }
301             }
302             else
303             {
304                 if ( log.isDebugEnabled() )
305                 {
306                     log.debug( "from another" );
307                     log.debug( "Message = " + message );
308                 }
309 
310                 // if this is a request message, have the service handle it and
311                 // return
312                 if ( message.getMessageType() == UDPDiscoveryMessage.REQUEST_BROADCAST )
313                 {
314                     if ( log.isDebugEnabled() )
315                     {
316                         log.debug( "Message is a Request Broadcase, will have the service handle it." );
317                     }
318                     service.serviceRequestBroadcast();
319                     return;
320                 }
321 
322                 try
323                 {
324                     // get a cache and add it to the no waits
325                     // the add method should not add the same.
326                     // we need the listener port from the original config.
327                     ITCPLateralCacheAttributes lca = null;
328                     if ( service.getTcpLateralCacheAttributes() != null )
329                     {
330                         lca = (ITCPLateralCacheAttributes) service.getTcpLateralCacheAttributes().copy();
331                     }
332                     else
333                     {
334                         lca = new TCPLateralCacheAttributes();
335                     }
336                     lca.setTransmissionType( LateralCacheAttributes.TCP );
337                     lca.setTcpServer( message.getHost() + ":" + message.getPort() );
338                     LateralTCPCacheManager lcm = LateralTCPCacheManager.getInstance( lca, cacheMgr, cacheEventLogger,
339                                                                                      elementSerializer );
340 
341                     ArrayList regions = message.getCacheNames();
342                     if ( regions != null )
343                     {
344                         // for each region get the cache
345                         Iterator it = regions.iterator();
346                         while ( it.hasNext() )
347                         {
348                             String cacheName = (String) it.next();
349 
350                             try
351                             {
352                                 ICache ic = lcm.getCache( cacheName );
353 
354                                 if ( log.isDebugEnabled() )
355                                 {
356                                     log.debug( "Got cache, ic = " + ic );
357                                 }
358 
359                                 // add this to the nowaits for this cachename
360                                 if ( ic != null )
361                                 {
362                                     service.addNoWait( (LateralCacheNoWait) ic );
363                                     if ( log.isDebugEnabled() )
364                                     {
365                                         log.debug( "Called addNoWait for cacheName " + cacheName );
366                                     }
367                                 }
368                             }
369                             catch ( Exception e )
370                             {
371                                 log.error( "Problem creating no wait", e );
372                             }
373                         }
374                         // end while
375                     }
376                     else
377                     {
378                         log.warn( "No cache names found in message " + message );
379                     }
380                 }
381                 catch ( Exception e )
382                 {
383                     log.error( "Problem getting lateral maanger", e );
384                 }
385             }
386         }
387     }
388 
389     /***
390      * Allows us to set the daemon status on the executor threads
391      */
392     class MyThreadFactory
393         implements ThreadFactory
394     {
395         /***
396          * Sets the thread to daemon.
397          * <p>
398          * @param runner
399          * @return a daemon thread
400          */
401         public Thread newThread( Runnable runner )
402         {
403             Thread t = new Thread( runner );
404             String oldName = t.getName();
405             t.setName( "JCS-UDPDiscoveryReceiver-" + oldName );
406             t.setDaemon( true );
407             t.setPriority( Thread.MIN_PRIORITY );
408             return t;
409         }
410     }
411 
412     /*** Shuts down the socket. */
413     public void shutdown()
414     {
415         try
416         {
417             shutdown = true;
418             m_socket.close();
419             pooledExecutor.shutdownNow();
420         }
421         catch ( Exception e )
422         {
423             log.error( "Problem closing socket" );
424         }
425     }
426 }
427 // end class