1 package org.apache.jcs.auxiliary.lateral.socket.tcp;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.io.ObjectInputStream;
24 import java.io.ObjectOutputStream;
25 import java.io.Serializable;
26 import java.net.InetAddress;
27 import java.net.ServerSocket;
28 import java.net.Socket;
29 import java.net.SocketTimeoutException;
30 import java.util.HashMap;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ThreadFactory;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.jcs.access.exception.CacheException;
40 import org.apache.jcs.auxiliary.lateral.LateralCacheInfo;
41 import org.apache.jcs.auxiliary.lateral.LateralCommand;
42 import org.apache.jcs.auxiliary.lateral.LateralElementDescriptor;
43 import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheListener;
44 import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
45 import org.apache.jcs.engine.behavior.ICacheElement;
46 import org.apache.jcs.engine.behavior.ICompositeCacheManager;
47 import org.apache.jcs.engine.behavior.IShutdownObserver;
48 import org.apache.jcs.engine.control.CompositeCache;
49 import org.apache.jcs.engine.control.CompositeCacheManager;
50
51
52
53
54
55
56 public class LateralTCPListener<K extends Serializable, V extends Serializable>
57 implements ILateralCacheListener<K, V>, Serializable, IShutdownObserver
58 {
59
60 private static final long serialVersionUID = -9107062664967131738L;
61
62
63 protected final static Log log = LogFactory.getLog( LateralTCPListener.class );
64
65
66 private final static int acceptTimeOut = 1000;
67
68
69 private transient ICompositeCacheManager cacheManager;
70
71
72 protected final static HashMap<String, ILateralCacheListener<?, ?>> instances =
73 new HashMap<String, ILateralCacheListener<?, ?>>();
74
75
76 private ListenerThread receiver;
77
78
79 private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
80
81
82 protected int port;
83
84
85 protected ExecutorService pooledExecutor;
86
87
88 private int putCnt = 0;
89
90
91 private int removeCnt = 0;
92
93
94 private int getCnt = 0;
95
96
97
98
99
100 private long listenerId = LateralCacheInfo.listenerId;
101
102
103 protected boolean shutdown = false;
104
105
106 protected boolean terminated = false;
107
108
109
110
111
112
113
114
115 public synchronized static <K extends Serializable, V extends Serializable> LateralTCPListener<K, V>
116 getInstance( ITCPLateralCacheAttributes ilca, ICompositeCacheManager cacheMgr )
117 {
118 @SuppressWarnings("unchecked")
119 LateralTCPListener<K, V> ins = (LateralTCPListener<K, V>) instances.get( String.valueOf( ilca.getTcpListenerPort() ) );
120
121 if ( ins == null )
122 {
123 ins = new LateralTCPListener<K, V>( ilca );
124
125 ins.init();
126
127 ins.setCacheManager( cacheMgr );
128
129 instances.put( String.valueOf( ilca.getTcpListenerPort() ), ins );
130
131 if ( log.isInfoEnabled() )
132 {
133 log.info( "Created new listener " + ilca.getTcpListenerPort() );
134 }
135 }
136
137 return ins;
138 }
139
140
141
142
143
144
145 protected LateralTCPListener( ITCPLateralCacheAttributes ilca )
146 {
147 this.setTcpLateralCacheAttributes( ilca );
148 }
149
150
151
152
153 public void init()
154 {
155 try
156 {
157 this.port = getTcpLateralCacheAttributes().getTcpListenerPort();
158
159 receiver = new ListenerThread();
160 receiver.setDaemon( true );
161 receiver.start();
162
163 pooledExecutor = Executors.newCachedThreadPool(new MyThreadFactory());
164 terminated = false;
165 shutdown = false;
166 }
167 catch ( Exception ex )
168 {
169 log.error( ex );
170 throw new IllegalStateException( ex.getMessage() );
171 }
172 }
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189 public void setListenerId( long id )
190 throws IOException
191 {
192 this.listenerId = id;
193 if ( log.isDebugEnabled() )
194 {
195 log.debug( "set listenerId = " + id );
196 }
197 }
198
199
200
201
202
203
204
205 public long getListenerId()
206 throws IOException
207 {
208 return this.listenerId;
209 }
210
211
212
213
214
215
216
217 public void handlePut( ICacheElement<K, V> element )
218 throws IOException
219 {
220 putCnt++;
221 if ( log.isInfoEnabled() )
222 {
223 if ( getPutCnt() % 100 == 0 )
224 {
225 log.info( "Put Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
226 + getPutCnt() );
227 }
228 }
229
230 if ( log.isDebugEnabled() )
231 {
232 log.debug( "handlePut> cacheName=" + element.getCacheName() + ", key=" + element.getKey() );
233 }
234
235 getCache( element.getCacheName() ).localUpdate( element );
236 }
237
238
239
240
241
242
243
244
245 public void handleRemove( String cacheName, K key )
246 throws IOException
247 {
248 removeCnt++;
249 if ( log.isInfoEnabled() )
250 {
251 if ( getRemoveCnt() % 100 == 0 )
252 {
253 log.info( "Remove Count = " + getRemoveCnt() );
254 }
255 }
256
257 if ( log.isDebugEnabled() )
258 {
259 log.debug( "handleRemove> cacheName=" + cacheName + ", key=" + key );
260 }
261
262 getCache( cacheName ).localRemove( key );
263 }
264
265
266
267
268
269
270 public void handleRemoveAll( String cacheName )
271 throws IOException
272 {
273 if ( log.isDebugEnabled() )
274 {
275 log.debug( "handleRemoveAll> cacheName=" + cacheName );
276 }
277
278 getCache( cacheName ).localRemoveAll();
279 }
280
281
282
283
284
285
286
287
288
289 public ICacheElement<K, V> handleGet( String cacheName, K key )
290 throws IOException
291 {
292 getCnt++;
293 if ( log.isInfoEnabled() )
294 {
295 if ( getGetCnt() % 100 == 0 )
296 {
297 log.info( "Get Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
298 + getGetCnt() );
299 }
300 }
301
302 if ( log.isDebugEnabled() )
303 {
304 log.debug( "handleGet> cacheName=" + cacheName + ", key = " + key );
305 }
306
307 return getCache( cacheName ).localGet( key );
308 }
309
310
311
312
313
314
315
316
317
318 public Map<K, ICacheElement<K, V>> handleGetMatching( String cacheName, String pattern )
319 throws IOException
320 {
321 getCnt++;
322 if ( log.isInfoEnabled() )
323 {
324 if ( getGetCnt() % 100 == 0 )
325 {
326 log.info( "GetMatching Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
327 + getGetCnt() );
328 }
329 }
330
331 if ( log.isDebugEnabled() )
332 {
333 log.debug( "handleGetMatching> cacheName=" + cacheName + ", pattern = " + pattern );
334 }
335
336 return getCache( cacheName ).localGetMatching( pattern );
337 }
338
339
340
341
342
343
344
345
346
347 public Set<K> handleGetGroupKeys( String cacheName, String group ) throws IOException
348 {
349 return getCache( cacheName ).getGroupKeys(group, true);
350 }
351
352
353
354
355
356
357
358
359 public Set<String> handleGetGroupNames( String cacheName ) throws IOException
360 {
361 return getCache( cacheName ).getGroupNames(true);
362 }
363
364
365
366
367
368
369 public void handleDispose( String cacheName )
370 throws IOException
371 {
372 if ( log.isInfoEnabled() )
373 {
374 log.info( "handleDispose > cacheName=" + cacheName + " | Ignoring message. Do not dispose from remote." );
375 }
376
377
378 synchronized (this)
379 {
380 terminated = true;
381 }
382 }
383
384 public synchronized void dispose()
385 {
386 terminated = true;
387 notify();
388 }
389
390
391
392
393
394
395
396
397
398
399 protected CompositeCache<K, V> getCache( String name )
400 {
401 if ( getCacheManager() == null )
402 {
403
404 try
405 {
406 setCacheManager( CompositeCacheManager.getInstance() );
407 }
408 catch (CacheException e)
409 {
410 throw new RuntimeException("Could not retrieve cache manager instance", e);
411 }
412
413 if ( log.isDebugEnabled() )
414 {
415 log.debug( "cacheMgr = " + getCacheManager() );
416 }
417 }
418
419 return getCacheManager().getCache( name );
420 }
421
422
423
424
425
426
427 public int getPutCnt()
428 {
429 return putCnt;
430 }
431
432
433
434
435 public int getGetCnt()
436 {
437 return getCnt;
438 }
439
440
441
442
443 public int getRemoveCnt()
444 {
445 return removeCnt;
446 }
447
448
449
450
451 public void setCacheManager( ICompositeCacheManager cacheMgr )
452 {
453 this.cacheManager = cacheMgr;
454 }
455
456
457
458
459 public ICompositeCacheManager getCacheManager()
460 {
461 return cacheManager;
462 }
463
464
465
466
467 public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tcpLateralCacheAttributes )
468 {
469 this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
470 }
471
472
473
474
475 public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
476 {
477 return tcpLateralCacheAttributes;
478 }
479
480
481
482
483
484 public class ListenerThread
485 extends Thread
486 {
487
488 @Override
489 public void run()
490 {
491 try
492 {
493 log.info( "Listening on port " + port );
494
495 ServerSocket serverSocket = new ServerSocket( port );
496 serverSocket.setSoTimeout( acceptTimeOut );
497
498 ConnectionHandler handler;
499
500 outer: while ( true )
501 {
502 if ( log.isDebugEnabled() )
503 {
504 log.debug( "Waiting for clients to connect " );
505 }
506
507 Socket socket = null;
508 inner: while (true)
509 {
510
511 synchronized (this)
512 {
513 if (terminated)
514 {
515 if (log.isDebugEnabled())
516 {
517 log.debug("Thread terminated, exiting gracefully");
518 }
519 break outer;
520 }
521 }
522 try
523 {
524 socket = serverSocket.accept();
525 break inner;
526 }
527 catch (SocketTimeoutException e)
528 {
529
530 continue inner;
531 }
532 }
533
534 if ( socket != null && log.isDebugEnabled() )
535 {
536 InetAddress inetAddress = socket.getInetAddress();
537 log.debug( "Connected to client at " + inetAddress );
538 }
539
540 handler = new ConnectionHandler( socket );
541 pooledExecutor.execute( handler );
542 }
543 }
544 catch ( Exception e )
545 {
546 log.error( "Exception caught in TCP listener", e );
547 }
548 }
549 }
550
551
552
553
554 public class ConnectionHandler
555 implements Runnable
556 {
557
558 private final Socket socket;
559
560
561
562
563
564 public ConnectionHandler( Socket socket )
565 {
566 this.socket = socket;
567 }
568
569
570
571
572 @SuppressWarnings("unchecked")
573 public void run()
574 {
575 ObjectInputStream ois;
576
577 try
578 {
579 ois = new ObjectInputStream( socket.getInputStream() );
580 }
581 catch ( Exception e )
582 {
583 log.error( "Could not open ObjectInputStream on " + socket, e );
584
585 return;
586 }
587
588 LateralElementDescriptor<K, V> led;
589
590 try
591 {
592 while ( true )
593 {
594 led = (LateralElementDescriptor<K, V>) ois.readObject();
595
596 if ( led == null )
597 {
598 log.debug( "LateralElementDescriptor is null" );
599 continue;
600 }
601 if ( led.requesterId == getListenerId() )
602 {
603 log.debug( "from self" );
604 }
605 else
606 {
607 if ( log.isDebugEnabled() )
608 {
609 log.debug( "receiving LateralElementDescriptor from another" + "led = " + led
610 + ", led.command = " + led.command + ", led.ce = " + led.ce );
611 }
612
613 handle( led );
614 }
615 }
616 }
617 catch ( java.io.EOFException e )
618 {
619 log.info( "Caught java.io.EOFException closing connection." + e.getMessage() );
620 }
621 catch ( java.net.SocketException e )
622 {
623 log.info( "Caught java.net.SocketException closing connection." + e.getMessage() );
624 }
625 catch ( Exception e )
626 {
627 log.error( "Unexpected exception.", e );
628 }
629
630 try
631 {
632 ois.close();
633 }
634 catch ( IOException e )
635 {
636 log.error( "Could not close object input stream.", e );
637 }
638 }
639
640
641
642
643
644
645
646
647 private void handle( LateralElementDescriptor<K, V> led )
648 throws IOException
649 {
650 String cacheName = led.ce.getCacheName();
651 K key = led.ce.getKey();
652
653 if ( led.command == LateralCommand.UPDATE )
654 {
655 handlePut( led.ce );
656 }
657 else if ( led.command == LateralCommand.REMOVE )
658 {
659
660
661
662 if ( led.valHashCode != -1 )
663 {
664 if ( getTcpLateralCacheAttributes().isFilterRemoveByHashCode() )
665 {
666 ICacheElement<K, V> test = getCache( cacheName ).localGet( key );
667 if ( test != null )
668 {
669 if ( test.getVal().hashCode() == led.valHashCode )
670 {
671 if ( log.isDebugEnabled() )
672 {
673 log.debug( "Filtering detected identical hashCode [" + led.valHashCode
674 + "], not issuing a remove for led " + led );
675 }
676 return;
677 }
678 else
679 {
680 if ( log.isDebugEnabled() )
681 {
682 log.debug( "Different hashcodes, in cache [" + test.getVal().hashCode()
683 + "] sent [" + led.valHashCode + "]" );
684 }
685 }
686 }
687 }
688 }
689 handleRemove( cacheName, key );
690 }
691 else if ( led.command == LateralCommand.REMOVEALL )
692 {
693 handleRemoveAll( cacheName );
694 }
695 else if ( led.command == LateralCommand.GET )
696 {
697 Serializable obj = handleGet( cacheName, key );
698
699 ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
700 oos.writeObject( obj );
701 oos.flush();
702 }
703 else if ( led.command == LateralCommand.GET_MATCHING )
704 {
705 Map<K, ICacheElement<K, V>> obj = handleGetMatching( cacheName, (String) key );
706
707 ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
708 oos.writeObject( obj );
709 oos.flush();
710 }
711 else if ( led.command == LateralCommand.GET_GROUP_KEYS )
712 {
713 String groupName = (String) key;
714 Set<K> obj = handleGetGroupKeys(cacheName, groupName);
715
716 ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
717 oos.writeObject( obj );
718 oos.flush();
719 }
720 else if ( led.command == LateralCommand.GET_GROUP_NAMES )
721 {
722 Set<String> obj = handleGetGroupNames(cacheName);
723
724 ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
725 oos.writeObject( obj );
726 oos.flush();
727 }
728 }
729 }
730
731
732
733
734
735
736 protected static class MyThreadFactory
737 implements ThreadFactory
738 {
739
740
741
742
743 public Thread newThread( Runnable runner )
744 {
745 Thread t = new Thread( runner );
746 t.setDaemon( true );
747 t.setPriority( Thread.MIN_PRIORITY );
748 return t;
749 }
750 }
751
752
753
754
755 public void shutdown()
756 {
757 if ( !shutdown )
758 {
759 shutdown = true;
760
761 if ( log.isInfoEnabled() )
762 {
763 log.info( "Shutting down TCP Lateral receiver." );
764 }
765 receiver.interrupt();
766 }
767 else
768 {
769 if ( log.isDebugEnabled() )
770 {
771 log.debug( "Shutdown already called." );
772 }
773 }
774 }
775 }