1 package org.apache.commons.jcs.auxiliary.lateral.socket.tcp;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.io.ObjectInputStream;
24 import java.io.ObjectOutputStream;
25 import java.net.InetSocketAddress;
26 import java.net.Socket;
27
28 import org.apache.commons.jcs.auxiliary.lateral.LateralElementDescriptor;
29 import org.apache.commons.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
30 import org.apache.commons.jcs.io.ObjectInputStreamClassLoaderAware;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33
34
35
36
37
38 public class LateralTCPSender
39 {
40
41 private static final Log log = LogFactory.getLog( LateralTCPSender.class );
42
43
44 private int socketOpenTimeOut;
45 private int socketSoTimeOut;
46
47
48 private ObjectOutputStream oos;
49
50
51 private Socket socket;
52
53
54 private int sendCnt = 0;
55
56
57 private final Object getLock = new int[0];
58
59
60
61
62
63
64
65 public LateralTCPSender( ITCPLateralCacheAttributes lca )
66 throws IOException
67 {
68 this.socketOpenTimeOut = lca.getOpenTimeOut();
69 this.socketSoTimeOut = lca.getSocketTimeOut();
70
71 String p1 = lca.getTcpServer();
72 if ( p1 == null )
73 {
74 throw new IOException( "Invalid server (null)" );
75 }
76
77 String h2 = p1.substring( 0, p1.indexOf( ":" ) );
78 int po = Integer.parseInt( p1.substring( p1.indexOf( ":" ) + 1 ) );
79 if ( log.isDebugEnabled() )
80 {
81 log.debug( "h2 = " + h2 );
82 log.debug( "po = " + po );
83 }
84
85 if ( h2.length() == 0 )
86 {
87 throw new IOException( "Cannot connect to invalid address [" + h2 + ":" + po + "]" );
88 }
89
90 init( h2, po );
91 }
92
93
94
95
96
97
98
99
100 protected void init( String host, int port )
101 throws IOException
102 {
103 try
104 {
105 if ( log.isInfoEnabled() )
106 {
107 log.info( "Attempting connection to [" + host + "]" );
108 }
109
110
111 try
112 {
113 socket = new Socket();
114 socket.connect( new InetSocketAddress( host, port ), this.socketOpenTimeOut );
115 }
116 catch ( IOException ioe )
117 {
118 if (socket != null)
119 {
120 socket.close();
121 }
122
123 throw new IOException( "Cannot connect to " + host + ":" + port, ioe );
124 }
125
126 socket.setSoTimeout( socketSoTimeOut );
127 synchronized ( this )
128 {
129 oos = new ObjectOutputStream( socket.getOutputStream() );
130 }
131 }
132 catch ( java.net.ConnectException e )
133 {
134 log.debug( "Remote host [" + host + "] refused connection." );
135 throw e;
136 }
137 catch ( IOException e )
138 {
139 log.debug( "Could not connect to [" + host + "]. Exception is " + e );
140 throw e;
141 }
142 }
143
144
145
146
147
148
149
150 public <K, V> void send( LateralElementDescriptor<K, V> led )
151 throws IOException
152 {
153 sendCnt++;
154 if ( log.isInfoEnabled() && sendCnt % 100 == 0 )
155 {
156 log.info( "Send Count (port " + socket.getPort() + ") = " + sendCnt );
157 }
158
159 if ( log.isDebugEnabled() )
160 {
161 log.debug( "sending LateralElementDescriptor" );
162 }
163
164 if ( led == null )
165 {
166 return;
167 }
168
169 if ( oos == null )
170 {
171 throw new IOException( "No remote connection is available for LateralTCPSender." );
172 }
173
174 synchronized ( this.getLock )
175 {
176 oos.writeUnshared( led );
177 oos.flush();
178 }
179 }
180
181
182
183
184
185
186
187
188
189
190
191 public <K, V> Object sendAndReceive( LateralElementDescriptor<K, V> led )
192 throws IOException
193 {
194 if ( led == null )
195 {
196 return null;
197 }
198
199 if ( oos == null )
200 {
201 throw new IOException( "No remote connection is available for LateralTCPSender." );
202 }
203
204 Object response = null;
205
206
207
208
209
210
211
212 synchronized ( this.getLock )
213 {
214 try
215 {
216
217 if ( socket.getInputStream().available() > 0 )
218 {
219 socket.getInputStream().read( new byte[socket.getInputStream().available()] );
220 }
221 }
222 catch ( IOException ioe )
223 {
224 log.error( "Problem cleaning socket before send " + socket, ioe );
225 throw ioe;
226 }
227
228
229 oos.writeUnshared( led );
230 oos.flush();
231 ObjectInputStream ois = null;
232
233 try
234 {
235 socket.setSoTimeout( socketSoTimeOut );
236 ois = new ObjectInputStreamClassLoaderAware( socket.getInputStream(), null );
237 response = ois.readObject();
238 }
239 catch ( IOException ioe )
240 {
241 String message = "Could not open ObjectInputStream to " + socket +
242 " SoTimeout [" + socket.getSoTimeout() +
243 "] Connected [" + socket.isConnected() + "]";
244 log.error( message, ioe );
245 throw ioe;
246 }
247 catch ( Exception e )
248 {
249 log.error( e );
250 }
251 finally
252 {
253 if (ois != null)
254 {
255 ois.close();
256 }
257 }
258 }
259
260 return response;
261 }
262
263
264
265
266
267
268
269
270 public void dispose()
271 throws IOException
272 {
273 if ( log.isInfoEnabled() )
274 {
275 log.info( "Dispose called" );
276 }
277
278 oos.close();
279 socket.close();
280 }
281 }