View Javadoc
1   package org.apache.commons.jcs.auxiliary.lateral.socket.tcp;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.io.ObjectInputStream;
24  import java.io.ObjectOutputStream;
25  import java.net.InetSocketAddress;
26  import java.net.Socket;
27  
28  import org.apache.commons.jcs.auxiliary.lateral.LateralElementDescriptor;
29  import org.apache.commons.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
30  import org.apache.commons.jcs.io.ObjectInputStreamClassLoaderAware;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  
34  /**
35   * This class is based on the log4j SocketAppender class. I'm using a different repair structure, so
36   * it is significantly different.
37   */
38  public class LateralTCPSender
39  {
40      /** The logger */
41      private static final Log log = LogFactory.getLog( LateralTCPSender.class );
42  
43      /** Config */
44      private int socketOpenTimeOut;
45      private int socketSoTimeOut;
46  
47      /** The stream from the server connection. */
48      private ObjectOutputStream oos;
49  
50      /** The socket connection with the server. */
51      private Socket socket;
52  
53      /** how many messages sent */
54      private int sendCnt = 0;
55  
56      /** Use to synchronize multiple threads that may be trying to get. */
57      private final Object getLock = new int[0];
58  
59      /**
60       * Constructor for the LateralTCPSender object.
61       * <p>
62       * @param lca
63       * @throws IOException
64       */
65      public LateralTCPSender( ITCPLateralCacheAttributes lca )
66          throws IOException
67      {
68          this.socketOpenTimeOut = lca.getOpenTimeOut();
69          this.socketSoTimeOut = lca.getSocketTimeOut();
70  
71          String p1 = lca.getTcpServer();
72          if ( p1 == null )
73          {
74              throw new IOException( "Invalid server (null)" );
75          }
76  
77          String h2 = p1.substring( 0, p1.indexOf( ":" ) );
78          int po = Integer.parseInt( p1.substring( p1.indexOf( ":" ) + 1 ) );
79          if ( log.isDebugEnabled() )
80          {
81              log.debug( "h2 = " + h2 );
82              log.debug( "po = " + po );
83          }
84  
85          if ( h2.length() == 0 )
86          {
87              throw new IOException( "Cannot connect to invalid address [" + h2 + ":" + po + "]" );
88          }
89  
90          init( h2, po );
91      }
92  
93      /**
94       * Creates a connection to a TCP server.
95       * <p>
96       * @param host
97       * @param port
98       * @throws IOException
99       */
100     protected void init( String host, int port )
101         throws IOException
102     {
103         try
104         {
105             if ( log.isInfoEnabled() )
106             {
107                 log.info( "Attempting connection to [" + host + "]" );
108             }
109 
110             // have time out socket open do this for us
111             try
112             {
113                 socket = new Socket();
114                 socket.connect( new InetSocketAddress( host, port ), this.socketOpenTimeOut );
115             }
116             catch ( IOException ioe )
117             {
118                 if (socket != null)
119                 {
120                     socket.close();
121                 }
122 
123                 throw new IOException( "Cannot connect to " + host + ":" + port, ioe );
124             }
125 
126             socket.setSoTimeout( socketSoTimeOut );
127             synchronized ( this )
128             {
129                 oos = new ObjectOutputStream( socket.getOutputStream() );
130             }
131         }
132         catch ( java.net.ConnectException e )
133         {
134             log.debug( "Remote host [" + host + "] refused connection." );
135             throw e;
136         }
137         catch ( IOException e )
138         {
139             log.debug( "Could not connect to [" + host + "]. Exception is " + e );
140             throw e;
141         }
142     }
143 
144     /**
145      * Sends commands to the lateral cache listener.
146      * <p>
147      * @param led
148      * @throws IOException
149      */
150     public <K, V> void send( LateralElementDescriptor<K, V> led )
151         throws IOException
152     {
153         sendCnt++;
154         if ( log.isInfoEnabled() && sendCnt % 100 == 0 )
155         {
156             log.info( "Send Count (port " + socket.getPort() + ") = " + sendCnt );
157         }
158 
159         if ( log.isDebugEnabled() )
160         {
161             log.debug( "sending LateralElementDescriptor" );
162         }
163 
164         if ( led == null )
165         {
166             return;
167         }
168 
169         if ( oos == null )
170         {
171             throw new IOException( "No remote connection is available for LateralTCPSender." );
172         }
173 
174         synchronized ( this.getLock )
175         {
176             oos.writeUnshared( led );
177             oos.flush();
178         }
179     }
180 
181     /**
182      * Sends commands to the lateral cache listener and gets a response. I'm afraid that we could
183      * get into a pretty bad blocking situation here. This needs work. I just wanted to get some
184      * form of get working. However, get is not recommended for performance reasons. If you have 10
185      * laterals, then you have to make 10 failed gets to find out none of the caches have the item.
186      * <p>
187      * @param led
188      * @return ICacheElement
189      * @throws IOException
190      */
191     public <K, V> Object sendAndReceive( LateralElementDescriptor<K, V> led )
192         throws IOException
193     {
194         if ( led == null )
195         {
196             return null;
197         }
198 
199         if ( oos == null )
200         {
201             throw new IOException( "No remote connection is available for LateralTCPSender." );
202         }
203 
204         Object response = null;
205 
206         // Synchronized to insure that the get requests to server from this
207         // sender and the responses are processed in order, else you could
208         // return the wrong item from the cache.
209         // This is a big block of code. May need to re-think this strategy.
210         // This may not be necessary.
211         // Normal puts, etc to laterals do not have to be synchronized.
212         synchronized ( this.getLock )
213         {
214             try
215             {
216                 // clean up input stream, nothing should be there yet.
217                 if ( socket.getInputStream().available() > 0 )
218                 {
219                     socket.getInputStream().read( new byte[socket.getInputStream().available()] );
220                 }
221             }
222             catch ( IOException ioe )
223             {
224                 log.error( "Problem cleaning socket before send " + socket, ioe );
225                 throw ioe;
226             }
227 
228             // write object to listener
229             oos.writeUnshared( led );
230             oos.flush();
231             ObjectInputStream ois = null;
232 
233             try
234             {
235                 socket.setSoTimeout( socketSoTimeOut );
236                 ois = new ObjectInputStreamClassLoaderAware( socket.getInputStream(), null );
237                 response = ois.readObject();
238             }
239             catch ( IOException ioe )
240             {
241                 String message = "Could not open ObjectInputStream to " + socket +
242                     " SoTimeout [" + socket.getSoTimeout() +
243                     "] Connected [" + socket.isConnected() + "]";
244                 log.error( message, ioe );
245                 throw ioe;
246             }
247             catch ( Exception e )
248             {
249                 log.error( e );
250             }
251             finally
252             {
253                 if (ois != null)
254                 {
255                     ois.close();
256                 }
257             }
258         }
259 
260         return response;
261     }
262 
263     /**
264      * Closes connection used by all LateralTCPSenders for this lateral connection. Dispose request
265      * should come into the facade and be sent to all lateral cache services. The lateral cache
266      * service will then call this method.
267      * <p>
268      * @throws IOException
269      */
270     public void dispose()
271         throws IOException
272     {
273         if ( log.isInfoEnabled() )
274         {
275             log.info( "Dispose called" );
276         }
277         // WILL CLOSE CONNECTION USED BY ALL
278         oos.close();
279         socket.close();
280     }
281 }