1 package org.apache.commons.jcs.auxiliary.lateral.socket.tcp;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.BufferedReader;
23 import java.io.IOException;
24 import java.io.InputStreamReader;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.Map;
28 import java.util.Set;
29
30 import org.apache.commons.jcs.auxiliary.lateral.LateralCommand;
31 import org.apache.commons.jcs.auxiliary.lateral.LateralElementDescriptor;
32 import org.apache.commons.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
33 import org.apache.commons.jcs.engine.CacheElement;
34 import org.apache.commons.jcs.engine.CacheInfo;
35 import org.apache.commons.jcs.engine.behavior.ICacheElement;
36 import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal;
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39
40
41
42
43
44 public class LateralTCPService<K, V>
45 implements ICacheServiceNonLocal<K, V>
46 {
47
48 private static final Log log = LogFactory.getLog( LateralTCPService.class );
49
50
51 private boolean allowPut;
52 private boolean allowGet;
53 private boolean issueRemoveOnPut;
54
55
56 private LateralTCPSender sender;
57
58
59 private long listenerId = CacheInfo.listenerId;
60
61
62
63
64
65
66
67 public LateralTCPService( ITCPLateralCacheAttributes lca )
68 throws IOException
69 {
70 this.allowGet = lca.isAllowGet();
71 this.allowPut = lca.isAllowPut();
72 this.issueRemoveOnPut = lca.isIssueRemoveOnPut();
73
74 try
75 {
76 sender = new LateralTCPSender( lca );
77
78 if ( log.isInfoEnabled() )
79 {
80 log.debug( "Created sender to [" + lca.getTcpServer() + "]" );
81 }
82 }
83 catch ( IOException e )
84 {
85
86
87
88 log.error( "Could not create sender to [" + lca.getTcpServer() + "] -- " + e.getMessage() );
89
90 throw e;
91 }
92 }
93
94
95
96
97
98 @Override
99 public void update( ICacheElement<K, V> item )
100 throws IOException
101 {
102 update( item, getListenerId() );
103 }
104
105
106
107
108
109
110
111
112
113 @Override
114 public void update( ICacheElement<K, V> item, long requesterId )
115 throws IOException
116 {
117
118 if ( !this.allowPut &&
119
120 !this.issueRemoveOnPut )
121 {
122 return;
123 }
124
125
126 if ( !this.issueRemoveOnPut )
127 {
128 LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( item );
129 led.requesterId = requesterId;
130 led.command = LateralCommand.UPDATE;
131 sender.send( led );
132 }
133
134
135 else
136 {
137 if ( log.isDebugEnabled() )
138 {
139 log.debug( "Issuing a remove for a put" );
140 }
141
142 CacheElement<K, V> ce = new CacheElement<K, V>( item.getCacheName(), item.getKey(), null );
143 LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( ce );
144 led.requesterId = requesterId;
145 led.command = LateralCommand.REMOVE;
146 led.valHashCode = item.getVal().hashCode();
147 sender.send( led );
148 }
149 }
150
151
152
153
154
155
156 @Override
157 public void remove( String cacheName, K key )
158 throws IOException
159 {
160 remove( cacheName, key, getListenerId() );
161 }
162
163
164
165
166
167
168 @Override
169 public void remove( String cacheName, K key, long requesterId )
170 throws IOException
171 {
172 CacheElement<K, V> ce = new CacheElement<K, V>( cacheName, key, null );
173 LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( ce );
174 led.requesterId = requesterId;
175 led.command = LateralCommand.REMOVE;
176 sender.send( led );
177 }
178
179
180
181
182
183
184 @Override
185 public void release()
186 throws IOException
187 {
188
189 }
190
191
192
193
194
195
196
197 @Override
198 public void dispose( String cacheName )
199 throws IOException
200 {
201 sender.dispose();
202 }
203
204
205
206
207
208
209
210 @Override
211 public ICacheElement<K, V> get( String cacheName, K key )
212 throws IOException
213 {
214 return get( cacheName, key, getListenerId() );
215 }
216
217
218
219
220
221
222
223
224
225
226 @Override
227 public ICacheElement<K, V> get( String cacheName, K key, long requesterId )
228 throws IOException
229 {
230
231 if ( this.allowGet )
232 {
233 CacheElement<K, V> ce = new CacheElement<K, V>( cacheName, key, null );
234 LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( ce );
235
236 led.command = LateralCommand.GET;
237 @SuppressWarnings("unchecked")
238 ICacheElement<K, V> response = (ICacheElement<K, V>)sender.sendAndReceive( led );
239 if ( response != null )
240 {
241 return response;
242 }
243 return null;
244 }
245 else
246 {
247
248 return null;
249 }
250 }
251
252
253
254
255
256
257
258
259
260
261 @Override
262 public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern )
263 throws IOException
264 {
265 return getMatching( cacheName, pattern, getListenerId() );
266 }
267
268
269
270
271
272
273
274
275
276
277
278 @Override
279 @SuppressWarnings("unchecked")
280 public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern, long requesterId )
281 throws IOException
282 {
283
284 if ( this.allowGet )
285 {
286 CacheElement<String, String> ce = new CacheElement<String, String>( cacheName, pattern, null );
287 LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>( ce );
288
289 led.command = LateralCommand.GET_MATCHING;
290
291 Object response = sender.sendAndReceive( led );
292 if ( response != null )
293 {
294 return (Map<K, ICacheElement<K, V>>) response;
295 }
296 return Collections.emptyMap();
297 }
298 else
299 {
300
301 return null;
302 }
303 }
304
305
306
307
308
309
310
311
312
313
314 @Override
315 public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys )
316 throws IOException
317 {
318 return getMultiple( cacheName, keys, getListenerId() );
319 }
320
321
322
323
324
325
326
327
328
329
330
331
332
333 @Override
334 public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys, long requesterId )
335 throws IOException
336 {
337 Map<K, ICacheElement<K, V>> elements = new HashMap<K, ICacheElement<K, V>>();
338
339 if ( keys != null && !keys.isEmpty() )
340 {
341 for (K key : keys)
342 {
343 ICacheElement<K, V> element = get( cacheName, key );
344
345 if ( element != null )
346 {
347 elements.put( key, element );
348 }
349 }
350 }
351 return elements;
352 }
353
354
355
356
357
358
359
360 @Override
361 @SuppressWarnings("unchecked")
362 public Set<K> getKeySet(String cacheName) throws IOException
363 {
364 CacheElement<String, String> ce = new CacheElement<String, String>(cacheName, null, null);
365 LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>(ce);
366
367 led.command = LateralCommand.GET_KEYSET;
368 Object response = sender.sendAndReceive(led);
369 if (response != null)
370 {
371 return (Set<K>) response;
372 }
373
374 return null;
375 }
376
377
378
379
380
381 @Override
382 public void removeAll( String cacheName )
383 throws IOException
384 {
385 removeAll( cacheName, getListenerId() );
386 }
387
388
389
390
391
392
393 @Override
394 public void removeAll( String cacheName, long requesterId )
395 throws IOException
396 {
397 CacheElement<String, String> ce = new CacheElement<String, String>( cacheName, "ALL", null );
398 LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>( ce );
399 led.requesterId = requesterId;
400 led.command = LateralCommand.REMOVEALL;
401 sender.send( led );
402 }
403
404
405
406
407 public static void main( String args[] )
408 {
409 try
410 {
411 LateralTCPSender sender = new LateralTCPSender( new TCPLateralCacheAttributes() );
412
413
414 boolean notDone = true;
415 String message = null;
416
417 BufferedReader br = new BufferedReader( new InputStreamReader( System.in, "UTF-8" ) );
418
419 while ( notDone )
420 {
421 System.out.println( "enter message:" );
422 message = br.readLine();
423
424 if (message == null)
425 {
426 notDone = false;
427 continue;
428 }
429
430 CacheElement<String, String> ce = new CacheElement<String, String>( "test", "test", message );
431 LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>( ce );
432 sender.send( led );
433 }
434 }
435 catch ( IOException e )
436 {
437 System.out.println( e.toString() );
438 }
439 }
440
441
442
443
444 protected void setListenerId( long listernId )
445 {
446 this.listenerId = listernId;
447 }
448
449
450
451
452 protected long getListenerId()
453 {
454 return listenerId;
455 }
456 }