1 package org.apache.jcs.auxiliary.lateral.socket.tcp;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.BufferedReader;
23 import java.io.IOException;
24 import java.io.InputStreamReader;
25 import java.io.Serializable;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.Map;
29 import java.util.Set;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.jcs.auxiliary.lateral.LateralCacheInfo;
34 import org.apache.jcs.auxiliary.lateral.LateralCommand;
35 import org.apache.jcs.auxiliary.lateral.LateralElementDescriptor;
36 import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheObserver;
37 import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
38 import org.apache.jcs.engine.CacheElement;
39 import org.apache.jcs.engine.behavior.ICacheElement;
40 import org.apache.jcs.engine.behavior.ICacheListener;
41 import org.apache.jcs.engine.behavior.ICacheServiceNonLocal;
42
43
44
45
46 public class LateralTCPService<K extends Serializable, V extends Serializable>
47 implements ICacheServiceNonLocal<K, V>, ILateralCacheObserver
48 {
49
50 private final static Log log = LogFactory.getLog( LateralTCPService.class );
51
52
53 private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
54
55
56 private LateralTCPSender sender;
57
58
59 private long listenerId = LateralCacheInfo.listenerId;
60
61
62
63
64
65
66
/**
 * Stores the supplied attributes and creates the {@link LateralTCPSender} used by
 * every other operation of this service.
 *
 * @param lca configuration for this service; also handed to the sender
 * @throws IOException if the sender cannot be created. The failure is logged
 *         (message only, no stack trace) and the original exception is rethrown.
 */
public LateralTCPService( ITCPLateralCacheAttributes lca )
    throws IOException
{
    this.setTcpLateralCacheAttributes( lca );
    try
    {
        // Guarded so the attribute string is only built when it will be logged.
        if ( log.isDebugEnabled() )
        {
            log.debug( "creating sender, attributes = " + getTcpLateralCacheAttributes() );
        }

        sender = new LateralTCPSender( lca );

        // The guard has to test the level that is actually written (debug);
        // it previously tested the info level while logging at debug.
        if ( log.isDebugEnabled() )
        {
            log.debug( "Created sender to [" + lca.getTcpServer() + "]" );
        }
    }
    catch ( IOException e )
    {
        // Only the message is logged here; the exception itself is rethrown
        // untouched so the caller still receives the full cause.
        log.error( "Could not create sender to [" + lca.getTcpServer() + "] -- " + e.getMessage() );

        throw e;
    }
}
92
93
94
95
96
97 public void update( ICacheElement<K, V> item )
98 throws IOException
99 {
100 update( item, getListenerId() );
101 }
102
103
104
105
106
107
108
109
110
111 public void update( ICacheElement<K, V> item, long requesterId )
112 throws IOException
113 {
114
115 if ( !this.getTcpLateralCacheAttributes().isAllowPut() )
116 {
117
118 if ( !this.getTcpLateralCacheAttributes().isIssueRemoveOnPut() )
119 {
120 return;
121 }
122 }
123
124
125 if ( !this.getTcpLateralCacheAttributes().isIssueRemoveOnPut() )
126 {
127 LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( item );
128 led.requesterId = requesterId;
129 led.command = LateralCommand.UPDATE;
130 sender.send( led );
131 }
132
133
134 else
135 {
136 if ( log.isDebugEnabled() )
137 {
138 log.debug( "Issuing a remove for a put" );
139 }
140
141 CacheElement<K, V> ce = new CacheElement<K, V>( item.getCacheName(), item.getKey(), null );
142 LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( ce );
143 led.requesterId = requesterId;
144 led.command = LateralCommand.REMOVE;
145 led.valHashCode = item.getVal().hashCode();
146 sender.send( led );
147 }
148 }
149
150
151
152
153
154
155
156 public void remove( String cacheName, K key )
157 throws IOException
158 {
159 remove( cacheName, key, getListenerId() );
160 }
161
162
163
164
165
166
167
168 public void remove( String cacheName, K key, long requesterId )
169 throws IOException
170 {
171 CacheElement<K, V> ce = new CacheElement<K, V>( cacheName, key, null );
172 LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( ce );
173 led.requesterId = requesterId;
174 led.command = LateralCommand.REMOVE;
175 sender.send( led );
176 }
177
178
179
180
181
182
183 public void release()
184 throws IOException
185 {
186
187 }
188
189
190
191
192
193
194
195 public void dispose( String cacheName )
196 throws IOException
197 {
198 sender.dispose( cacheName );
199 }
200
201
202
203
204
205
206
207
208 public Serializable get( String key )
209 throws IOException
210 {
211 if ( log.isDebugEnabled() )
212 {
213 log.debug( "balking at get for key [" + key + "]" );
214 }
215
216
217 return null;
218
219 }
220
221
222
223
224
225
226
227 public ICacheElement<K, V> get( String cacheName, K key )
228 throws IOException
229 {
230 return get( cacheName, key, getListenerId() );
231 }
232
233
234
235
236
237
238
239
240
241
242 public ICacheElement<K, V> get( String cacheName, K key, long requesterId )
243 throws IOException
244 {
245
246 if ( this.getTcpLateralCacheAttributes().isAllowGet() )
247 {
248 CacheElement<K, V> ce = new CacheElement<K, V>( cacheName, key, null );
249 LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( ce );
250
251 led.command = LateralCommand.GET;
252 @SuppressWarnings("unchecked")
253 ICacheElement<K, V> response = (ICacheElement<K, V>)sender.sendAndReceive( led );
254 if ( response != null )
255 {
256 return response;
257 }
258 return null;
259 }
260 else
261 {
262
263 return null;
264 }
265 }
266
267
268
269
270
271
272
273
274
275
276 public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern )
277 throws IOException
278 {
279 return getMatching( cacheName, pattern, getListenerId() );
280 }
281
282
283
284
285
286
287
288
289
290
291
292 @SuppressWarnings("unchecked")
293 public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern, long requesterId )
294 throws IOException
295 {
296
297 if ( this.getTcpLateralCacheAttributes().isAllowGet() )
298 {
299 CacheElement<String, String> ce = new CacheElement<String, String>( cacheName, pattern, null );
300 LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>( ce );
301
302 led.command = LateralCommand.GET_MATCHING;
303
304 Object response = sender.sendAndReceive( led );
305 if ( response != null )
306 {
307 return (Map<K, ICacheElement<K, V>>) response;
308 }
309 return Collections.emptyMap();
310 }
311 else
312 {
313
314 return null;
315 }
316 }
317
318
319
320
321
322
323
324
325
326
327 public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys )
328 throws IOException
329 {
330 return getMultiple( cacheName, keys, getListenerId() );
331 }
332
333
334
335
336
337
338
339
340
341
342
343
344
345 public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys, long requesterId )
346 throws IOException
347 {
348 Map<K, ICacheElement<K, V>> elements = new HashMap<K, ICacheElement<K, V>>();
349
350 if ( keys != null && !keys.isEmpty() )
351 {
352 for (K key : keys)
353 {
354 ICacheElement<K, V> element = get( cacheName, key );
355
356 if ( element != null )
357 {
358 elements.put( key, element );
359 }
360 }
361 }
362 return elements;
363 }
364
365
366
367
368
369
370
371
372 @SuppressWarnings("unchecked")
373 public Set<K> getGroupKeys( String cacheName, String group )
374 throws IOException
375 {
376 CacheElement<String, String> ce = new CacheElement<String, String>(cacheName, group, null);
377 LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>(ce);
378
379 led.command = LateralCommand.GET_GROUP_KEYS;
380 Object response = sender.sendAndReceive(led);
381 if (response != null)
382 {
383 return (Set<K>) response;
384 }
385
386 return null;
387 }
388
389
390
391
392
393
394
395
396
397 @SuppressWarnings("unchecked")
398 public Set<String> getGroupNames(String cacheName)
399 throws IOException
400 {
401 CacheElement<String, String> ce = new CacheElement<String, String>(cacheName, null, null);
402 LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>(ce);
403
404 led.command = LateralCommand.GET_GROUP_NAMES;
405 Object response = sender.sendAndReceive(led);
406 if (response != null)
407 {
408 return (Set<String>) response;
409 }
410
411 return null;
412 }
413
414
415
416
417
418 public void removeAll( String cacheName )
419 throws IOException
420 {
421 removeAll( cacheName, getListenerId() );
422 }
423
424
425
426
427
428
429 public void removeAll( String cacheName, long requesterId )
430 throws IOException
431 {
432 CacheElement<String, String> ce = new CacheElement<String, String>( cacheName, "ALL", null );
433 LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>( ce );
434 led.requesterId = requesterId;
435 led.command = LateralCommand.REMOVEALL;
436 sender.send( led );
437 }
438
439
440
441
442 public static void main( String args[] )
443 {
444 try
445 {
446 LateralTCPSender sender = new LateralTCPSender( new TCPLateralCacheAttributes() );
447
448
449 boolean notDone = true;
450 String message = null;
451
452 BufferedReader br = new BufferedReader( new InputStreamReader( System.in ) );
453
454 while ( notDone )
455 {
456 System.out.println( "enter mesage:" );
457 message = br.readLine();
458
459 if (message == null)
460 {
461 notDone = false;
462 continue;
463 }
464
465 CacheElement<String, String> ce = new CacheElement<String, String>( "test", "test", message );
466 LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>( ce );
467 sender.send( led );
468 }
469 }
470 catch ( IOException e )
471 {
472 System.out.println( e.toString() );
473 }
474 }
475
476
477
478
479
480
481
482
483
484
485 public <KK extends Serializable, VV extends Serializable> void addCacheListener( String cacheName, ICacheListener<KK, VV> obj )
486 throws IOException
487 {
488
489 }
490
491
492
493
494
495 public <KK extends Serializable, VV extends Serializable> void addCacheListener( ICacheListener<KK, VV> obj )
496 throws IOException
497 {
498
499 }
500
501
502
503
504
505
506 public <KK extends Serializable, VV extends Serializable> void removeCacheListener( String cacheName, ICacheListener<KK, VV> obj )
507 throws IOException
508 {
509
510 }
511
512
513
514
515
516 public <KK extends Serializable, VV extends Serializable> void removeCacheListener( ICacheListener<KK, VV> obj )
517 throws IOException
518 {
519
520 }
521
522
523
524
525 protected void setListenerId( long listernId )
526 {
527 this.listenerId = listernId;
528 }
529
530
531
532
533 protected long getListenerId()
534 {
535 return listenerId;
536 }
537
538
539
540
541 public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tcpLateralCacheAttributes )
542 {
543 this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
544 }
545
546
547
548
549 public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
550 {
551 return tcpLateralCacheAttributes;
552 }
553
554
555 }