1 package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.nio.channels.AsynchronousSocketChannel;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.Future;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import java.util.concurrent.locks.Lock;
30 import java.util.concurrent.locks.ReentrantLock;
31
32 import org.apache.commons.jcs3.auxiliary.lateral.LateralElementDescriptor;
33 import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
34 import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
35 import org.apache.commons.jcs3.log.Log;
36 import org.apache.commons.jcs3.log.LogManager;
37 import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
38
39
40
41
42
43 public class LateralTCPSender
44 {
45
46 private static final Log log = LogManager.getLog( LateralTCPSender.class );
47
48
49 private final int socketOpenTimeOut;
50 private final int socketSoTimeOut;
51
52
53 private final IElementSerializer serializer;
54
55
56 private AsynchronousSocketChannel client;
57
58
59 private int sendCnt;
60
61
62 private final Lock lock = new ReentrantLock(true);
63
64
65
66
67
68
69
70
71 @Deprecated
72 public LateralTCPSender( final ITCPLateralCacheAttributes lca )
73 throws IOException
74 {
75 this(lca, new StandardSerializer());
76 }
77
78
79
80
81
82
83
84
85
86 public LateralTCPSender( final ITCPLateralCacheAttributes lca, final IElementSerializer serializer )
87 throws IOException
88 {
89 this.socketOpenTimeOut = lca.getOpenTimeOut();
90 this.socketSoTimeOut = lca.getSocketTimeOut();
91
92 this.serializer = serializer;
93
94 final String p1 = lca.getTcpServer();
95 if ( p1 == null )
96 {
97 throw new IOException( "Invalid server (null)" );
98 }
99
100 final int colonPosition = p1.lastIndexOf(':');
101
102 if ( colonPosition < 0 )
103 {
104 throw new IOException( "Invalid address [" + p1 + "]" );
105 }
106
107 final String h2 = p1.substring( 0, colonPosition );
108 final int po = Integer.parseInt( p1.substring( colonPosition + 1 ) );
109 log.debug( "h2 = {0}, po = {1}", h2, po );
110
111 if ( h2.isEmpty() )
112 {
113 throw new IOException( "Cannot connect to invalid address [" + h2 + ":" + po + "]" );
114 }
115
116 init( h2, po );
117 }
118
119
120
121
122
123
124
125
126 protected void init( final String host, final int port )
127 throws IOException
128 {
129 log.info( "Attempting connection to [{0}:{1}]", host, port );
130
131 try
132 {
133 client = AsynchronousSocketChannel.open();
134 InetSocketAddress hostAddress = new InetSocketAddress(host, port);
135 Future<Void> future = client.connect(hostAddress);
136
137 future.get(this.socketOpenTimeOut, TimeUnit.MILLISECONDS);
138 }
139 catch (final IOException | InterruptedException | ExecutionException | TimeoutException ioe)
140 {
141 throw new IOException( "Cannot connect to " + host + ":" + port, ioe );
142 }
143 }
144
145
146
147
148
149
150
151 public <K, V> void send( final LateralElementDescriptor<K, V> led )
152 throws IOException
153 {
154 sendCnt++;
155 if ( log.isInfoEnabled() && sendCnt % 100 == 0 )
156 {
157 log.info( "Send Count {0} = {1}", client.getRemoteAddress(), sendCnt );
158 }
159
160 log.debug( "sending LateralElementDescriptor" );
161
162 if ( led == null )
163 {
164 return;
165 }
166
167 lock.lock();
168 try
169 {
170 serializer.serializeTo(led, client, socketSoTimeOut);
171 }
172 finally
173 {
174 lock.unlock();
175 }
176 }
177
178
179
180
181
182
183
184
185
186
187
188 public <K, V> Object sendAndReceive( final LateralElementDescriptor<K, V> led )
189 throws IOException
190 {
191 if ( led == null )
192 {
193 return null;
194 }
195
196
197
198
199
200
201
202 Object response = null;
203
204 lock.lock();
205 try
206 {
207
208 send(led);
209 response = serializer.deSerializeFrom(client, socketSoTimeOut, null);
210 }
211 catch ( final IOException | ClassNotFoundException ioe )
212 {
213 final String message = "Could not open channel to " +
214 client.getRemoteAddress() + " SoTimeout [" + socketSoTimeOut +
215 "] Connected [" + client.isOpen() + "]";
216 log.error( message, ioe );
217 throw new IOException(message, ioe);
218 }
219 finally
220 {
221 lock.unlock();
222 }
223
224 return response;
225 }
226
227
228
229
230
231
232
233
234 public void dispose()
235 throws IOException
236 {
237 log.info( "Dispose called" );
238 client.close();
239 }
240 }