View Javadoc
1   package org.apache.commons.jcs.utils.discovery;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.commons.jcs.engine.CacheInfo;
23  import org.apache.commons.jcs.utils.discovery.UDPDiscoveryMessage.BroadcastType;
24  import org.apache.commons.jcs.utils.serialization.StandardSerializer;
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  
28  import java.io.ByteArrayOutputStream;
29  import java.io.IOException;
30  import java.net.DatagramPacket;
31  import java.net.InetAddress;
32  import java.net.MulticastSocket;
33  import java.util.ArrayList;
34  
35  /**
36   * This is a generic sender for the UDPDiscovery process.
37   * <p>
38   * @author Aaron Smuts
39   */
40  public class UDPDiscoverySender
41  {
42      /** The logger. */
43      private static final Log log = LogFactory.getLog( UDPDiscoverySender.class );
44  
45      /** The socket */
46      private MulticastSocket localSocket;
47  
48      /** The address */
49      private InetAddress multicastAddress;
50  
51      /** The port */
52      private final int multicastPort;
53  
54      /** Used to serialize messages */
55      private final StandardSerializer serializer = new StandardSerializer();
56  
57      /**
58       * Constructor for the UDPDiscoverySender object
59       * <p>
60       * This sender can be used to send multiple messages.
61       * <p>
62       * When you are done sending, you should destroy the socket sender.
63       * <p>
64       * @param host
65       * @param port
66       * @throws IOException
67       */
68      public UDPDiscoverySender( String host, int port )
69          throws IOException
70      {
71          try
72          {
73              if ( log.isDebugEnabled() )
74              {
75                  log.debug( "Constructing socket for sender on port [" + port + "]" );
76              }
77              localSocket = new MulticastSocket( port );
78  
79              // Remote address.
80              multicastAddress = InetAddress.getByName( host );
81          }
82          catch ( IOException e )
83          {
84              log.error( "Could not bind to multicast address [" + host + "]", e );
85  
86              throw e;
87          }
88  
89          multicastPort = port;
90      }
91  
92      /**
93       * Closes the socket connection.
94       */
95      public void destroy()
96      {
97          try
98          {
99              if ( this.localSocket != null && !this.localSocket.isClosed() )
100             {
101                 this.localSocket.close();
102             }
103         }
104         catch ( Exception e )
105         {
106             log.error( "Problem destrying sender", e );
107         }
108     }
109 
110     /**
111      * Just being careful about closing the socket.
112      * <p>
113      * @throws Throwable
114      */
115     @Override
116     protected void finalize()
117         throws Throwable
118     {
119         super.finalize();
120         destroy();
121     }
122 
123     /**
124      * Send messages.
125      * <p>
126      * @param message
127      * @throws IOException
128      */
129     public void send( UDPDiscoveryMessage message )
130         throws IOException
131     {
132         if ( this.localSocket == null )
133         {
134             throw new IOException( "Socket is null, cannot send message." );
135         }
136 
137         if ( this.localSocket.isClosed() )
138         {
139             throw new IOException( "Socket is closed, cannot send message." );
140         }
141 
142         if ( log.isDebugEnabled() )
143         {
144             log.debug( "sending UDPDiscoveryMessage, address [" + multicastAddress + "], port [" + multicastPort
145                 + "], message = " + message );
146         }
147 
148         try
149         {
150             final byte[] bytes = serializer.serialize( message );
151 
152             // put the byte array in a packet
153             final DatagramPacket packet = new DatagramPacket( bytes, bytes.length, multicastAddress, multicastPort );
154 
155             if ( log.isDebugEnabled() )
156             {
157                 log.debug( "Sending DatagramPacket. bytes.length [" + bytes.length + "] to " + multicastAddress + ":"
158                     + multicastPort );
159             }
160 
161             localSocket.send( packet );
162         }
163         catch ( IOException e )
164         {
165             log.error( "Error sending message", e );
166             throw e;
167         }
168     }
169 
170     /**
171      * Ask other to broadcast their info the the multicast address. If a lateral is non receiving it
172      * can use this. This is also called on startup so we can get info.
173      * <p>
174      * @throws IOException
175      */
176     public void requestBroadcast()
177         throws IOException
178     {
179         if ( log.isDebugEnabled() )
180         {
181             log.debug( "sending requestBroadcast " );
182         }
183 
184         UDPDiscoveryMessage message = new UDPDiscoveryMessage();
185         message.setRequesterId( CacheInfo.listenerId );
186         message.setMessageType( BroadcastType.REQUEST );
187         send( message );
188     }
189 
190     /**
191      * This sends a message broadcasting out that the host and port is available for connections.
192      * <p>
193      * It uses the vmid as the requesterDI
194      * @param host
195      * @param port
196      * @param cacheNames
197      * @throws IOException
198      */
199     public void passiveBroadcast( String host, int port, ArrayList<String> cacheNames )
200         throws IOException
201     {
202         passiveBroadcast( host, port, cacheNames, CacheInfo.listenerId );
203     }
204 
205     /**
206      * This allows you to set the sender id. This is mainly for testing.
207      * <p>
208      * @param host
209      * @param port
210      * @param cacheNames names of the cache regions
211      * @param listenerId
212      * @throws IOException
213      */
214     protected void passiveBroadcast( String host, int port, ArrayList<String> cacheNames, long listenerId )
215         throws IOException
216     {
217         if ( log.isDebugEnabled() )
218         {
219             log.debug( "sending passiveBroadcast " );
220         }
221 
222         UDPDiscoveryMessage message = new UDPDiscoveryMessage();
223         message.setHost( host );
224         message.setPort( port );
225         message.setCacheNames( cacheNames );
226         message.setRequesterId( listenerId );
227         message.setMessageType( BroadcastType.PASSIVE );
228         send( message );
229     }
230 
231     /**
232      * This sends a message broadcasting our that the host and port is no longer available.
233      * <p>
234      * It uses the vmid as the requesterID
235      * <p>
236      * @param host host
237      * @param port port
238      * @param cacheNames names of the cache regions
239      * @throws IOException on error
240      */
241     public void removeBroadcast( String host, int port, ArrayList<String> cacheNames )
242         throws IOException
243     {
244         removeBroadcast( host, port, cacheNames, CacheInfo.listenerId );
245     }
246 
247     /**
248      * This allows you to set the sender id. This is mainly for testing.
249      * <p>
250      * @param host host
251      * @param port port
252      * @param cacheNames names of the cache regions
253      * @param listenerId listener ID
254      * @throws IOException on error
255      */
256     protected void removeBroadcast( String host, int port, ArrayList<String> cacheNames, long listenerId )
257         throws IOException
258     {
259         if ( log.isDebugEnabled() )
260         {
261             log.debug( "sending removeBroadcast " );
262         }
263 
264         UDPDiscoveryMessage message = new UDPDiscoveryMessage();
265         message.setHost( host );
266         message.setPort( port );
267         message.setCacheNames( cacheNames );
268         message.setRequesterId( listenerId );
269         message.setMessageType( BroadcastType.REMOVE );
270         send( message );
271     }
272 }
273 
274 /**
275  * This allows us to get the byte array from an output stream.
276  * <p>
277  * @author asmuts
278  * @created January 15, 2002
279  */
280 
281 class MyByteArrayOutputStream
282     extends ByteArrayOutputStream
283 {
284     /**
285      * Gets the bytes attribute of the MyByteArrayOutputStream object
286      * @return The bytes value
287      */
288     public byte[] getBytes()
289     {
290         return buf;
291     }
292 }