1 package org.apache.commons.jcs.auxiliary.lateral.socket.tcp;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.EOFException;
23 import java.io.IOException;
24 import java.io.ObjectInputStream;
25 import java.io.ObjectOutputStream;
26 import java.io.Serializable;
27 import java.net.InetAddress;
28 import java.net.ServerSocket;
29 import java.net.Socket;
30 import java.net.SocketException;
31 import java.net.SocketTimeoutException;
32 import java.util.HashMap;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.atomic.AtomicBoolean;
38
39 import org.apache.commons.jcs.access.exception.CacheException;
40 import org.apache.commons.jcs.auxiliary.lateral.LateralElementDescriptor;
41 import org.apache.commons.jcs.auxiliary.lateral.behavior.ILateralCacheListener;
42 import org.apache.commons.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
43 import org.apache.commons.jcs.engine.CacheInfo;
44 import org.apache.commons.jcs.engine.behavior.ICacheElement;
45 import org.apache.commons.jcs.engine.behavior.ICompositeCacheManager;
46 import org.apache.commons.jcs.engine.behavior.IShutdownObserver;
47 import org.apache.commons.jcs.engine.control.CompositeCache;
48 import org.apache.commons.jcs.engine.control.CompositeCacheManager;
49 import org.apache.commons.jcs.io.ObjectInputStreamClassLoaderAware;
50 import org.apache.commons.jcs.utils.threadpool.DaemonThreadFactory;
51 import org.apache.commons.logging.Log;
52 import org.apache.commons.logging.LogFactory;
53
54
55
56
57
58
59 public class LateralTCPListener<K, V>
60 implements ILateralCacheListener<K, V>, IShutdownObserver
61 {
62
63 private static final Log log = LogFactory.getLog( LateralTCPListener.class );
64
65
66 private static final int acceptTimeOut = 1000;
67
68
69 private transient ICompositeCacheManager cacheManager;
70
71
72 private static final HashMap<String, ILateralCacheListener<?, ?>> instances =
73 new HashMap<String, ILateralCacheListener<?, ?>>();
74
75
76 private ListenerThread receiver;
77
78
79 private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
80
81
82 private int port;
83
84
85 private ExecutorService pooledExecutor;
86
87
88 private int putCnt = 0;
89
90
91 private int removeCnt = 0;
92
93
94 private int getCnt = 0;
95
96
97
98
99
100 private long listenerId = CacheInfo.listenerId;
101
102
103 private AtomicBoolean shutdown;
104
105
106 private AtomicBoolean terminated;
107
108
109
110
111
112
113
114
115 public synchronized static <K, V> LateralTCPListener<K, V>
116 getInstance( ITCPLateralCacheAttributes ilca, ICompositeCacheManager cacheMgr )
117 {
118 @SuppressWarnings("unchecked")
119 LateralTCPListener<K, V> ins = (LateralTCPListener<K, V>) instances.get( String.valueOf( ilca.getTcpListenerPort() ) );
120
121 if ( ins == null )
122 {
123 ins = new LateralTCPListener<K, V>( ilca );
124
125 ins.init();
126 ins.setCacheManager( cacheMgr );
127
128 instances.put( String.valueOf( ilca.getTcpListenerPort() ), ins );
129
130 if ( log.isInfoEnabled() )
131 {
132 log.info( "Created new listener " + ilca.getTcpListenerPort() );
133 }
134 }
135
136 return ins;
137 }
138
139
140
141
142
143
/**
 * Creates a listener for the given configuration. The listener does not accept connections
 * until init() has been called.
 *
 * @param ilca configuration to listen with
 */
protected LateralTCPListener( ITCPLateralCacheAttributes ilca )
{
    // Assign the field directly rather than through the public, overridable setter: a subclass
    // override would otherwise run before the subclass constructor has initialized its state.
    this.tcpLateralCacheAttributes = ilca;
}
148
149
150
151
152 @Override
153 public synchronized void init()
154 {
155 try
156 {
157 this.port = getTcpLateralCacheAttributes().getTcpListenerPort();
158
159 pooledExecutor = Executors.newCachedThreadPool(
160 new DaemonThreadFactory("JCS-LateralTCPListener-"));
161 terminated = new AtomicBoolean(false);
162 shutdown = new AtomicBoolean(false);
163
164 log.info( "Listening on port " + port );
165
166 ServerSocket serverSocket = new ServerSocket( port );
167 serverSocket.setSoTimeout( acceptTimeOut );
168
169 receiver = new ListenerThread(serverSocket);
170 receiver.setDaemon( true );
171 receiver.start();
172 }
173 catch ( IOException ex )
174 {
175 throw new IllegalStateException( ex );
176 }
177 }
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194 @Override
195 public void setListenerId( long id )
196 throws IOException
197 {
198 this.listenerId = id;
199 if ( log.isDebugEnabled() )
200 {
201 log.debug( "set listenerId = " + id );
202 }
203 }
204
205
206
207
208
209
210
211 @Override
212 public long getListenerId()
213 throws IOException
214 {
215 return this.listenerId;
216 }
217
218
219
220
221
222
223
224 @Override
225 public void handlePut( ICacheElement<K, V> element )
226 throws IOException
227 {
228 putCnt++;
229 if ( log.isInfoEnabled() )
230 {
231 if ( getPutCnt() % 100 == 0 )
232 {
233 log.info( "Put Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
234 + getPutCnt() );
235 }
236 }
237
238 if ( log.isDebugEnabled() )
239 {
240 log.debug( "handlePut> cacheName=" + element.getCacheName() + ", key=" + element.getKey() );
241 }
242
243 getCache( element.getCacheName() ).localUpdate( element );
244 }
245
246
247
248
249
250
251
252
253 @Override
254 public void handleRemove( String cacheName, K key )
255 throws IOException
256 {
257 removeCnt++;
258 if ( log.isInfoEnabled() )
259 {
260 if ( getRemoveCnt() % 100 == 0 )
261 {
262 log.info( "Remove Count = " + getRemoveCnt() );
263 }
264 }
265
266 if ( log.isDebugEnabled() )
267 {
268 log.debug( "handleRemove> cacheName=" + cacheName + ", key=" + key );
269 }
270
271 getCache( cacheName ).localRemove( key );
272 }
273
274
275
276
277
278
279 @Override
280 public void handleRemoveAll( String cacheName )
281 throws IOException
282 {
283 if ( log.isDebugEnabled() )
284 {
285 log.debug( "handleRemoveAll> cacheName=" + cacheName );
286 }
287
288 getCache( cacheName ).localRemoveAll();
289 }
290
291
292
293
294
295
296
297
298
299 public ICacheElement<K, V> handleGet( String cacheName, K key )
300 throws IOException
301 {
302 getCnt++;
303 if ( log.isInfoEnabled() )
304 {
305 if ( getGetCnt() % 100 == 0 )
306 {
307 log.info( "Get Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
308 + getGetCnt() );
309 }
310 }
311
312 if ( log.isDebugEnabled() )
313 {
314 log.debug( "handleGet> cacheName=" + cacheName + ", key = " + key );
315 }
316
317 return getCache( cacheName ).localGet( key );
318 }
319
320
321
322
323
324
325
326
327
328 public Map<K, ICacheElement<K, V>> handleGetMatching( String cacheName, String pattern )
329 throws IOException
330 {
331 getCnt++;
332 if ( log.isInfoEnabled() )
333 {
334 if ( getGetCnt() % 100 == 0 )
335 {
336 log.info( "GetMatching Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
337 + getGetCnt() );
338 }
339 }
340
341 if ( log.isDebugEnabled() )
342 {
343 log.debug( "handleGetMatching> cacheName=" + cacheName + ", pattern = " + pattern );
344 }
345
346 return getCache( cacheName ).localGetMatching( pattern );
347 }
348
349
350
351
352
353
354
355
356 public Set<K> handleGetKeySet( String cacheName ) throws IOException
357 {
358 return getCache( cacheName ).getKeySet(true);
359 }
360
361
362
363
364
365
366 @Override
367 public void handleDispose( String cacheName )
368 throws IOException
369 {
370 if ( log.isInfoEnabled() )
371 {
372 log.info( "handleDispose > cacheName=" + cacheName + " | Ignoring message. Do not dispose from remote." );
373 }
374
375
376 terminated.set(true);
377 }
378
379 @Override
380 public synchronized void dispose()
381 {
382 terminated.set(true);
383 notify();
384
385 pooledExecutor.shutdownNow();
386 }
387
388
389
390
391
392
393
394
395
396
397 protected CompositeCache<K, V> getCache( String name )
398 {
399 if ( getCacheManager() == null )
400 {
401
402 try
403 {
404 setCacheManager( CompositeCacheManager.getInstance() );
405 }
406 catch (CacheException e)
407 {
408 throw new RuntimeException("Could not retrieve cache manager instance", e);
409 }
410
411 if ( log.isDebugEnabled() )
412 {
413 log.debug( "cacheMgr = " + getCacheManager() );
414 }
415 }
416
417 return getCacheManager().getCache( name );
418 }
419
420
421
422
423
424
425 public int getPutCnt()
426 {
427 return putCnt;
428 }
429
430
431
432
433 public int getGetCnt()
434 {
435 return getCnt;
436 }
437
438
439
440
441 public int getRemoveCnt()
442 {
443 return removeCnt;
444 }
445
446
447
448
449 @Override
450 public void setCacheManager( ICompositeCacheManager cacheMgr )
451 {
452 this.cacheManager = cacheMgr;
453 }
454
455
456
457
458 @Override
459 public ICompositeCacheManager getCacheManager()
460 {
461 return cacheManager;
462 }
463
464
465
466
467 public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tcpLateralCacheAttributes )
468 {
469 this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
470 }
471
472
473
474
475 public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
476 {
477 return tcpLateralCacheAttributes;
478 }
479
480
481
482
483
484 public class ListenerThread
485 extends Thread
486 {
487
488 private final ServerSocket serverSocket;
489
490
491
492
493
494
495 public ListenerThread(ServerSocket serverSocket)
496 {
497 super();
498 this.serverSocket = serverSocket;
499 }
500
501
502 @SuppressWarnings("synthetic-access")
503 @Override
504 public void run()
505 {
506 try
507 {
508 ConnectionHandler handler;
509
510 outer: while ( true )
511 {
512 if ( log.isDebugEnabled() )
513 {
514 log.debug( "Waiting for clients to connect " );
515 }
516
517 Socket socket = null;
518 inner: while (true)
519 {
520
521 if (terminated.get())
522 {
523 if (log.isDebugEnabled())
524 {
525 log.debug("Thread terminated, exiting gracefully");
526 }
527 break outer;
528 }
529
530 try
531 {
532 socket = serverSocket.accept();
533 break inner;
534 }
535 catch (SocketTimeoutException e)
536 {
537
538 continue inner;
539 }
540 }
541
542 if ( socket != null && log.isDebugEnabled() )
543 {
544 InetAddress inetAddress = socket.getInetAddress();
545 log.debug( "Connected to client at " + inetAddress );
546 }
547
548 handler = new ConnectionHandler( socket );
549 pooledExecutor.execute( handler );
550 }
551 }
552 catch ( IOException e )
553 {
554 log.error( "Exception caught in TCP listener", e );
555 }
556 finally
557 {
558 if (serverSocket != null)
559 {
560 try
561 {
562 serverSocket.close();
563 }
564 catch (IOException e)
565 {
566 log.error( "Exception caught closing socket", e );
567 }
568 }
569 }
570 }
571 }
572
573
574
575
576 public class ConnectionHandler
577 implements Runnable
578 {
579
580 private final Socket socket;
581
582
583
584
585
586 public ConnectionHandler( Socket socket )
587 {
588 this.socket = socket;
589 }
590
591
592
593
594 @Override
595 @SuppressWarnings({"unchecked",
596 "synthetic-access" })
597 public void run()
598 {
599 ObjectInputStream ois;
600
601 try
602 {
603 ois = new ObjectInputStreamClassLoaderAware( socket.getInputStream(), null );
604 }
605 catch ( Exception e )
606 {
607 log.error( "Could not open ObjectInputStream on " + socket, e );
608
609 return;
610 }
611
612 LateralElementDescriptor<K, V> led;
613
614 try
615 {
616 while ( true )
617 {
618 led = (LateralElementDescriptor<K, V>) ois.readObject();
619
620 if ( led == null )
621 {
622 log.debug( "LateralElementDescriptor is null" );
623 continue;
624 }
625 if ( led.requesterId == getListenerId() )
626 {
627 log.debug( "from self" );
628 }
629 else
630 {
631 if ( log.isDebugEnabled() )
632 {
633 log.debug( "receiving LateralElementDescriptor from another" + "led = " + led
634 + ", led.command = " + led.command + ", led.ce = " + led.ce );
635 }
636
637 handle( led );
638 }
639 }
640 }
641 catch ( EOFException e )
642 {
643 log.info( "Caught java.io.EOFException closing connection." + e.getMessage() );
644 }
645 catch ( SocketException e )
646 {
647 log.info( "Caught java.net.SocketException closing connection." + e.getMessage() );
648 }
649 catch ( Exception e )
650 {
651 log.error( "Unexpected exception.", e );
652 }
653
654 try
655 {
656 ois.close();
657 }
658 catch ( IOException e )
659 {
660 log.error( "Could not close object input stream.", e );
661 }
662 }
663
664
665
666
667
668
669
670
671 @SuppressWarnings("synthetic-access")
672 private void handle( LateralElementDescriptor<K, V> led )
673 throws IOException
674 {
675 String cacheName = led.ce.getCacheName();
676 K key = led.ce.getKey();
677 Serializable obj = null;
678
679 switch (led.command)
680 {
681 case UPDATE:
682 handlePut( led.ce );
683 break;
684
685 case REMOVE:
686
687
688
689 if ( led.valHashCode != -1 )
690 {
691 if ( getTcpLateralCacheAttributes().isFilterRemoveByHashCode() )
692 {
693 ICacheElement<K, V> test = getCache( cacheName ).localGet( key );
694 if ( test != null )
695 {
696 if ( test.getVal().hashCode() == led.valHashCode )
697 {
698 if ( log.isDebugEnabled() )
699 {
700 log.debug( "Filtering detected identical hashCode [" + led.valHashCode
701 + "], not issuing a remove for led " + led );
702 }
703 return;
704 }
705 else
706 {
707 if ( log.isDebugEnabled() )
708 {
709 log.debug( "Different hashcodes, in cache [" + test.getVal().hashCode()
710 + "] sent [" + led.valHashCode + "]" );
711 }
712 }
713 }
714 }
715 }
716 handleRemove( cacheName, key );
717 break;
718
719 case REMOVEALL:
720 handleRemoveAll( cacheName );
721 break;
722
723 case GET:
724 obj = handleGet( cacheName, key );
725 break;
726
727 case GET_MATCHING:
728 obj = (Serializable) handleGetMatching( cacheName, (String) key );
729 break;
730
731 case GET_KEYSET:
732 obj = (Serializable) handleGetKeySet(cacheName);
733 break;
734
735 default: break;
736 }
737
738 if (obj != null)
739 {
740 ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
741 oos.writeObject( obj );
742 oos.flush();
743 }
744 }
745 }
746
747
748
749
750 @Override
751 public void shutdown()
752 {
753 if ( shutdown.compareAndSet(false, true) )
754 {
755 if ( log.isInfoEnabled() )
756 {
757 log.info( "Shutting down TCP Lateral receiver." );
758 }
759
760 receiver.interrupt();
761 }
762 else
763 {
764 if ( log.isDebugEnabled() )
765 {
766 log.debug( "Shutdown already called." );
767 }
768 }
769 }
770 }