001package org.apache.commons.jcs.utils.discovery;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import org.apache.commons.jcs.engine.CacheInfo;
023import org.apache.commons.jcs.utils.discovery.UDPDiscoveryMessage.BroadcastType;
024import org.apache.commons.jcs.utils.serialization.StandardSerializer;
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027
028import java.io.ByteArrayOutputStream;
029import java.io.IOException;
030import java.net.DatagramPacket;
031import java.net.InetAddress;
032import java.net.MulticastSocket;
033import java.util.ArrayList;
034
035/**
036 * This is a generic sender for the UDPDiscovery process.
037 * <p>
038 * @author Aaron Smuts
039 */
040public class UDPDiscoverySender
041{
042    /** The logger. */
043    private static final Log log = LogFactory.getLog( UDPDiscoverySender.class );
044
045    /** The socket */
046    private MulticastSocket localSocket;
047
048    /** The address */
049    private InetAddress multicastAddress;
050
051    /** The port */
052    private final int multicastPort;
053
054    /** Used to serialize messages */
055    private final StandardSerializer serializer = new StandardSerializer();
056
057    /**
058     * Constructor for the UDPDiscoverySender object
059     * <p>
060     * This sender can be used to send multiple messages.
061     * <p>
062     * When you are done sending, you should destroy the socket sender.
063     * <p>
064     * @param host
065     * @param port
066     * @throws IOException
067     */
068    public UDPDiscoverySender( String host, int port )
069        throws IOException
070    {
071        try
072        {
073            if ( log.isDebugEnabled() )
074            {
075                log.debug( "Constructing socket for sender on port [" + port + "]" );
076            }
077            localSocket = new MulticastSocket( port );
078
079            // Remote address.
080            multicastAddress = InetAddress.getByName( host );
081        }
082        catch ( IOException e )
083        {
084            log.error( "Could not bind to multicast address [" + host + "]", e );
085
086            throw e;
087        }
088
089        multicastPort = port;
090    }
091
092    /**
093     * Closes the socket connection.
094     */
095    public void destroy()
096    {
097        try
098        {
099            if ( this.localSocket != null && !this.localSocket.isClosed() )
100            {
101                this.localSocket.close();
102            }
103        }
104        catch ( Exception e )
105        {
106            log.error( "Problem destrying sender", e );
107        }
108    }
109
110    /**
111     * Just being careful about closing the socket.
112     * <p>
113     * @throws Throwable
114     */
115    @Override
116    protected void finalize()
117        throws Throwable
118    {
119        super.finalize();
120        destroy();
121    }
122
123    /**
124     * Send messages.
125     * <p>
126     * @param message
127     * @throws IOException
128     */
129    public void send( UDPDiscoveryMessage message )
130        throws IOException
131    {
132        if ( this.localSocket == null )
133        {
134            throw new IOException( "Socket is null, cannot send message." );
135        }
136
137        if ( this.localSocket.isClosed() )
138        {
139            throw new IOException( "Socket is closed, cannot send message." );
140        }
141
142        if ( log.isDebugEnabled() )
143        {
144            log.debug( "sending UDPDiscoveryMessage, address [" + multicastAddress + "], port [" + multicastPort
145                + "], message = " + message );
146        }
147
148        try
149        {
150            final byte[] bytes = serializer.serialize( message );
151
152            // put the byte array in a packet
153            final DatagramPacket packet = new DatagramPacket( bytes, bytes.length, multicastAddress, multicastPort );
154
155            if ( log.isDebugEnabled() )
156            {
157                log.debug( "Sending DatagramPacket. bytes.length [" + bytes.length + "] to " + multicastAddress + ":"
158                    + multicastPort );
159            }
160
161            localSocket.send( packet );
162        }
163        catch ( IOException e )
164        {
165            log.error( "Error sending message", e );
166            throw e;
167        }
168    }
169
170    /**
171     * Ask other to broadcast their info the the multicast address. If a lateral is non receiving it
172     * can use this. This is also called on startup so we can get info.
173     * <p>
174     * @throws IOException
175     */
176    public void requestBroadcast()
177        throws IOException
178    {
179        if ( log.isDebugEnabled() )
180        {
181            log.debug( "sending requestBroadcast " );
182        }
183
184        UDPDiscoveryMessage message = new UDPDiscoveryMessage();
185        message.setRequesterId( CacheInfo.listenerId );
186        message.setMessageType( BroadcastType.REQUEST );
187        send( message );
188    }
189
190    /**
191     * This sends a message broadcasting out that the host and port is available for connections.
192     * <p>
193     * It uses the vmid as the requesterDI
194     * @param host
195     * @param port
196     * @param cacheNames
197     * @throws IOException
198     */
199    public void passiveBroadcast( String host, int port, ArrayList<String> cacheNames )
200        throws IOException
201    {
202        passiveBroadcast( host, port, cacheNames, CacheInfo.listenerId );
203    }
204
205    /**
206     * This allows you to set the sender id. This is mainly for testing.
207     * <p>
208     * @param host
209     * @param port
210     * @param cacheNames names of the cache regions
211     * @param listenerId
212     * @throws IOException
213     */
214    protected void passiveBroadcast( String host, int port, ArrayList<String> cacheNames, long listenerId )
215        throws IOException
216    {
217        if ( log.isDebugEnabled() )
218        {
219            log.debug( "sending passiveBroadcast " );
220        }
221
222        UDPDiscoveryMessage message = new UDPDiscoveryMessage();
223        message.setHost( host );
224        message.setPort( port );
225        message.setCacheNames( cacheNames );
226        message.setRequesterId( listenerId );
227        message.setMessageType( BroadcastType.PASSIVE );
228        send( message );
229    }
230
231    /**
232     * This sends a message broadcasting our that the host and port is no longer available.
233     * <p>
234     * It uses the vmid as the requesterID
235     * <p>
236     * @param host host
237     * @param port port
238     * @param cacheNames names of the cache regions
239     * @throws IOException on error
240     */
241    public void removeBroadcast( String host, int port, ArrayList<String> cacheNames )
242        throws IOException
243    {
244        removeBroadcast( host, port, cacheNames, CacheInfo.listenerId );
245    }
246
247    /**
248     * This allows you to set the sender id. This is mainly for testing.
249     * <p>
250     * @param host host
251     * @param port port
252     * @param cacheNames names of the cache regions
253     * @param listenerId listener ID
254     * @throws IOException on error
255     */
256    protected void removeBroadcast( String host, int port, ArrayList<String> cacheNames, long listenerId )
257        throws IOException
258    {
259        if ( log.isDebugEnabled() )
260        {
261            log.debug( "sending removeBroadcast " );
262        }
263
264        UDPDiscoveryMessage message = new UDPDiscoveryMessage();
265        message.setHost( host );
266        message.setPort( port );
267        message.setCacheNames( cacheNames );
268        message.setRequesterId( listenerId );
269        message.setMessageType( BroadcastType.REMOVE );
270        send( message );
271    }
272}
273
/**
 * A {@link ByteArrayOutputStream} that exposes its backing array directly.
 * <p>
 * @author asmuts
 */
class MyByteArrayOutputStream
    extends ByteArrayOutputStream
{
    /**
     * Hands out the stream's backing array without copying it.
     * <p>
     * The array is the live internal buffer, so it can be longer than the number of bytes
     * written; use {@link #size()} to find how many leading bytes are valid.
     * <p>
     * @return the internal buffer of this stream
     */
    public byte[] getBytes()
    {
        final byte[] backingArray = this.buf;
        return backingArray;
    }
}