001 package org.apache.jcs.auxiliary.lateral.socket.tcp;
002
003 /*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements. See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership. The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License. You may obtain a copy of the License at
011 *
012 * http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied. See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022 import java.io.IOException;
023 import java.io.ObjectInputStream;
024 import java.io.ObjectOutputStream;
025 import java.io.Serializable;
026 import java.net.InetAddress;
027 import java.net.InetSocketAddress;
028 import java.net.Socket;
029
030 import org.apache.commons.logging.Log;
031 import org.apache.commons.logging.LogFactory;
032 import org.apache.jcs.auxiliary.lateral.LateralElementDescriptor;
033 import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
034
035 /**
036 * This class is based on the log4j SocketAppender class. I'm using a different repair structure, so
037 * it is significantly different.
038 */
039 public class LateralTCPSender
040 {
041 /** The logger */
042 private final static Log log = LogFactory.getLog( LateralTCPSender.class );
043
044 /** Config */
045 private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
046
047 /** The hostname of the remote server. */
048 private String remoteHost;
049
050 /** The address of the server */
051 private InetAddress address;
052
053 /** The port the server is listening to. */
054 int port = 1111;
055
056 /** The stream from the server connection. */
057 private ObjectOutputStream oos;
058
059 /** The socket connection with the server. */
060 private Socket socket;
061
062 /** how many messages sent */
063 private int sendCnt = 0;
064
065 /** Use to synchronize multiple threads that may be trying to get. */
066 private final Object getLock = new int[0];
067
068 /**
069 * Constructor for the LateralTCPSender object.
070 * <p>
071 * @param lca
072 * @exception IOException
073 */
074 public LateralTCPSender( ITCPLateralCacheAttributes lca )
075 throws IOException
076 {
077 this.setTcpLateralCacheAttributes( lca );
078
079 String p1 = lca.getTcpServer();
080 if ( p1 != null )
081 {
082 String h2 = p1.substring( 0, p1.indexOf( ":" ) );
083 int po = Integer.parseInt( p1.substring( p1.indexOf( ":" ) + 1 ) );
084 if ( log.isDebugEnabled() )
085 {
086 log.debug( "h2 = " + h2 );
087 log.debug( "po = " + po );
088 }
089
090 if ( h2 == null )
091 {
092 throw new IOException( "Cannot connect to invalid address [" + h2 + ":" + po + "]" );
093 }
094
095 init( h2, po );
096 }
097 }
098
099 /**
100 * Creates a connection to a TCP server.
101 * <p>
102 * @param host
103 * @param port
104 * @throws IOException
105 */
106 protected void init( String host, int port )
107 throws IOException
108 {
109 this.port = port;
110 this.address = getAddressByName( host );
111 this.setRemoteHost( host );
112
113 try
114 {
115 if ( log.isInfoEnabled() )
116 {
117 log.info( "Attempting connection to [" + address.getHostName() + "]" );
118 }
119
120 // have time out socket open do this for us
121 try
122 {
123 InetSocketAddress address = new InetSocketAddress( host, port );
124 socket = new Socket();
125 socket.connect( address, tcpLateralCacheAttributes.getOpenTimeOut() );
126 }
127 catch ( IOException ioe )
128 {
129 if (socket != null)
130 {
131 socket.close();
132 }
133 throw new IOException( "Cannot connect to " + host + ":" + port, ioe );
134 }
135
136 socket.setSoTimeout( tcpLateralCacheAttributes.getSocketTimeOut() );
137 synchronized ( this )
138 {
139 oos = new ObjectOutputStream( socket.getOutputStream() );
140 }
141 }
142 catch ( java.net.ConnectException e )
143 {
144 log.debug( "Remote host [" + address.getHostName() + "] refused connection." );
145 throw e;
146 }
147 catch ( IOException e )
148 {
149 log.debug( "Could not connect to [" + address.getHostName() + "]. Exception is " + e );
150 throw e;
151 }
152 }
153
154 /**
155 * Gets the addressByName attribute of the LateralTCPSender object.
156 * <p>
157 * @param host
158 * @return The addressByName value
159 * @throws IOException
160 */
161 private InetAddress getAddressByName( String host )
162 throws IOException
163 {
164 try
165 {
166 return InetAddress.getByName( host );
167 }
168 catch ( Exception e )
169 {
170 log.error( "Could not find address of [" + host + "] ", e );
171 throw new IOException( "Could not find address of [" + host + "] " + e.getMessage() );
172 }
173 }
174
175 /**
176 * Sends commands to the lateral cache listener.
177 * <p>
178 * @param led
179 * @throws IOException
180 */
181 public <K extends Serializable, V extends Serializable> void send( LateralElementDescriptor<K, V> led )
182 throws IOException
183 {
184 sendCnt++;
185 if ( log.isInfoEnabled() )
186 {
187 if ( sendCnt % 100 == 0 )
188 {
189 log.info( "Send Count (port " + port + ") = " + sendCnt );
190 }
191 }
192
193 if ( log.isDebugEnabled() )
194 {
195 log.debug( "sending LateralElementDescriptor" );
196 }
197
198 if ( led == null )
199 {
200 return;
201 }
202
203 if ( address == null )
204 {
205 throw new IOException( "No remote host is set for LateralTCPSender." );
206 }
207
208 if ( oos != null )
209 {
210 synchronized ( this.getLock )
211 {
212 try
213 {
214 oos.writeUnshared( led );
215 oos.flush();
216 }
217 catch ( IOException e )
218 {
219 oos = null;
220 log.error( "Detected problem with connection: " + e );
221 throw e;
222 }
223 }
224 }
225 }
226
227 /**
228 * Sends commands to the lateral cache listener and gets a response. I'm afraid that we could
229 * get into a pretty bad blocking situation here. This needs work. I just wanted to get some
230 * form of get working. However, get is not recommended for performance reasons. If you have 10
231 * laterals, then you have to make 10 failed gets to find out none of the caches have the item.
232 * <p>
233 * @param led
234 * @return ICacheElement
235 * @throws IOException
236 */
237 public <K extends Serializable, V extends Serializable> Object sendAndReceive( LateralElementDescriptor<K, V> led )
238 throws IOException
239 {
240 if ( led == null )
241 {
242 return null;
243 }
244
245 if ( address == null )
246 {
247 throw new IOException( "No remote host is set for LateralTCPSender." );
248 }
249
250 Object response = null;
251
252 if ( oos != null )
253 {
254 // Synchronized to insure that the get requests to server from this
255 // sender and the responses are processed in order, else you could
256 // return the wrong item from the cache.
257 // This is a big block of code. May need to re-think this strategy.
258 // This may not be necessary.
259 // Normal puts, etc to laterals do not have to be synchronized.
260 synchronized ( this.getLock )
261 {
262 try
263 {
264 try
265 {
266 // clean up input stream, nothing should be there yet.
267 if ( socket.getInputStream().available() > 0 )
268 {
269 socket.getInputStream().read( new byte[socket.getInputStream().available()] );
270 }
271 }
272 catch ( IOException ioe )
273 {
274 log.error( "Problem cleaning socket before send " + socket, ioe );
275 throw ioe;
276 }
277
278 // write object to listener
279 oos.writeUnshared( led );
280 oos.flush();
281
282 try
283 {
284 // TODO make configurable
285 // socket.setSoTimeout( 2000 );
286 ObjectInputStream ois = new ObjectInputStream( socket.getInputStream() );
287 response = ois.readObject();
288 }
289 catch ( IOException ioe )
290 {
291 String message = "Could not open ObjectInputStream to " + socket;
292 message += " SoTimeout [" + socket.getSoTimeout() + "] Connected [" + socket.isConnected() + "]";
293 log.error( message, ioe );
294 throw ioe;
295 }
296 catch ( Exception e )
297 {
298 log.error( e );
299 }
300 }
301 catch ( IOException e )
302 {
303 oos = null;
304 log.error( "Detected problem with connection: " + e );
305 throw e;
306 }
307 }
308 }
309
310 return response;
311 }
312
313 /**
314 * Closes connection used by all LateralTCPSenders for this lateral connection. Dispose request
315 * should come into the facade and be sent to all lateral cache services. The lateral cache
316 * service will then call this method.
317 * <p>
318 * @param cache
319 * @throws IOException
320 */
321 public void dispose( String cache )
322 throws IOException
323 {
324 if ( log.isInfoEnabled() )
325 {
326 log.info( "Dispose called for cache [" + cache + "]" );
327 }
328 // WILL CLOSE CONNECTION USED BY ALL
329 oos.close();
330 }
331
332 /**
333 * @param tcpLateralCacheAttributes The tcpLateralCacheAttributes to set.
334 */
335 public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tcpLateralCacheAttributes )
336 {
337 this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
338 }
339
340 /**
341 * @return Returns the tcpLateralCacheAttributes.
342 */
343 public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
344 {
345 return tcpLateralCacheAttributes;
346 }
347
348 /**
349 * @param remoteHost The remoteHost to set.
350 */
351 public void setRemoteHost( String remoteHost )
352 {
353 this.remoteHost = remoteHost;
354 }
355
356 /**
357 * @return Returns the remoteHost.
358 */
359 public String getRemoteHost()
360 {
361 return remoteHost;
362 }
363 }