View Javadoc

1   package org.apache.jcs.auxiliary.lateral.socket.tcp;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.io.ObjectInputStream;
24  import java.io.ObjectOutputStream;
25  import java.io.Serializable;
26  import java.net.InetAddress;
27  import java.net.InetSocketAddress;
28  import java.net.Socket;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.jcs.auxiliary.lateral.LateralElementDescriptor;
33  import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
34  
35  /**
36   * This class is based on the log4j SocketAppender class. I'm using a different repair structure, so
37   * it is significantly different.
38   */
39  public class LateralTCPSender
40  {
41      /** The logger */
42      private final static Log log = LogFactory.getLog( LateralTCPSender.class );
43  
44      /** Config */
45      private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
46  
47      /** The hostname of the remote server. */
48      private String remoteHost;
49  
50      /** The address of the server */
51      private InetAddress address;
52  
53      /** The port the server is listening to. */
54      int port = 1111;
55  
56      /** The stream from the server connection. */
57      private ObjectOutputStream oos;
58  
59      /** The socket connection with the server. */
60      private Socket socket;
61  
62      /** how many messages sent */
63      private int sendCnt = 0;
64  
65      /** Use to synchronize multiple threads that may be trying to get. */
66      private final Object getLock = new int[0];
67  
68      /**
69       * Constructor for the LateralTCPSender object.
70       * <p>
71       * @param lca
72       * @exception IOException
73       */
74      public LateralTCPSender( ITCPLateralCacheAttributes lca )
75          throws IOException
76      {
77          this.setTcpLateralCacheAttributes( lca );
78  
79          String p1 = lca.getTcpServer();
80          if ( p1 != null )
81          {
82              String h2 = p1.substring( 0, p1.indexOf( ":" ) );
83              int po = Integer.parseInt( p1.substring( p1.indexOf( ":" ) + 1 ) );
84              if ( log.isDebugEnabled() )
85              {
86                  log.debug( "h2 = " + h2 );
87                  log.debug( "po = " + po );
88              }
89  
90              if ( h2 == null )
91              {
92                  throw new IOException( "Cannot connect to invalid address [" + h2 + ":" + po + "]" );
93              }
94  
95              init( h2, po );
96          }
97      }
98  
99      /**
100      * Creates a connection to a TCP server.
101      * <p>
102      * @param host
103      * @param port
104      * @throws IOException
105      */
106     protected void init( String host, int port )
107         throws IOException
108     {
109         this.port = port;
110         this.address = getAddressByName( host );
111         this.setRemoteHost( host );
112 
113         try
114         {
115             if ( log.isInfoEnabled() )
116             {
117                 log.info( "Attempting connection to [" + address.getHostName() + "]" );
118             }
119 
120             // have time out socket open do this for us
121             try
122             {
123                 InetSocketAddress address = new InetSocketAddress( host, port );
124                 socket = new Socket();
125                 socket.connect( address, tcpLateralCacheAttributes.getOpenTimeOut() );
126             }
127             catch ( IOException ioe )
128             {
129                 if (socket != null)
130                 {
131                     socket.close();
132                 }
133                 throw new IOException( "Cannot connect to " + host + ":" + port, ioe );
134             }
135 
136             socket.setSoTimeout( tcpLateralCacheAttributes.getSocketTimeOut() );
137             synchronized ( this )
138             {
139                 oos = new ObjectOutputStream( socket.getOutputStream() );
140             }
141         }
142         catch ( java.net.ConnectException e )
143         {
144             log.debug( "Remote host [" + address.getHostName() + "] refused connection." );
145             throw e;
146         }
147         catch ( IOException e )
148         {
149             log.debug( "Could not connect to [" + address.getHostName() + "]. Exception is " + e );
150             throw e;
151         }
152     }
153 
154     /**
155      * Gets the addressByName attribute of the LateralTCPSender object.
156      * <p>
157      * @param host
158      * @return The addressByName value
159      * @throws IOException
160      */
161     private InetAddress getAddressByName( String host )
162         throws IOException
163     {
164         try
165         {
166             return InetAddress.getByName( host );
167         }
168         catch ( Exception e )
169         {
170             log.error( "Could not find address of [" + host + "] ", e );
171             throw new IOException( "Could not find address of [" + host + "] " + e.getMessage() );
172         }
173     }
174 
175     /**
176      * Sends commands to the lateral cache listener.
177      * <p>
178      * @param led
179      * @throws IOException
180      */
181     public <K extends Serializable, V extends Serializable> void send( LateralElementDescriptor<K, V> led )
182         throws IOException
183     {
184         sendCnt++;
185         if ( log.isInfoEnabled() )
186         {
187             if ( sendCnt % 100 == 0 )
188             {
189                 log.info( "Send Count (port " + port + ") = " + sendCnt );
190             }
191         }
192 
193         if ( log.isDebugEnabled() )
194         {
195             log.debug( "sending LateralElementDescriptor" );
196         }
197 
198         if ( led == null )
199         {
200             return;
201         }
202 
203         if ( address == null )
204         {
205             throw new IOException( "No remote host is set for LateralTCPSender." );
206         }
207 
208         if ( oos != null )
209         {
210             synchronized ( this.getLock )
211             {
212                 try
213                 {
214                     oos.writeUnshared( led );
215                     oos.flush();
216                 }
217                 catch ( IOException e )
218                 {
219                     oos = null;
220                     log.error( "Detected problem with connection: " + e );
221                     throw e;
222                 }
223             }
224         }
225     }
226 
227     /**
228      * Sends commands to the lateral cache listener and gets a response. I'm afraid that we could
229      * get into a pretty bad blocking situation here. This needs work. I just wanted to get some
230      * form of get working. However, get is not recommended for performance reasons. If you have 10
231      * laterals, then you have to make 10 failed gets to find out none of the caches have the item.
232      * <p>
233      * @param led
234      * @return ICacheElement
235      * @throws IOException
236      */
237     public <K extends Serializable, V extends Serializable> Object sendAndReceive( LateralElementDescriptor<K, V> led )
238         throws IOException
239     {
240         if ( led == null )
241         {
242             return null;
243         }
244 
245         if ( address == null )
246         {
247             throw new IOException( "No remote host is set for LateralTCPSender." );
248         }
249 
250         Object response = null;
251 
252         if ( oos != null )
253         {
254             // Synchronized to insure that the get requests to server from this
255             // sender and the responses are processed in order, else you could
256             // return the wrong item from the cache.
257             // This is a big block of code. May need to re-think this strategy.
258             // This may not be necessary.
259             // Normal puts, etc to laterals do not have to be synchronized.
260             synchronized ( this.getLock )
261             {
262                 try
263                 {
264                     try
265                     {
266                         // clean up input stream, nothing should be there yet.
267                         if ( socket.getInputStream().available() > 0 )
268                         {
269                             socket.getInputStream().read( new byte[socket.getInputStream().available()] );
270                         }
271                     }
272                     catch ( IOException ioe )
273                     {
274                         log.error( "Problem cleaning socket before send " + socket, ioe );
275                         throw ioe;
276                     }
277 
278                     // write object to listener
279                     oos.writeUnshared( led );
280                     oos.flush();
281 
282                     try
283                     {
284                         // TODO make configurable
285                         // socket.setSoTimeout( 2000 );
286                         ObjectInputStream ois = new ObjectInputStream( socket.getInputStream() );
287                         response = ois.readObject();
288                     }
289                     catch ( IOException ioe )
290                     {
291                         String message = "Could not open ObjectInputStream to " + socket;
292                         message += " SoTimeout [" + socket.getSoTimeout() + "] Connected [" + socket.isConnected() + "]";
293                         log.error( message, ioe );
294                         throw ioe;
295                     }
296                     catch ( Exception e )
297                     {
298                         log.error( e );
299                     }
300                 }
301                 catch ( IOException e )
302                 {
303                     oos = null;
304                     log.error( "Detected problem with connection: " + e );
305                     throw e;
306                 }
307             }
308         }
309 
310         return response;
311     }
312 
313     /**
314      * Closes connection used by all LateralTCPSenders for this lateral connection. Dispose request
315      * should come into the facade and be sent to all lateral cache services. The lateral cache
316      * service will then call this method.
317      * <p>
318      * @param cache
319      * @throws IOException
320      */
321     public void dispose( String cache )
322         throws IOException
323     {
324         if ( log.isInfoEnabled() )
325         {
326             log.info( "Dispose called for cache [" + cache + "]" );
327         }
328         // WILL CLOSE CONNECTION USED BY ALL
329         oos.close();
330     }
331 
332     /**
333      * @param tcpLateralCacheAttributes The tcpLateralCacheAttributes to set.
334      */
335     public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tcpLateralCacheAttributes )
336     {
337         this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
338     }
339 
340     /**
341      * @return Returns the tcpLateralCacheAttributes.
342      */
343     public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
344     {
345         return tcpLateralCacheAttributes;
346     }
347 
348     /**
349      * @param remoteHost The remoteHost to set.
350      */
351     public void setRemoteHost( String remoteHost )
352     {
353         this.remoteHost = remoteHost;
354     }
355 
356     /**
357      * @return Returns the remoteHost.
358      */
359     public String getRemoteHost()
360     {
361         return remoteHost;
362     }
363 }