1 package org.apache.jcs.auxiliary.remote.server;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import java.io.IOException;
19 import java.io.Serializable;
20 import java.rmi.RemoteException;
21 import java.rmi.registry.Registry;
22 import java.rmi.server.RMISocketFactory;
23 import java.rmi.server.UnicastRemoteObject;
24 import java.rmi.server.Unreferenced;
25 import java.util.Collections;
26 import java.util.Hashtable;
27 import java.util.Iterator;
28 import java.util.Map;
29 import java.util.Set;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.jcs.access.exception.CacheException;
34 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
35 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheObserver;
36 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheServiceAdmin;
37 import org.apache.jcs.auxiliary.remote.server.behavior.IRemoteCacheServerAttributes;
38 import org.apache.jcs.auxiliary.remote.server.behavior.RemoteType;
39 import org.apache.jcs.engine.CacheEventQueueFactory;
40 import org.apache.jcs.engine.CacheListeners;
41 import org.apache.jcs.engine.behavior.ICacheElement;
42 import org.apache.jcs.engine.behavior.ICacheEventQueue;
43 import org.apache.jcs.engine.behavior.ICacheListener;
44 import org.apache.jcs.engine.behavior.ICacheServiceNonLocal;
45 import org.apache.jcs.engine.control.CompositeCache;
46 import org.apache.jcs.engine.control.CompositeCacheManager;
47 import org.apache.jcs.engine.logging.CacheEvent;
48 import org.apache.jcs.engine.logging.behavior.ICacheEvent;
49 import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 public class RemoteCacheServer<K extends Serializable, V extends Serializable>
67 extends UnicastRemoteObject
68 implements ICacheServiceNonLocal<K, V>, IRemoteCacheObserver, IRemoteCacheServiceAdmin, Unreferenced
69 {
70
71 private static final long serialVersionUID = -8072345435941473116L;
72
73
74 private final static Log log = LogFactory.getLog( RemoteCacheServer.class );
75
76
77 protected final static boolean timing = true;
78
79
80 private int puts = 0;
81
82
83 private final Hashtable<String, CacheListeners<K, V>> cacheListenersMap =
84 new Hashtable<String, CacheListeners<K, V>>();
85
86
87 private final Hashtable<String, CacheListeners<K, V>> clusterListenersMap =
88 new Hashtable<String, CacheListeners<K, V>>();
89
90
91 private CompositeCacheManager cacheManager;
92
93
94 private final Hashtable<Long, RemoteType> idTypeMap = new Hashtable<Long, RemoteType>();
95
96
97 private final Hashtable<Long, String> idIPMap = new Hashtable<Long, String>();
98
99
100 private final int[] listenerId = new int[1];
101
102
103 protected IRemoteCacheServerAttributes remoteCacheServerAttributes;
104
105
106 private final int logInterval = 100;
107
108
109 private transient ICacheEventLogger cacheEventLogger;
110
111
112
113
114
115
116
117
118 protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa )
119 throws RemoteException
120 {
121 super( rcsa.getServicePort() );
122 this.remoteCacheServerAttributes = rcsa;
123 init( rcsa.getConfigFileName() );
124 }
125
126
127
128
129
130
131
132
133
134 protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa, RMISocketFactory customRMISocketFactory )
135 throws RemoteException
136 {
137 super( rcsa.getServicePort(), customRMISocketFactory, customRMISocketFactory );
138 this.remoteCacheServerAttributes = rcsa;
139 init( rcsa.getConfigFileName() );
140 }
141
142
143
144
145
146
147
148 private void init( String prop ) throws RemoteException
149 {
150 try
151 {
152 cacheManager = createCacheManager( prop );
153 }
154 catch (CacheException e)
155 {
156 throw new RemoteException(e.getMessage(), e);
157 }
158
159
160
161 String[] list = cacheManager.getCacheNames();
162 for ( int i = 0; i < list.length; i++ )
163 {
164 String name = list[i];
165 CompositeCache<K, V> cache = cacheManager.getCache( name );
166 cacheListenersMap.put( name, new CacheListeners<K, V>( cache ) );
167 }
168 }
169
170
171
172
173
174
175
176
177
178 private CompositeCacheManager createCacheManager( String prop ) throws CacheException
179 {
180 CompositeCacheManager hub = CompositeCacheManager.getUnconfiguredInstance();
181
182 if ( prop == null )
183 {
184 hub.configure( "/remote.cache.ccf" );
185 }
186 else
187 {
188 hub.configure( prop );
189 }
190 return hub;
191 }
192
193
194
195
196
197
198
199
200
201
202
203 public void put( ICacheElement<K, V> item )
204 throws IOException
205 {
206 update( item );
207 }
208
209
210
211
212
213 public void update( ICacheElement<K, V> item )
214 throws IOException
215 {
216 update( item, 0 );
217 }
218
219
220
221
222
223
224
225
226 public void update( ICacheElement<K, V> item, long requesterId )
227 throws IOException
228 {
229 ICacheEvent<ICacheElement<K, V>> cacheEvent = createICacheEvent( item, requesterId, ICacheEventLogger.UPDATE_EVENT );
230 try
231 {
232 processUpdate( item, requesterId );
233 }
234 finally
235 {
236 logICacheEvent( cacheEvent );
237 }
238 }
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263 private void processUpdate( ICacheElement<K, V> item, long requesterId )
264 {
265 long start = 0;
266 if ( timing )
267 {
268 start = System.currentTimeMillis();
269 }
270
271 logUpdateInfo( item );
272
273 try
274 {
275 CacheListeners<K, V> cacheDesc = getCacheListeners( item.getCacheName() );
276
277
278 boolean fromCluster = isRequestFromCluster( requesterId );
279
280 if ( log.isDebugEnabled() )
281 {
282 log.debug( "In update, requesterId = [" + requesterId + "] fromCluster = " + fromCluster );
283 }
284
285
286 synchronized ( cacheDesc )
287 {
288 try
289 {
290 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306 if ( fromCluster )
307 {
308 if ( log.isDebugEnabled() )
309 {
310 log.debug( "Put FROM cluster, NOT updating other auxiliaries for region. "
311 + " requesterId [" + requesterId + "]" );
312 }
313 c.localUpdate( item );
314 }
315 else
316 {
317 if ( log.isDebugEnabled() )
318 {
319 log.debug( "Put NOT from cluster, updating other auxiliaries for region. "
320 + " requesterId [" + requesterId + "]" );
321 }
322 c.update( item );
323 }
324 }
325 catch ( Exception ce )
326 {
327
328 if ( log.isInfoEnabled() )
329 {
330 log.info( "Exception caught updating item. requesterId [" + requesterId + "] "
331 + ce.getMessage() );
332 }
333 }
334
335
336
337 if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency() ) )
338 {
339 ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
340
341 if ( qlist != null )
342 {
343 if ( log.isDebugEnabled() )
344 {
345 log.debug( "qlist.length = " + qlist.length );
346 }
347 for ( int i = 0; i < qlist.length; i++ )
348 {
349 qlist[i].addPutEvent( item );
350 }
351 }
352 else
353 {
354 if ( log.isDebugEnabled() )
355 {
356 log.debug( "q list is null" );
357 }
358 }
359 }
360 }
361 }
362 catch ( IOException e )
363 {
364 if ( cacheEventLogger != null )
365 {
366 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.UPDATE_EVENT, e.getMessage()
367 + " REGION: " + item.getCacheName() + " ITEM: " + item );
368 }
369
370 log.error( "Trouble in Update. requesterId [" + requesterId + "]", e );
371 }
372
373
374 if ( timing )
375 {
376 long end = System.currentTimeMillis();
377 if ( log.isDebugEnabled() )
378 {
379 log.debug( "put took " + String.valueOf( end - start ) + " ms." );
380 }
381 }
382 }
383
384
385
386
387
388
389 private void logUpdateInfo( ICacheElement<K, V> item )
390 {
391 if ( log.isInfoEnabled() )
392 {
393
394 puts++;
395 if ( puts % logInterval == 0 )
396 {
397 log.info( "puts = " + puts );
398 }
399 }
400
401 if ( log.isDebugEnabled() )
402 {
403 log.debug( "In update, put [" + item.getKey() + "] in [" + item.getCacheName() + "]" );
404 }
405 }
406
407
408
409
410
411
412
413
414
415
416 public ICacheElement<K, V> get( String cacheName, K key )
417 throws IOException
418 {
419 return this.get( cacheName, key, 0 );
420 }
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435 public ICacheElement<K, V> get( String cacheName, K key, long requesterId )
436 throws IOException
437 {
438 ICacheElement<K, V> element = null;
439 ICacheEvent<K> cacheEvent = createICacheEvent( cacheName, key, requesterId, ICacheEventLogger.GET_EVENT );
440 try
441 {
442 element = processGet( cacheName, key, requesterId );
443 }
444 finally
445 {
446 logICacheEvent( cacheEvent );
447 }
448 return element;
449 }
450
451
452
453
454
455
456
457
458
459
460
461 private ICacheElement<K, V> processGet( String cacheName, K key, long requesterId )
462 {
463 boolean fromCluster = isRequestFromCluster( requesterId );
464
465 if ( log.isDebugEnabled() )
466 {
467 log.debug( "get [" + key + "] from cache [" + cacheName + "] requesterId = [" + requesterId
468 + "] fromCluster = " + fromCluster );
469 }
470
471 CacheListeners<K, V> cacheDesc = null;
472 try
473 {
474 cacheDesc = getCacheListeners( cacheName );
475 }
476 catch ( Exception e )
477 {
478 log.error( "Problem getting listeners.", e );
479
480 if ( cacheEventLogger != null )
481 {
482 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.GET_EVENT, e.getMessage() + cacheName
483 + " KEY: " + key );
484 }
485 }
486
487 ICacheElement<K, V> element = null;
488
489 element = getFromCacheListeners( key, fromCluster, cacheDesc, element );
490 return element;
491 }
492
493
494
495
496
497
498
499
500
501
502 private ICacheElement<K, V> getFromCacheListeners( K key, boolean fromCluster, CacheListeners<K, V> cacheDesc,
503 ICacheElement<K, V> element )
504 {
505 ICacheElement<K, V> returnElement = element;
506
507 if ( cacheDesc != null )
508 {
509 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525 if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() )
526 {
527 if ( log.isDebugEnabled() )
528 {
529 log.debug( "NonLocalGet. fromCluster [" + fromCluster + "] AllowClusterGet ["
530 + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
531 }
532 returnElement = c.get( key );
533 }
534 else
535 {
536
537
538
539
540 if ( log.isDebugEnabled() )
541 {
542 log.debug( "LocalGet. fromCluster [" + fromCluster + "] AllowClusterGet ["
543 + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
544 }
545 returnElement = c.localGet( key );
546 }
547 }
548
549 return returnElement;
550 }
551
552
553
554
555
556
557
558
559
560 public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern )
561 throws IOException
562 {
563 return getMatching( cacheName, pattern, 0 );
564 }
565
566
567
568
569
570
571
572
573
574
575 public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern, long requesterId )
576 throws IOException
577 {
578 ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, pattern, requesterId,
579 ICacheEventLogger.GETMATCHING_EVENT );
580 try
581 {
582 return processGetMatching( cacheName, pattern, requesterId );
583 }
584 finally
585 {
586 logICacheEvent( cacheEvent );
587 }
588 }
589
590
591
592
593
594
595
596
597
598 protected Map<K, ICacheElement<K, V>> processGetMatching( String cacheName, String pattern, long requesterId )
599 {
600 boolean fromCluster = isRequestFromCluster( requesterId );
601
602 if ( log.isDebugEnabled() )
603 {
604 log.debug( "getMatching [" + pattern + "] from cache [" + cacheName + "] requesterId = [" + requesterId
605 + "] fromCluster = " + fromCluster );
606 }
607
608 CacheListeners<K, V> cacheDesc = null;
609 try
610 {
611 cacheDesc = getCacheListeners( cacheName );
612 }
613 catch ( Exception e )
614 {
615 log.error( "Problem getting listeners.", e );
616
617 if ( cacheEventLogger != null )
618 {
619 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.GETMATCHING_EVENT, e.getMessage()
620 + cacheName + " pattern: " + pattern );
621 }
622 }
623
624 return getMatchingFromCacheListeners( pattern, fromCluster, cacheDesc );
625 }
626
627
628
629
630
631
632
633
634
635 private Map<K, ICacheElement<K, V>> getMatchingFromCacheListeners( String pattern, boolean fromCluster, CacheListeners<K, V> cacheDesc )
636 {
637 Map<K, ICacheElement<K, V>> elements = null;
638 if ( cacheDesc != null )
639 {
640 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
641
642
643
644
645
646 if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() )
647 {
648 if ( log.isDebugEnabled() )
649 {
650 log.debug( "NonLocalGetMatching. fromCluster [" + fromCluster + "] AllowClusterGet ["
651 + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
652 }
653 elements = c.getMatching( pattern );
654 }
655 else
656 {
657
658
659
660
661 if ( log.isDebugEnabled() )
662 {
663 log.debug( "LocalGetMatching. fromCluster [" + fromCluster + "] AllowClusterGet ["
664 + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
665 }
666 elements = c.localGetMatching( pattern );
667 }
668 }
669 return elements;
670 }
671
672
673
674
675
676
677
678
679
680
681 public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys )
682 throws IOException
683 {
684 return this.getMultiple( cacheName, keys, 0 );
685 }
686
687
688
689
690
691
692
693
694
695
696
697
698
699 public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys, long requesterId )
700 throws IOException
701 {
702 ICacheEvent<Serializable> cacheEvent = createICacheEvent( cacheName, (Serializable) keys, requesterId,
703 ICacheEventLogger.GETMULTIPLE_EVENT );
704 try
705 {
706 return processGetMultiple( cacheName, keys, requesterId );
707 }
708 finally
709 {
710 logICacheEvent( cacheEvent );
711 }
712 }
713
714
715
716
717
718
719
720
721
722
723 private Map<K, ICacheElement<K, V>> processGetMultiple( String cacheName, Set<K> keys, long requesterId )
724 {
725 Map<K, ICacheElement<K, V>> elements = null;
726
727 boolean fromCluster = isRequestFromCluster( requesterId );
728
729 if ( log.isDebugEnabled() )
730 {
731 log.debug( "getMultiple [" + keys + "] from cache [" + cacheName + "] requesterId = [" + requesterId
732 + "] fromCluster = " + fromCluster );
733 }
734
735 CacheListeners<K, V> cacheDesc = null;
736 try
737 {
738 cacheDesc = getCacheListeners( cacheName );
739 }
740 catch ( Exception e )
741 {
742 log.error( "Problem getting listeners.", e );
743 }
744
745 elements = getMultipleFromCacheListeners( keys, elements, fromCluster, cacheDesc );
746 return elements;
747 }
748
749
750
751
752
753
754
755
756
757 private boolean isRequestFromCluster( long requesterId )
758 {
759 RemoteType remoteTypeL = idTypeMap.get( Long.valueOf( requesterId ) );
760 return remoteTypeL == RemoteType.CLUSTER;
761 }
762
763
764
765
766
767
768
769
770
771
772 private Map<K, ICacheElement<K, V>> getMultipleFromCacheListeners( Set<K> keys, Map<K, ICacheElement<K, V>> elements, boolean fromCluster, CacheListeners<K, V> cacheDesc )
773 {
774 Map<K, ICacheElement<K, V>> returnElements = elements;
775
776 if ( cacheDesc != null )
777 {
778 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794 if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() )
795 {
796 if ( log.isDebugEnabled() )
797 {
798 log.debug( "NonLocalGetMultiple. fromCluster [" + fromCluster + "] AllowClusterGet ["
799 + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
800 }
801
802 returnElements = c.getMultiple( keys );
803 }
804 else
805 {
806
807
808
809
810 if ( log.isDebugEnabled() )
811 {
812 log.debug( "LocalGetMultiple. fromCluster [" + fromCluster + "] AllowClusterGet ["
813 + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
814 }
815
816 returnElements = c.localGetMultiple( keys );
817 }
818 }
819
820 return returnElements;
821 }
822
823
824
825
826
827
828
829
830 public Set<K> getGroupKeys( String cacheName, String group )
831 {
832 return processGetGroupKeys( cacheName, group );
833 }
834
835
836
837
838
839
840
841
842 protected Set<K> processGetGroupKeys( String cacheName, String group )
843 {
844 CacheListeners<K, V> cacheDesc = null;
845 try
846 {
847 cacheDesc = getCacheListeners( cacheName );
848 }
849 catch ( Exception e )
850 {
851 log.error( "Problem getting listeners.", e );
852 }
853
854 if ( cacheDesc == null )
855 {
856 return Collections.emptySet();
857 }
858
859 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
860 return c.getGroupKeys( group );
861 }
862
863
864
865
866
867
868
869 public Set<String> getGroupNames(String cacheName)
870 {
871 return processGetGroupNames(cacheName);
872 }
873
874
875
876
877
878
879
880 protected Set<String> processGetGroupNames(String cacheName)
881 {
882 CacheListeners<K, V> cacheDesc = null;
883 try
884 {
885 cacheDesc = getCacheListeners(cacheName);
886 }
887 catch (Exception e)
888 {
889 log.error("Problem getting listeners.", e);
890 }
891
892 if (cacheDesc == null)
893 {
894 return Collections.emptySet();
895 }
896
897 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
898 return c.getGroupNames();
899 }
900
901
902
903
904
905
906
907
908 public void remove( String cacheName, K key )
909 throws IOException
910 {
911 remove( cacheName, key, 0 );
912 }
913
914
915
916
917
918
919
920
921
922
923
924 public void remove( String cacheName, K key, long requesterId )
925 throws IOException
926 {
927 ICacheEvent<K> cacheEvent = createICacheEvent( cacheName, key, requesterId, ICacheEventLogger.REMOVE_EVENT );
928 try
929 {
930 processRemove( cacheName, key, requesterId );
931 }
932 finally
933 {
934 logICacheEvent( cacheEvent );
935 }
936 }
937
938
939
940
941
942
943
944
945
946 private void processRemove( String cacheName, K key, long requesterId )
947 throws IOException
948 {
949 if ( log.isDebugEnabled() )
950 {
951 log.debug( "remove [" + key + "] from cache [" + cacheName + "]" );
952 }
953
954 CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName );
955
956 boolean fromCluster = isRequestFromCluster( requesterId );
957
958 if ( cacheDesc != null )
959 {
960
961
962 synchronized ( cacheDesc )
963 {
964 boolean removeSuccess = false;
965
966
967 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
968
969 if ( fromCluster )
970 {
971 if ( log.isDebugEnabled() )
972 {
973 log.debug( "Remove FROM cluster, NOT updating other auxiliaries for region" );
974 }
975 removeSuccess = c.localRemove( key );
976 }
977 else
978 {
979 if ( log.isDebugEnabled() )
980 {
981 log.debug( "Remove NOT from cluster, updating other auxiliaries for region" );
982 }
983 removeSuccess = c.remove( key );
984 }
985
986 if ( log.isDebugEnabled() )
987 {
988 log.debug( "remove [" + key + "] from cache [" + cacheName + "] success (was it found) = "
989 + removeSuccess );
990 }
991
992
993
994 if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency() ) )
995 {
996 ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
997
998 for ( int i = 0; i < qlist.length; i++ )
999 {
1000 qlist[i].addRemoveEvent( key );
1001 }
1002 }
1003 }
1004 }
1005 }
1006
1007
1008
1009
1010
1011
1012
1013 public void removeAll( String cacheName )
1014 throws IOException
1015 {
1016 removeAll( cacheName, 0 );
1017 }
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028 public void removeAll( String cacheName, long requesterId )
1029 throws IOException
1030 {
1031 ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, "all", requesterId, ICacheEventLogger.REMOVEALL_EVENT );
1032 try
1033 {
1034 processRemoveAll( cacheName, requesterId );
1035 }
1036 finally
1037 {
1038 logICacheEvent( cacheEvent );
1039 }
1040 }
1041
1042
1043
1044
1045
1046
1047
1048
1049 private void processRemoveAll( String cacheName, long requesterId )
1050 throws IOException
1051 {
1052 CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName );
1053
1054 boolean fromCluster = isRequestFromCluster( requesterId );
1055
1056 if ( cacheDesc != null )
1057 {
1058
1059
1060 synchronized ( cacheDesc )
1061 {
1062
1063 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
1064
1065 if ( fromCluster )
1066 {
1067 if ( log.isDebugEnabled() )
1068 {
1069 log.debug( "RemoveALL FROM cluster, NOT updating other auxiliaries for region" );
1070 }
1071 c.localRemoveAll();
1072 }
1073 else
1074 {
1075 if ( log.isDebugEnabled() )
1076 {
1077 log.debug( "RemoveALL NOT from cluster, updating other auxiliaries for region" );
1078 }
1079 c.removeAll();
1080 }
1081
1082
1083 if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency() ) )
1084 {
1085 ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
1086
1087 for ( int i = 0; i < qlist.length; i++ )
1088 {
1089 qlist[i].addRemoveAllEvent();
1090 }
1091 }
1092 }
1093 }
1094 }
1095
1096
1097
1098
1099
1100
1101 protected int getPutCount()
1102 {
1103 return puts;
1104 }
1105
1106
1107
1108
1109
1110
1111
1112 public void dispose( String cacheName )
1113 throws IOException
1114 {
1115 dispose( cacheName, 0 );
1116 }
1117
1118
1119
1120
1121
1122
1123
1124
1125 public void dispose( String cacheName, long requesterId )
1126 throws IOException
1127 {
1128 ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, "none", requesterId, ICacheEventLogger.DISPOSE_EVENT );
1129 try
1130 {
1131 processDispose( cacheName, requesterId );
1132 }
1133 finally
1134 {
1135 logICacheEvent( cacheEvent );
1136 }
1137 }
1138
1139
1140
1141
1142
1143
1144 private void processDispose( String cacheName, long requesterId )
1145 throws IOException
1146 {
1147 if ( log.isInfoEnabled() )
1148 {
1149 log.info( "Dispose request received from listener [" + requesterId + "]" );
1150 }
1151
1152 CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName );
1153
1154
1155 if ( cacheDesc != null )
1156 {
1157
1158 synchronized ( cacheDesc )
1159 {
1160 ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
1161
1162 for ( int i = 0; i < qlist.length; i++ )
1163 {
1164 qlist[i].addDisposeEvent();
1165 }
1166 cacheManager.freeCache( cacheName );
1167 }
1168 }
1169 }
1170
1171
1172
1173
1174
1175
1176 public void release()
1177 throws IOException
1178 {
1179 synchronized ( cacheListenersMap )
1180 {
1181 for (CacheListeners<K, V> cacheDesc : cacheListenersMap.values())
1182 {
1183 ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, 0 );
1184
1185 for ( int i = 0; i < qlist.length; i++ )
1186 {
1187 qlist[i].addDisposeEvent();
1188 }
1189 }
1190 cacheManager.release();
1191 }
1192 }
1193
1194
1195
1196
1197
1198
1199
1200
1201 protected CacheListeners<K, V> getCacheListeners( String cacheName )
1202 {
1203 CacheListeners<K, V> cacheListeners = cacheListenersMap.get( cacheName );
1204 synchronized ( cacheListenersMap )
1205 {
1206 if ( cacheListeners == null )
1207 {
1208 cacheListeners = cacheListenersMap.get( cacheName );
1209 if ( cacheListeners == null )
1210 {
1211 CompositeCache<K, V> cache = cacheManager.getCache( cacheName );
1212 cacheListeners = new CacheListeners<K, V>( cache );
1213 cacheListenersMap.put( cacheName, cacheListeners );
1214 }
1215 }
1216 }
1217 return cacheListeners;
1218 }
1219
1220
1221
1222
1223
1224
1225
1226
1227 protected CacheListeners<K, V> getClusterListeners( String cacheName )
1228 {
1229 CacheListeners<K, V> cacheListeners = clusterListenersMap.get( cacheName );
1230 synchronized ( clusterListenersMap )
1231 {
1232 if ( cacheListeners == null )
1233 {
1234 cacheListeners = clusterListenersMap.get( cacheName );
1235 if ( cacheListeners == null )
1236 {
1237 CompositeCache<K, V> cache = cacheManager.getCache( cacheName );
1238 cacheListeners = new CacheListeners<K, V>( cache );
1239 clusterListenersMap.put( cacheName, cacheListeners );
1240 }
1241 }
1242 }
1243 return cacheListeners;
1244 }
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258 @SuppressWarnings("unchecked")
1259 private ICacheEventQueue<K, V>[] getEventQList( CacheListeners<K, V> cacheListeners, long requesterId )
1260 {
1261 ICacheEventQueue<K, V>[] list = null;
1262 synchronized ( cacheListeners.eventQMap )
1263 {
1264 list = cacheListeners.eventQMap.values().toArray( new ICacheEventQueue[0] );
1265 }
1266 int count = 0;
1267
1268 for ( int i = 0; i < list.length; i++ )
1269 {
1270 ICacheEventQueue<K, V> q = list[i];
1271 if ( q.isWorking() && q.getListenerId() != requesterId )
1272 {
1273 count++;
1274 }
1275 else
1276 {
1277 list[i] = null;
1278 }
1279 }
1280 if ( count == list.length )
1281 {
1282
1283 return list;
1284 }
1285
1286
1287 ICacheEventQueue<K, V>[] qq = new ICacheEventQueue[count];
1288 count = 0;
1289 for ( int i = 0; i < list.length; i++ )
1290 {
1291 if ( list[i] != null )
1292 {
1293 qq[count++] = list[i];
1294 }
1295 }
1296 return qq;
1297 }
1298
1299
1300
1301
1302
1303
1304 private static <KK extends Serializable, VV extends Serializable> void cleanupEventQMap( Map<Long, ICacheEventQueue<KK, VV>> eventQMap )
1305 {
1306 synchronized ( eventQMap )
1307 {
1308 for (Iterator<Map.Entry<Long, ICacheEventQueue<KK, VV>>> itr = eventQMap.entrySet().iterator(); itr.hasNext(); )
1309 {
1310 Map.Entry<Long, ICacheEventQueue<KK, VV>> e = itr.next();
1311 ICacheEventQueue<KK, VV> q = e.getValue();
1312
1313
1314
1315
1316
1317 if ( !q.isWorking() )
1318 {
1319 itr.remove();
1320 log.warn( "Cache event queue " + q + " is not working and removed from cache server." );
1321 }
1322 }
1323 }
1324 }
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337 @SuppressWarnings("unchecked")
1338 public <KK extends Serializable, VV extends Serializable> void addCacheListener( String cacheName, ICacheListener<KK, VV> listener )
1339 throws IOException
1340 {
1341 if ( cacheName == null || listener == null )
1342 {
1343 throw new IllegalArgumentException( "cacheName and listener must not be null" );
1344 }
1345 CacheListeners<KK, VV> cacheListeners;
1346
1347 IRemoteCacheListener<KK, VV> ircl = (IRemoteCacheListener<KK, VV>) listener;
1348
1349 String listenerAddress = ircl.getLocalHostAddress();
1350
1351 RemoteType remoteType = ircl.getRemoteType();
1352 if ( remoteType == RemoteType.CLUSTER )
1353 {
1354 log.debug( "adding cluster listener, listenerAddress [" + listenerAddress + "]" );
1355 cacheListeners = (CacheListeners<KK, VV>)getClusterListeners( cacheName );
1356 }
1357 else
1358 {
1359 log.debug( "adding normal listener, listenerAddress [" + listenerAddress + "]" );
1360 cacheListeners = (CacheListeners<KK, VV>)getCacheListeners( cacheName );
1361 }
1362 Map<Long, ICacheEventQueue<KK, VV>> eventQMap = cacheListeners.eventQMap;
1363 cleanupEventQMap( eventQMap );
1364
1365
1366 synchronized ( ICacheListener.class )
1367 {
1368 long id = 0;
1369 try
1370 {
1371 id = listener.getListenerId();
1372
1373 if ( id == 0 )
1374 {
1375
1376 long listenerIdB = nextListenerId();
1377 if ( log.isDebugEnabled() )
1378 {
1379 log.debug( "listener id=" + ( listenerIdB & 0xff ) + " addded for cache [" + cacheName
1380 + "], listenerAddress [" + listenerAddress + "]" );
1381 }
1382 listener.setListenerId( listenerIdB );
1383 id = listenerIdB;
1384
1385
1386 String message = "Adding vm listener under new id = [" + listenerIdB + "], listenerAddress ["
1387 + listenerAddress + "]";
1388 logApplicationEvent( "RemoteCacheServer", "addCacheListener", message );
1389 if ( log.isInfoEnabled() )
1390 {
1391 log.info( message );
1392 }
1393 }
1394 else
1395 {
1396 String message = "Adding listener under existing id = [" + id + "], listenerAddress ["
1397 + listenerAddress + "]";
1398 logApplicationEvent( "RemoteCacheServer", "addCacheListener", message );
1399 if ( log.isInfoEnabled() )
1400 {
1401 log.info( message );
1402 }
1403
1404
1405 }
1406
1407
1408 this.idTypeMap.put( Long.valueOf( id ), remoteType);
1409 if ( listenerAddress != null )
1410 {
1411 this.idIPMap.put( Long.valueOf( id ), listenerAddress );
1412 }
1413 }
1414 catch ( IOException ioe )
1415 {
1416 String message = "Problem setting listener id, listenerAddress [" + listenerAddress + "]";
1417 log.error( message, ioe );
1418
1419 if ( cacheEventLogger != null )
1420 {
1421 cacheEventLogger.logError( "RemoteCacheServer", "addCacheListener", message + " - "
1422 + ioe.getMessage() );
1423 }
1424 }
1425
1426 CacheEventQueueFactory<KK, VV> fact = new CacheEventQueueFactory<KK, VV>();
1427 ICacheEventQueue<KK, VV> q = fact.createCacheEventQueue( listener, id, cacheName, remoteCacheServerAttributes
1428 .getEventQueuePoolName(), remoteCacheServerAttributes.getEventQueueType() );
1429
1430 eventQMap.put(Long.valueOf(listener.getListenerId()), q);
1431
1432 if ( log.isInfoEnabled() )
1433 {
1434 log.info( cacheListeners );
1435 }
1436 }
1437 }
1438
1439
1440
1441
1442
1443
1444
1445 public <KK extends Serializable, VV extends Serializable> void addCacheListener( ICacheListener<KK, VV> listener )
1446 throws IOException
1447 {
1448 for (String cacheName : cacheListenersMap.keySet())
1449 {
1450 addCacheListener( cacheName, listener );
1451
1452 if ( log.isDebugEnabled() )
1453 {
1454 log.debug( "Adding listener for cache [" + cacheName + "]" );
1455 }
1456 }
1457 }
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467 public <KK extends Serializable, VV extends Serializable> void removeCacheListener( String cacheName, ICacheListener<KK, VV> listener )
1468 throws IOException
1469 {
1470 removeCacheListener( cacheName, listener.getListenerId() );
1471 }
1472
1473
1474
1475
1476
1477
1478
1479
1480 public void removeCacheListener( String cacheName, long listenerId )
1481 {
1482 String message = "Removing listener for cache region = [" + cacheName + "] and listenerId [" + listenerId + "]";
1483 logApplicationEvent( "RemoteCacheServer", "removeCacheListener", message );
1484 if ( log.isInfoEnabled() )
1485 {
1486 log.info( message );
1487 }
1488
1489 boolean isClusterListener = isRequestFromCluster( listenerId );
1490
1491 CacheListeners<K, V> cacheDesc = null;
1492
1493 if ( isClusterListener )
1494 {
1495 cacheDesc = getClusterListeners( cacheName );
1496 }
1497 else
1498 {
1499 cacheDesc = getCacheListeners( cacheName );
1500 }
1501 Map<Long, ICacheEventQueue<K, V>> eventQMap = cacheDesc.eventQMap;
1502 cleanupEventQMap( eventQMap );
1503 ICacheEventQueue<K, V> q = eventQMap.remove( Long.valueOf( listenerId ) );
1504
1505 if ( q != null )
1506 {
1507 if ( log.isDebugEnabled() )
1508 {
1509 log.debug( "Found queue for cache region = [" + cacheName + "] and listenerId [" + listenerId + "]" );
1510 }
1511 q.destroy();
1512 cleanupEventQMap( eventQMap );
1513 }
1514 else
1515 {
1516 if ( log.isDebugEnabled() )
1517 {
1518 log.debug( "Did not find queue for cache region = [" + cacheName + "] and listenerId [" + listenerId
1519 + "]" );
1520 }
1521 }
1522
1523
1524 idTypeMap.remove( Long.valueOf( listenerId ) );
1525 idIPMap.remove( Long.valueOf( listenerId ) );
1526
1527 if ( log.isInfoEnabled() )
1528 {
1529 log.info( "After removing listener [" + listenerId + "] cache region " + cacheName + "'s listener size ["
1530 + cacheDesc.eventQMap.size() + "]" );
1531 }
1532 }
1533
1534
1535
1536
1537
1538
1539
1540 public <KK extends Serializable, VV extends Serializable> void removeCacheListener( ICacheListener<KK, VV> listener )
1541 throws IOException
1542 {
1543 for (String cacheName : cacheListenersMap.keySet())
1544 {
1545 removeCacheListener( cacheName, listener );
1546
1547 if ( log.isInfoEnabled() )
1548 {
1549 log.info( "Removing listener for cache [" + cacheName + "]" );
1550 }
1551 }
1552 return;
1553 }
1554
1555
1556
1557
1558
1559
1560 public void shutdown()
1561 throws IOException
1562 {
1563 RemoteCacheServerFactory.shutdownImpl( "", Registry.REGISTRY_PORT );
1564 }
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574 public void shutdown( String host, int port )
1575 throws IOException
1576 {
1577 if ( log.isInfoEnabled() )
1578 {
1579 log.info( "Received shutdown request. Shutting down server." );
1580 }
1581 RemoteCacheServerFactory.shutdownImpl( host, port );
1582 this.cacheManager.shutDown();
1583 }
1584
1585
1586
1587
1588
1589
1590 public void unreferenced()
1591 {
1592 if ( log.isInfoEnabled() )
1593 {
1594 log.info( "*** Server now unreferenced and subject to GC. ***" );
1595 }
1596 }
1597
1598
1599
1600
1601
1602
1603 private long nextListenerId()
1604 {
1605 long id = 0;
1606 if ( listenerId[0] == Long.MAX_VALUE )
1607 {
1608 synchronized ( listenerId )
1609 {
1610 id = listenerId[0];
1611 listenerId[0] = 0;
1612
1613
1614
1615
1616
1617 }
1618 }
1619 else
1620 {
1621 synchronized ( listenerId )
1622 {
1623 id = ++listenerId[0];
1624 }
1625 }
1626 return id;
1627 }
1628
1629
1630
1631
1632
1633
1634
1635 public String getStats()
1636 throws IOException
1637 {
1638 return cacheManager.getStats();
1639 }
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649 private ICacheEvent<ICacheElement<K, V>> createICacheEvent( ICacheElement<K, V> item, long requesterId, String eventName )
1650 {
1651 if ( cacheEventLogger == null )
1652 {
1653 return new CacheEvent<ICacheElement<K, V>>();
1654 }
1655 String ipAddress = getExtraInfoForRequesterId( requesterId );
1656 return cacheEventLogger
1657 .createICacheEvent( "RemoteCacheServer", item.getCacheName(), eventName, ipAddress, item );
1658 }
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669 private <T extends Serializable> ICacheEvent<T> createICacheEvent( String cacheName, T key, long requesterId, String eventName )
1670 {
1671 if ( cacheEventLogger == null )
1672 {
1673 return new CacheEvent<T>();
1674 }
1675 String ipAddress = getExtraInfoForRequesterId( requesterId );
1676 return cacheEventLogger.createICacheEvent( "RemoteCacheServer", cacheName, eventName, ipAddress, key );
1677 }
1678
1679
1680
1681
1682
1683
1684
1685
1686 protected void logApplicationEvent( String source, String eventName, String optionalDetails )
1687 {
1688 if ( cacheEventLogger != null )
1689 {
1690 cacheEventLogger.logApplicationEvent( source, eventName, optionalDetails );
1691 }
1692 }
1693
1694
1695
1696
1697
1698
1699 protected <T extends Serializable> void logICacheEvent( ICacheEvent<T> cacheEvent )
1700 {
1701 if ( cacheEventLogger != null )
1702 {
1703 cacheEventLogger.logICacheEvent( cacheEvent );
1704 }
1705 }
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715 protected String getExtraInfoForRequesterId( long requesterId )
1716 {
1717 String ipAddress = idIPMap.get( Long.valueOf( requesterId ) );
1718 return ipAddress;
1719 }
1720
1721
1722
1723
1724
1725
1726 public void setCacheEventLogger( ICacheEventLogger cacheEventLogger )
1727 {
1728 this.cacheEventLogger = cacheEventLogger;
1729 }
1730 }