001    package org.apache.jcs.auxiliary.lateral.socket.tcp;
002    
003    /*
004     * Licensed to the Apache Software Foundation (ASF) under one
005     * or more contributor license agreements.  See the NOTICE file
006     * distributed with this work for additional information
007     * regarding copyright ownership.  The ASF licenses this file
008     * to you under the Apache License, Version 2.0 (the
009     * "License"); you may not use this file except in compliance
010     * with the License.  You may obtain a copy of the License at
011     *
012     *   http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing,
015     * software distributed under the License is distributed on an
016     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017     * KIND, either express or implied.  See the License for the
018     * specific language governing permissions and limitations
019     * under the License.
020     */
021    
022    import java.io.IOException;
023    import java.io.ObjectInputStream;
024    import java.io.ObjectOutputStream;
025    import java.io.Serializable;
026    import java.net.InetAddress;
027    import java.net.InetSocketAddress;
028    import java.net.Socket;
029    
030    import org.apache.commons.logging.Log;
031    import org.apache.commons.logging.LogFactory;
032    import org.apache.jcs.auxiliary.lateral.LateralElementDescriptor;
033    import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
034    
035    /**
036     * This class is based on the log4j SocketAppender class. I'm using a different repair structure, so
037     * it is significantly different.
038     */
039    public class LateralTCPSender
040    {
041        /** The logger */
042        private final static Log log = LogFactory.getLog( LateralTCPSender.class );
043    
044        /** Config */
045        private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
046    
047        /** The hostname of the remote server. */
048        private String remoteHost;
049    
050        /** The address of the server */
051        private InetAddress address;
052    
053        /** The port the server is listening to. */
054        int port = 1111;
055    
056        /** The stream from the server connection. */
057        private ObjectOutputStream oos;
058    
059        /** The socket connection with the server. */
060        private Socket socket;
061    
062        /** how many messages sent */
063        private int sendCnt = 0;
064    
065        /** Use to synchronize multiple threads that may be trying to get. */
066        private final Object getLock = new int[0];
067    
068        /**
069         * Constructor for the LateralTCPSender object.
070         * <p>
071         * @param lca
072         * @exception IOException
073         */
074        public LateralTCPSender( ITCPLateralCacheAttributes lca )
075            throws IOException
076        {
077            this.setTcpLateralCacheAttributes( lca );
078    
079            String p1 = lca.getTcpServer();
080            if ( p1 != null )
081            {
082                String h2 = p1.substring( 0, p1.indexOf( ":" ) );
083                int po = Integer.parseInt( p1.substring( p1.indexOf( ":" ) + 1 ) );
084                if ( log.isDebugEnabled() )
085                {
086                    log.debug( "h2 = " + h2 );
087                    log.debug( "po = " + po );
088                }
089    
090                if ( h2 == null )
091                {
092                    throw new IOException( "Cannot connect to invalid address [" + h2 + ":" + po + "]" );
093                }
094    
095                init( h2, po );
096            }
097        }
098    
099        /**
100         * Creates a connection to a TCP server.
101         * <p>
102         * @param host
103         * @param port
104         * @throws IOException
105         */
106        protected void init( String host, int port )
107            throws IOException
108        {
109            this.port = port;
110            this.address = getAddressByName( host );
111            this.setRemoteHost( host );
112    
113            try
114            {
115                if ( log.isInfoEnabled() )
116                {
117                    log.info( "Attempting connection to [" + address.getHostName() + "]" );
118                }
119    
120                // have time out socket open do this for us
121                try
122                {
123                    InetSocketAddress address = new InetSocketAddress( host, port );
124                    socket = new Socket();
125                    socket.connect( address, tcpLateralCacheAttributes.getOpenTimeOut() );
126                }
127                catch ( IOException ioe )
128                {
129                    if (socket != null)
130                    {
131                        socket.close();
132                    }
133                    throw new IOException( "Cannot connect to " + host + ":" + port, ioe );
134                }
135    
136                socket.setSoTimeout( tcpLateralCacheAttributes.getSocketTimeOut() );
137                synchronized ( this )
138                {
139                    oos = new ObjectOutputStream( socket.getOutputStream() );
140                }
141            }
142            catch ( java.net.ConnectException e )
143            {
144                log.debug( "Remote host [" + address.getHostName() + "] refused connection." );
145                throw e;
146            }
147            catch ( IOException e )
148            {
149                log.debug( "Could not connect to [" + address.getHostName() + "]. Exception is " + e );
150                throw e;
151            }
152        }
153    
154        /**
155         * Gets the addressByName attribute of the LateralTCPSender object.
156         * <p>
157         * @param host
158         * @return The addressByName value
159         * @throws IOException
160         */
161        private InetAddress getAddressByName( String host )
162            throws IOException
163        {
164            try
165            {
166                return InetAddress.getByName( host );
167            }
168            catch ( Exception e )
169            {
170                log.error( "Could not find address of [" + host + "] ", e );
171                throw new IOException( "Could not find address of [" + host + "] " + e.getMessage() );
172            }
173        }
174    
175        /**
176         * Sends commands to the lateral cache listener.
177         * <p>
178         * @param led
179         * @throws IOException
180         */
181        public <K extends Serializable, V extends Serializable> void send( LateralElementDescriptor<K, V> led )
182            throws IOException
183        {
184            sendCnt++;
185            if ( log.isInfoEnabled() )
186            {
187                if ( sendCnt % 100 == 0 )
188                {
189                    log.info( "Send Count (port " + port + ") = " + sendCnt );
190                }
191            }
192    
193            if ( log.isDebugEnabled() )
194            {
195                log.debug( "sending LateralElementDescriptor" );
196            }
197    
198            if ( led == null )
199            {
200                return;
201            }
202    
203            if ( address == null )
204            {
205                throw new IOException( "No remote host is set for LateralTCPSender." );
206            }
207    
208            if ( oos != null )
209            {
210                synchronized ( this.getLock )
211                {
212                    try
213                    {
214                        oos.writeUnshared( led );
215                        oos.flush();
216                    }
217                    catch ( IOException e )
218                    {
219                        oos = null;
220                        log.error( "Detected problem with connection: " + e );
221                        throw e;
222                    }
223                }
224            }
225        }
226    
227        /**
228         * Sends commands to the lateral cache listener and gets a response. I'm afraid that we could
229         * get into a pretty bad blocking situation here. This needs work. I just wanted to get some
230         * form of get working. However, get is not recommended for performance reasons. If you have 10
231         * laterals, then you have to make 10 failed gets to find out none of the caches have the item.
232         * <p>
233         * @param led
234         * @return ICacheElement
235         * @throws IOException
236         */
237        public <K extends Serializable, V extends Serializable> Object sendAndReceive( LateralElementDescriptor<K, V> led )
238            throws IOException
239        {
240            if ( led == null )
241            {
242                return null;
243            }
244    
245            if ( address == null )
246            {
247                throw new IOException( "No remote host is set for LateralTCPSender." );
248            }
249    
250            Object response = null;
251    
252            if ( oos != null )
253            {
254                // Synchronized to insure that the get requests to server from this
255                // sender and the responses are processed in order, else you could
256                // return the wrong item from the cache.
257                // This is a big block of code. May need to re-think this strategy.
258                // This may not be necessary.
259                // Normal puts, etc to laterals do not have to be synchronized.
260                synchronized ( this.getLock )
261                {
262                    try
263                    {
264                        try
265                        {
266                            // clean up input stream, nothing should be there yet.
267                            if ( socket.getInputStream().available() > 0 )
268                            {
269                                socket.getInputStream().read( new byte[socket.getInputStream().available()] );
270                            }
271                        }
272                        catch ( IOException ioe )
273                        {
274                            log.error( "Problem cleaning socket before send " + socket, ioe );
275                            throw ioe;
276                        }
277    
278                        // write object to listener
279                        oos.writeUnshared( led );
280                        oos.flush();
281    
282                        try
283                        {
284                            // TODO make configurable
285                            // socket.setSoTimeout( 2000 );
286                            ObjectInputStream ois = new ObjectInputStream( socket.getInputStream() );
287                            response = ois.readObject();
288                        }
289                        catch ( IOException ioe )
290                        {
291                            String message = "Could not open ObjectInputStream to " + socket;
292                            message += " SoTimeout [" + socket.getSoTimeout() + "] Connected [" + socket.isConnected() + "]";
293                            log.error( message, ioe );
294                            throw ioe;
295                        }
296                        catch ( Exception e )
297                        {
298                            log.error( e );
299                        }
300                    }
301                    catch ( IOException e )
302                    {
303                        oos = null;
304                        log.error( "Detected problem with connection: " + e );
305                        throw e;
306                    }
307                }
308            }
309    
310            return response;
311        }
312    
313        /**
314         * Closes connection used by all LateralTCPSenders for this lateral connection. Dispose request
315         * should come into the facade and be sent to all lateral cache services. The lateral cache
316         * service will then call this method.
317         * <p>
318         * @param cache
319         * @throws IOException
320         */
321        public void dispose( String cache )
322            throws IOException
323        {
324            if ( log.isInfoEnabled() )
325            {
326                log.info( "Dispose called for cache [" + cache + "]" );
327            }
328            // WILL CLOSE CONNECTION USED BY ALL
329            oos.close();
330        }
331    
332        /**
333         * @param tcpLateralCacheAttributes The tcpLateralCacheAttributes to set.
334         */
335        public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tcpLateralCacheAttributes )
336        {
337            this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
338        }
339    
340        /**
341         * @return Returns the tcpLateralCacheAttributes.
342         */
343        public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
344        {
345            return tcpLateralCacheAttributes;
346        }
347    
348        /**
349         * @param remoteHost The remoteHost to set.
350         */
351        public void setRemoteHost( String remoteHost )
352        {
353            this.remoteHost = remoteHost;
354        }
355    
356        /**
357         * @return Returns the remoteHost.
358         */
359        public String getRemoteHost()
360        {
361            return remoteHost;
362        }
363    }