1 package org.apache.jcs.auxiliary.lateral.socket.tcp;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.io.ObjectInputStream;
24 import java.io.ObjectOutputStream;
25 import java.io.Serializable;
26 import java.net.InetAddress;
27 import java.net.InetSocketAddress;
28 import java.net.Socket;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.jcs.auxiliary.lateral.LateralElementDescriptor;
33 import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
34
35
36
37
38
39 public class LateralTCPSender
40 {
41
42 private final static Log log = LogFactory.getLog( LateralTCPSender.class );
43
44
45 private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
46
47
48 private String remoteHost;
49
50
51 private InetAddress address;
52
53
54 int port = 1111;
55
56
57 private ObjectOutputStream oos;
58
59
60 private Socket socket;
61
62
63 private int sendCnt = 0;
64
65
66 private final Object getLock = new int[0];
67
68
69
70
71
72
73
74 public LateralTCPSender( ITCPLateralCacheAttributes lca )
75 throws IOException
76 {
77 this.setTcpLateralCacheAttributes( lca );
78
79 String p1 = lca.getTcpServer();
80 if ( p1 != null )
81 {
82 String h2 = p1.substring( 0, p1.indexOf( ":" ) );
83 int po = Integer.parseInt( p1.substring( p1.indexOf( ":" ) + 1 ) );
84 if ( log.isDebugEnabled() )
85 {
86 log.debug( "h2 = " + h2 );
87 log.debug( "po = " + po );
88 }
89
90 if ( h2 == null )
91 {
92 throw new IOException( "Cannot connect to invalid address [" + h2 + ":" + po + "]" );
93 }
94
95 init( h2, po );
96 }
97 }
98
99
100
101
102
103
104
105
106 protected void init( String host, int port )
107 throws IOException
108 {
109 this.port = port;
110 this.address = getAddressByName( host );
111 this.setRemoteHost( host );
112
113 try
114 {
115 if ( log.isInfoEnabled() )
116 {
117 log.info( "Attempting connection to [" + address.getHostName() + "]" );
118 }
119
120
121 try
122 {
123 InetSocketAddress address = new InetSocketAddress( host, port );
124 socket = new Socket();
125 socket.connect( address, tcpLateralCacheAttributes.getOpenTimeOut() );
126 }
127 catch ( IOException ioe )
128 {
129 if (socket != null)
130 {
131 socket.close();
132 }
133 throw new IOException( "Cannot connect to " + host + ":" + port, ioe );
134 }
135
136 socket.setSoTimeout( tcpLateralCacheAttributes.getSocketTimeOut() );
137 synchronized ( this )
138 {
139 oos = new ObjectOutputStream( socket.getOutputStream() );
140 }
141 }
142 catch ( java.net.ConnectException e )
143 {
144 log.debug( "Remote host [" + address.getHostName() + "] refused connection." );
145 throw e;
146 }
147 catch ( IOException e )
148 {
149 log.debug( "Could not connect to [" + address.getHostName() + "]. Exception is " + e );
150 throw e;
151 }
152 }
153
154
155
156
157
158
159
160
161 private InetAddress getAddressByName( String host )
162 throws IOException
163 {
164 try
165 {
166 return InetAddress.getByName( host );
167 }
168 catch ( Exception e )
169 {
170 log.error( "Could not find address of [" + host + "] ", e );
171 throw new IOException( "Could not find address of [" + host + "] " + e.getMessage() );
172 }
173 }
174
175
176
177
178
179
180
181 public <K extends Serializable, V extends Serializable> void send( LateralElementDescriptor<K, V> led )
182 throws IOException
183 {
184 sendCnt++;
185 if ( log.isInfoEnabled() )
186 {
187 if ( sendCnt % 100 == 0 )
188 {
189 log.info( "Send Count (port " + port + ") = " + sendCnt );
190 }
191 }
192
193 if ( log.isDebugEnabled() )
194 {
195 log.debug( "sending LateralElementDescriptor" );
196 }
197
198 if ( led == null )
199 {
200 return;
201 }
202
203 if ( address == null )
204 {
205 throw new IOException( "No remote host is set for LateralTCPSender." );
206 }
207
208 if ( oos != null )
209 {
210 synchronized ( this.getLock )
211 {
212 try
213 {
214 oos.writeUnshared( led );
215 oos.flush();
216 }
217 catch ( IOException e )
218 {
219 oos = null;
220 log.error( "Detected problem with connection: " + e );
221 throw e;
222 }
223 }
224 }
225 }
226
227
228
229
230
231
232
233
234
235
236
237 public <K extends Serializable, V extends Serializable> Object sendAndReceive( LateralElementDescriptor<K, V> led )
238 throws IOException
239 {
240 if ( led == null )
241 {
242 return null;
243 }
244
245 if ( address == null )
246 {
247 throw new IOException( "No remote host is set for LateralTCPSender." );
248 }
249
250 Object response = null;
251
252 if ( oos != null )
253 {
254
255
256
257
258
259
260 synchronized ( this.getLock )
261 {
262 try
263 {
264 try
265 {
266
267 if ( socket.getInputStream().available() > 0 )
268 {
269 socket.getInputStream().read( new byte[socket.getInputStream().available()] );
270 }
271 }
272 catch ( IOException ioe )
273 {
274 log.error( "Problem cleaning socket before send " + socket, ioe );
275 throw ioe;
276 }
277
278
279 oos.writeUnshared( led );
280 oos.flush();
281
282 try
283 {
284
285
286 ObjectInputStream ois = new ObjectInputStream( socket.getInputStream() );
287 response = ois.readObject();
288 }
289 catch ( IOException ioe )
290 {
291 String message = "Could not open ObjectInputStream to " + socket;
292 message += " SoTimeout [" + socket.getSoTimeout() + "] Connected [" + socket.isConnected() + "]";
293 log.error( message, ioe );
294 throw ioe;
295 }
296 catch ( Exception e )
297 {
298 log.error( e );
299 }
300 }
301 catch ( IOException e )
302 {
303 oos = null;
304 log.error( "Detected problem with connection: " + e );
305 throw e;
306 }
307 }
308 }
309
310 return response;
311 }
312
313
314
315
316
317
318
319
320
321 public void dispose( String cache )
322 throws IOException
323 {
324 if ( log.isInfoEnabled() )
325 {
326 log.info( "Dispose called for cache [" + cache + "]" );
327 }
328
329 oos.close();
330 }
331
332
333
334
335 public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tcpLateralCacheAttributes )
336 {
337 this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
338 }
339
340
341
342
343 public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
344 {
345 return tcpLateralCacheAttributes;
346 }
347
348
349
350
351 public void setRemoteHost( String remoteHost )
352 {
353 this.remoteHost = remoteHost;
354 }
355
356
357
358
359 public String getRemoteHost()
360 {
361 return remoteHost;
362 }
363 }