001package org.apache.commons.jcs.auxiliary.lateral.socket.tcp;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.IOException;
023import java.io.ObjectInputStream;
024import java.io.ObjectOutputStream;
025import java.net.InetSocketAddress;
026import java.net.Socket;
027
028import org.apache.commons.jcs.auxiliary.lateral.LateralElementDescriptor;
029import org.apache.commons.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
030import org.apache.commons.jcs.io.ObjectInputStreamClassLoaderAware;
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033
034/**
035 * This class is based on the log4j SocketAppender class. I'm using a different repair structure, so
036 * it is significantly different.
037 */
038public class LateralTCPSender
039{
040    /** The logger */
041    private static final Log log = LogFactory.getLog( LateralTCPSender.class );
042
043    /** Config */
044    private int socketOpenTimeOut;
045    private int socketSoTimeOut;
046
047    /** The stream from the server connection. */
048    private ObjectOutputStream oos;
049
050    /** The socket connection with the server. */
051    private Socket socket;
052
053    /** how many messages sent */
054    private int sendCnt = 0;
055
056    /** Use to synchronize multiple threads that may be trying to get. */
057    private final Object getLock = new int[0];
058
059    /**
060     * Constructor for the LateralTCPSender object.
061     * <p>
062     * @param lca
063     * @throws IOException
064     */
065    public LateralTCPSender( ITCPLateralCacheAttributes lca )
066        throws IOException
067    {
068        this.socketOpenTimeOut = lca.getOpenTimeOut();
069        this.socketSoTimeOut = lca.getSocketTimeOut();
070
071        String p1 = lca.getTcpServer();
072        if ( p1 == null )
073        {
074            throw new IOException( "Invalid server (null)" );
075        }
076
077        String h2 = p1.substring( 0, p1.indexOf( ":" ) );
078        int po = Integer.parseInt( p1.substring( p1.indexOf( ":" ) + 1 ) );
079        if ( log.isDebugEnabled() )
080        {
081            log.debug( "h2 = " + h2 );
082            log.debug( "po = " + po );
083        }
084
085        if ( h2.length() == 0 )
086        {
087            throw new IOException( "Cannot connect to invalid address [" + h2 + ":" + po + "]" );
088        }
089
090        init( h2, po );
091    }
092
093    /**
094     * Creates a connection to a TCP server.
095     * <p>
096     * @param host
097     * @param port
098     * @throws IOException
099     */
100    protected void init( String host, int port )
101        throws IOException
102    {
103        try
104        {
105            if ( log.isInfoEnabled() )
106            {
107                log.info( "Attempting connection to [" + host + "]" );
108            }
109
110            // have time out socket open do this for us
111            try
112            {
113                socket = new Socket();
114                socket.connect( new InetSocketAddress( host, port ), this.socketOpenTimeOut );
115            }
116            catch ( IOException ioe )
117            {
118                if (socket != null)
119                {
120                    socket.close();
121                }
122
123                throw new IOException( "Cannot connect to " + host + ":" + port, ioe );
124            }
125
126            socket.setSoTimeout( socketSoTimeOut );
127            synchronized ( this )
128            {
129                oos = new ObjectOutputStream( socket.getOutputStream() );
130            }
131        }
132        catch ( java.net.ConnectException e )
133        {
134            log.debug( "Remote host [" + host + "] refused connection." );
135            throw e;
136        }
137        catch ( IOException e )
138        {
139            log.debug( "Could not connect to [" + host + "]. Exception is " + e );
140            throw e;
141        }
142    }
143
144    /**
145     * Sends commands to the lateral cache listener.
146     * <p>
147     * @param led
148     * @throws IOException
149     */
150    public <K, V> void send( LateralElementDescriptor<K, V> led )
151        throws IOException
152    {
153        sendCnt++;
154        if ( log.isInfoEnabled() && sendCnt % 100 == 0 )
155        {
156            log.info( "Send Count (port " + socket.getPort() + ") = " + sendCnt );
157        }
158
159        if ( log.isDebugEnabled() )
160        {
161            log.debug( "sending LateralElementDescriptor" );
162        }
163
164        if ( led == null )
165        {
166            return;
167        }
168
169        if ( oos == null )
170        {
171            throw new IOException( "No remote connection is available for LateralTCPSender." );
172        }
173
174        synchronized ( this.getLock )
175        {
176            oos.writeUnshared( led );
177            oos.flush();
178        }
179    }
180
181    /**
182     * Sends commands to the lateral cache listener and gets a response. I'm afraid that we could
183     * get into a pretty bad blocking situation here. This needs work. I just wanted to get some
184     * form of get working. However, get is not recommended for performance reasons. If you have 10
185     * laterals, then you have to make 10 failed gets to find out none of the caches have the item.
186     * <p>
187     * @param led
188     * @return ICacheElement
189     * @throws IOException
190     */
191    public <K, V> Object sendAndReceive( LateralElementDescriptor<K, V> led )
192        throws IOException
193    {
194        if ( led == null )
195        {
196            return null;
197        }
198
199        if ( oos == null )
200        {
201            throw new IOException( "No remote connection is available for LateralTCPSender." );
202        }
203
204        Object response = null;
205
206        // Synchronized to insure that the get requests to server from this
207        // sender and the responses are processed in order, else you could
208        // return the wrong item from the cache.
209        // This is a big block of code. May need to re-think this strategy.
210        // This may not be necessary.
211        // Normal puts, etc to laterals do not have to be synchronized.
212        synchronized ( this.getLock )
213        {
214            try
215            {
216                // clean up input stream, nothing should be there yet.
217                if ( socket.getInputStream().available() > 0 )
218                {
219                    socket.getInputStream().read( new byte[socket.getInputStream().available()] );
220                }
221            }
222            catch ( IOException ioe )
223            {
224                log.error( "Problem cleaning socket before send " + socket, ioe );
225                throw ioe;
226            }
227
228            // write object to listener
229            oos.writeUnshared( led );
230            oos.flush();
231            ObjectInputStream ois = null;
232
233            try
234            {
235                socket.setSoTimeout( socketSoTimeOut );
236                ois = new ObjectInputStreamClassLoaderAware( socket.getInputStream(), null );
237                response = ois.readObject();
238            }
239            catch ( IOException ioe )
240            {
241                String message = "Could not open ObjectInputStream to " + socket +
242                    " SoTimeout [" + socket.getSoTimeout() +
243                    "] Connected [" + socket.isConnected() + "]";
244                log.error( message, ioe );
245                throw ioe;
246            }
247            catch ( Exception e )
248            {
249                log.error( e );
250            }
251            finally
252            {
253                if (ois != null)
254                {
255                    ois.close();
256                }
257            }
258        }
259
260        return response;
261    }
262
263    /**
264     * Closes connection used by all LateralTCPSenders for this lateral connection. Dispose request
265     * should come into the facade and be sent to all lateral cache services. The lateral cache
266     * service will then call this method.
267     * <p>
268     * @throws IOException
269     */
270    public void dispose()
271        throws IOException
272    {
273        if ( log.isInfoEnabled() )
274        {
275            log.info( "Dispose called" );
276        }
277        // WILL CLOSE CONNECTION USED BY ALL
278        oos.close();
279        socket.close();
280    }
281}