1 package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.OutputStream;
25 import java.net.InetSocketAddress;
26 import java.net.ServerSocket;
27 import java.net.Socket;
28 import java.net.SocketAddress;
29 import java.nio.channels.SelectionKey;
30 import java.nio.channels.Selector;
31 import java.nio.channels.ServerSocketChannel;
32 import java.nio.channels.SocketChannel;
33 import java.util.Iterator;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.atomic.AtomicBoolean;
38
39 import org.apache.commons.jcs3.auxiliary.lateral.LateralElementDescriptor;
40 import org.apache.commons.jcs3.auxiliary.lateral.behavior.ILateralCacheListener;
41 import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
42 import org.apache.commons.jcs3.engine.CacheInfo;
43 import org.apache.commons.jcs3.engine.behavior.ICacheElement;
44 import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
45 import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
46 import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
47 import org.apache.commons.jcs3.engine.control.CompositeCache;
48 import org.apache.commons.jcs3.log.Log;
49 import org.apache.commons.jcs3.log.LogManager;
50 import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
51
52
53
54
55
56
57 public class LateralTCPListener<K, V>
58 implements ILateralCacheListener<K, V>, IShutdownObserver
59 {
60
61 private static final Log log = LogManager.getLog( LateralTCPListener.class );
62
63
64 private static final int acceptTimeOut = 1000;
65
66
67 private transient ICompositeCacheManager cacheManager;
68
69
70 private static final ConcurrentHashMap<String, ILateralCacheListener<?, ?>> instances =
71 new ConcurrentHashMap<>();
72
73
74 private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
75
76
77 private Thread listenerThread;
78
79
80
81
82 private IElementSerializer serializer;
83
84
85 private int putCnt;
86
87
88 private int removeCnt;
89
90
91 private int getCnt;
92
93
94
95
96
97 private long listenerId = CacheInfo.listenerId;
98
99
100 private final AtomicBoolean shutdown = new AtomicBoolean();
101
102
103 private final AtomicBoolean terminated = new AtomicBoolean();
104
105
106
107
108
109
110
111
112
113 @Deprecated
114 @SuppressWarnings("unchecked")
115 public static <K, V> LateralTCPListener<K, V>
116 getInstance( final ITCPLateralCacheAttributes ilca, final ICompositeCacheManager cacheMgr)
117 {
118 return (LateralTCPListener<K, V>) instances.computeIfAbsent(
119 String.valueOf( ilca.getTcpListenerPort() ),
120 k -> {
121 final LateralTCPListener<K, V> newIns = new LateralTCPListener<>( ilca, new StandardSerializer() );
122
123 newIns.init();
124 newIns.setCacheManager( cacheMgr );
125
126 log.info("Created new listener {0}", ilca::getTcpListenerPort);
127
128 return newIns;
129 });
130 }
131
132
133
134
135
136
137
138
139
140 @SuppressWarnings("unchecked")
141 public static <K, V> LateralTCPListener<K, V>
142 getInstance( final ITCPLateralCacheAttributes ilca, final ICompositeCacheManager cacheMgr, final IElementSerializer serializer )
143 {
144 return (LateralTCPListener<K, V>) instances.computeIfAbsent(
145 String.valueOf( ilca.getTcpListenerPort() ),
146 k -> {
147 final LateralTCPListener<K, V> newIns = new LateralTCPListener<>( ilca, serializer );
148
149 newIns.init();
150 newIns.setCacheManager( cacheMgr );
151
152 log.info("Created new listener {0}", ilca::getTcpListenerPort);
153
154 return newIns;
155 });
156 }
157
158
159
160
161
162
163
164 @Deprecated
165 protected LateralTCPListener( final ITCPLateralCacheAttributes ilca )
166 {
167 this(ilca, new StandardSerializer());
168 }
169
170
171
172
173
174
175
176 protected LateralTCPListener( final ITCPLateralCacheAttributes ilca, final IElementSerializer serializer )
177 {
178 this.setTcpLateralCacheAttributes( ilca );
179 this.serializer = serializer;
180 }
181
182
183
184
185 @Override
186 public synchronized void init()
187 {
188 try
189 {
190 final int port = getTcpLateralCacheAttributes().getTcpListenerPort();
191 final String host = getTcpLateralCacheAttributes().getTcpListenerHost();
192
193 terminated.set(false);
194 shutdown.set(false);
195
196 final ServerSocketChannel serverSocket = ServerSocketChannel.open();
197
198 SocketAddress endPoint;
199
200 if (host != null && !host.isEmpty())
201 {
202 log.info( "Listening on {0}:{1}", host, port );
203
204 endPoint = new InetSocketAddress(host, port);
205 }
206 else
207 {
208 log.info( "Listening on port {0}", port );
209 endPoint = new InetSocketAddress(port);
210 }
211
212 serverSocket.bind(endPoint);
213 serverSocket.configureBlocking(false);
214
215 listenerThread = new Thread(() -> runListener(serverSocket),
216 "JCS-LateralTCPListener-" + host + ":" + port);
217 listenerThread.setDaemon(true);
218 listenerThread.start();
219 }
220 catch ( final IOException ex )
221 {
222 throw new IllegalStateException( ex );
223 }
224 }
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241 @Override
242 public void setListenerId( final long id )
243 throws IOException
244 {
245 this.listenerId = id;
246 log.debug( "set listenerId = {0}", id );
247 }
248
249
250
251
252
253
254
255 @Override
256 public long getListenerId()
257 throws IOException
258 {
259 return this.listenerId;
260 }
261
262
263
264
265
266
267
268 @Override
269 public void handlePut( final ICacheElement<K, V> element )
270 throws IOException
271 {
272 putCnt++;
273 if ( log.isInfoEnabled() && getPutCnt() % 100 == 0 )
274 {
275 log.info( "Put Count (port {0}) = {1}",
276 () -> getTcpLateralCacheAttributes().getTcpListenerPort(),
277 this::getPutCnt);
278 }
279
280 log.debug( "handlePut> cacheName={0}, key={1}",
281 element::getCacheName, element::getKey);
282
283 getCache( element.getCacheName() ).localUpdate( element );
284 }
285
286
287
288
289
290
291
292
293 @Override
294 public void handleRemove( final String cacheName, final K key )
295 throws IOException
296 {
297 removeCnt++;
298 if ( log.isInfoEnabled() && getRemoveCnt() % 100 == 0 )
299 {
300 log.info( "Remove Count = {0}", this::getRemoveCnt);
301 }
302
303 log.debug( "handleRemove> cacheName={0}, key={1}", cacheName, key );
304
305 getCache( cacheName ).localRemove( key );
306 }
307
308
309
310
311
312
313 @Override
314 public void handleRemoveAll( final String cacheName )
315 throws IOException
316 {
317 log.debug( "handleRemoveAll> cacheName={0}", cacheName );
318
319 getCache( cacheName ).localRemoveAll();
320 }
321
322
323
324
325
326
327
328
329
330 public ICacheElement<K, V> handleGet( final String cacheName, final K key )
331 throws IOException
332 {
333 getCnt++;
334 if ( log.isInfoEnabled() && getGetCnt() % 100 == 0 )
335 {
336 log.info( "Get Count (port {0}) = {1}",
337 () -> getTcpLateralCacheAttributes().getTcpListenerPort(),
338 this::getGetCnt);
339 }
340
341 log.debug( "handleGet> cacheName={0}, key={1}", cacheName, key );
342
343 return getCache( cacheName ).localGet( key );
344 }
345
346
347
348
349
350
351
352
353
354 public Map<K, ICacheElement<K, V>> handleGetMatching( final String cacheName, final String pattern )
355 throws IOException
356 {
357 getCnt++;
358 if ( log.isInfoEnabled() && getGetCnt() % 100 == 0 )
359 {
360 log.info( "GetMatching Count (port {0}) = {1}",
361 () -> getTcpLateralCacheAttributes().getTcpListenerPort(),
362 this::getGetCnt);
363 }
364
365 log.debug( "handleGetMatching> cacheName={0}, pattern={1}", cacheName, pattern );
366
367 return getCache( cacheName ).localGetMatching( pattern );
368 }
369
370
371
372
373
374
375
376
377 public Set<K> handleGetKeySet( final String cacheName ) throws IOException
378 {
379 return getCache( cacheName ).getKeySet(true);
380 }
381
382
383
384
385
386
387 @Override
388 public void handleDispose( final String cacheName )
389 throws IOException
390 {
391 log.info( "handleDispose > cacheName={0} | Ignoring message. "
392 + "Do not dispose from remote.", cacheName );
393
394
395 dispose();
396 }
397
398 @Override
399 public synchronized void dispose()
400 {
401 if (terminated.compareAndSet(false, true))
402 {
403 notify();
404 listenerThread.interrupt();
405 }
406 }
407
408
409
410
411
412
413
414
415
416
417 protected CompositeCache<K, V> getCache( final String name )
418 {
419 return getCacheManager().getCache( name );
420 }
421
422
423
424
425
426
427 public int getPutCnt()
428 {
429 return putCnt;
430 }
431
432
433
434
435 public int getGetCnt()
436 {
437 return getCnt;
438 }
439
440
441
442
443 public int getRemoveCnt()
444 {
445 return removeCnt;
446 }
447
448
449
450
451 @Override
452 public void setCacheManager( final ICompositeCacheManager cacheMgr )
453 {
454 this.cacheManager = cacheMgr;
455 }
456
457
458
459
460 @Override
461 public ICompositeCacheManager getCacheManager()
462 {
463 return cacheManager;
464 }
465
466
467
468
469 public void setTcpLateralCacheAttributes( final ITCPLateralCacheAttributes tcpLateralCacheAttributes )
470 {
471 this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
472 }
473
474
475
476
477 public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
478 {
479 return tcpLateralCacheAttributes;
480 }
481
482
483
484
485
486
487 @Deprecated
488 public class ListenerThread
489 extends Thread
490 {
491
492 private final ServerSocket serverSocket;
493
494
495
496
497
498
499 public ListenerThread(final ServerSocket serverSocket)
500 {
501 this.serverSocket = serverSocket;
502 }
503
504
505 @Override
506 public void run()
507 {
508 runListener(serverSocket.getChannel());
509 }
510 }
511
512
513
514
515
516 private void runListener(final ServerSocketChannel serverSocket)
517 {
518 try (Selector selector = Selector.open())
519 {
520 serverSocket.register(selector, SelectionKey.OP_ACCEPT);
521 log.debug("Waiting for clients to connect");
522
523
524 while (!terminated.get())
525 {
526 int activeKeys = selector.select(acceptTimeOut);
527 if (activeKeys == 0)
528 {
529 continue;
530 }
531
532 for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();)
533 {
534 if (terminated.get())
535 {
536 break;
537 }
538
539 SelectionKey key = i.next();
540
541 if (!key.isValid())
542 {
543 continue;
544 }
545
546 if (key.isAcceptable())
547 {
548 ServerSocketChannel server = (ServerSocketChannel) key.channel();
549 SocketChannel client = server.accept();
550 if (client == null)
551 {
552
553 continue;
554 }
555
556 log.info("Connected to client at {0}", client.getRemoteAddress());
557
558 client.configureBlocking(false);
559 client.register(selector, SelectionKey.OP_READ);
560 }
561
562 if (key.isReadable())
563 {
564 handleClient(key);
565 }
566
567 i.remove();
568 }
569 }
570
571 log.debug("Thread terminated, exiting gracefully");
572
573
574 selector.keys().forEach(key -> {
575 try
576 {
577 key.channel().close();
578 }
579 catch (IOException e)
580 {
581 log.warn("Problem closing channel", e);
582 }
583 });
584 }
585 catch (final IOException e)
586 {
587 log.error( "Exception caught in TCP listener", e );
588 }
589 finally
590 {
591 try
592 {
593 serverSocket.close();
594 }
595 catch (IOException e)
596 {
597 log.error( "Exception closing TCP listener", e );
598 }
599 }
600 }
601
602
603
604
605
606 @Deprecated
607 public class ConnectionHandler
608 implements Runnable
609 {
610
611 private final Socket socket;
612
613
614
615
616
617 public ConnectionHandler( final Socket socket )
618 {
619 this.socket = socket;
620 }
621
622
623
624
625 @Override
626 public void run()
627 {
628 try (InputStream is = socket.getInputStream())
629 {
630 while ( true )
631 {
632 final LateralElementDescriptor<K, V> led =
633 serializer.deSerializeFrom(is, null);
634
635 if ( led == null )
636 {
637 log.debug( "LateralElementDescriptor is null" );
638 continue;
639 }
640 if ( led.getRequesterId() == getListenerId() )
641 {
642 log.debug( "from self" );
643 }
644 else
645 {
646 log.debug( "receiving LateralElementDescriptor from another led = {0}",
647 led );
648
649 Object obj = handleElement(led);
650 if (obj != null)
651 {
652 OutputStream os = socket.getOutputStream();
653 serializer.serializeTo(obj, os);
654 os.flush();
655 }
656 }
657 }
658 }
659 catch (final IOException e)
660 {
661 log.info("Caught {0}, closing connection.", e.getClass().getSimpleName(), e);
662 }
663 catch (final ClassNotFoundException e)
664 {
665 log.error( "Deserialization failed reading from socket", e );
666 }
667 }
668 }
669
670
671
672
673 private void handleClient(final SelectionKey key)
674 {
675 final SocketChannel socketChannel = (SocketChannel) key.channel();
676
677 try
678 {
679 final LateralElementDescriptor<K, V> led =
680 serializer.deSerializeFrom(socketChannel, null);
681
682 if ( led == null )
683 {
684 log.debug("LateralElementDescriptor is null");
685 return;
686 }
687
688 if ( led.getRequesterId() == getListenerId() )
689 {
690 log.debug( "from self" );
691 }
692 else
693 {
694 log.debug( "receiving LateralElementDescriptor from another led = {0}",
695 led );
696
697 Object obj = handleElement(led);
698 if (obj != null)
699 {
700 serializer.serializeTo(obj, socketChannel);
701 }
702 }
703 }
704 catch (final IOException e)
705 {
706 log.info("Caught {0}, closing connection.", e.getClass().getSimpleName(), e);
707 try
708 {
709 socketChannel.close();
710 }
711 catch (IOException e1)
712 {
713 log.error("Error while closing connection", e );
714 }
715 }
716 catch (final ClassNotFoundException e)
717 {
718 log.error( "Deserialization failed reading from socket", e );
719 }
720 }
721
722
723
724
725
726
727
728
729
730 private Object handleElement(final LateralElementDescriptor<K, V> led) throws IOException
731 {
732 final String cacheName = led.getPayload().getCacheName();
733 final K key = led.getPayload().getKey();
734 Object obj = null;
735
736 switch (led.getCommand())
737 {
738 case UPDATE:
739 handlePut(led.getPayload());
740 break;
741
742 case REMOVE:
743
744
745
746 if (led.getValHashCode() != -1 &&
747 getTcpLateralCacheAttributes().isFilterRemoveByHashCode())
748 {
749 final ICacheElement<K, V> test = getCache( cacheName ).localGet( key );
750 if ( test != null )
751 {
752 if ( test.getVal().hashCode() == led.getValHashCode() )
753 {
754 log.debug( "Filtering detected identical hashCode [{0}], "
755 + "not issuing a remove for led {1}",
756 led.getValHashCode(), led );
757 return null;
758 }
759 log.debug( "Different hash codes, in cache [{0}] sent [{1}]",
760 test.getVal()::hashCode, led::getValHashCode );
761 }
762 }
763 handleRemove( cacheName, key );
764 break;
765
766 case REMOVEALL:
767 handleRemoveAll( cacheName );
768 break;
769
770 case GET:
771 obj = handleGet( cacheName, key );
772 break;
773
774 case GET_MATCHING:
775 obj = handleGetMatching( cacheName, (String) key );
776 break;
777
778 case GET_KEYSET:
779 obj = handleGetKeySet(cacheName);
780 break;
781
782 default: break;
783 }
784
785 return obj;
786 }
787
788
789
790
791 @Override
792 public void shutdown()
793 {
794 if ( shutdown.compareAndSet(false, true) )
795 {
796 log.info( "Shutting down TCP Lateral receiver." );
797 dispose();
798 }
799 else
800 {
801 log.debug( "Shutdown already called." );
802 }
803 }
804 }