1 package org.apache.jcs.auxiliary.remote.server;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import java.io.IOException;
19 import java.io.Serializable;
20 import java.rmi.RemoteException;
21 import java.rmi.registry.Registry;
22 import java.rmi.server.RMISocketFactory;
23 import java.rmi.server.UnicastRemoteObject;
24 import java.rmi.server.Unreferenced;
25 import java.util.Collections;
26 import java.util.Hashtable;
27 import java.util.Iterator;
28 import java.util.Map;
29 import java.util.Set;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.jcs.access.exception.CacheException;
34 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
35 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
36 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheObserver;
37 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheService;
38 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheServiceAdmin;
39 import org.apache.jcs.auxiliary.remote.server.behavior.IRemoteCacheServerAttributes;
40 import org.apache.jcs.engine.CacheEventQueueFactory;
41 import org.apache.jcs.engine.CacheListeners;
42 import org.apache.jcs.engine.behavior.ICacheElement;
43 import org.apache.jcs.engine.behavior.ICacheEventQueue;
44 import org.apache.jcs.engine.behavior.ICacheListener;
45 import org.apache.jcs.engine.control.CompositeCache;
46 import org.apache.jcs.engine.control.CompositeCacheManager;
47 import org.apache.jcs.engine.logging.CacheEvent;
48 import org.apache.jcs.engine.logging.behavior.ICacheEvent;
49 import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 public class RemoteCacheServer
67 extends UnicastRemoteObject
68 implements IRemoteCacheService, IRemoteCacheObserver, IRemoteCacheServiceAdmin, Unreferenced
69 {
70
71 private static final long serialVersionUID = -8072345435941473116L;
72
73
74 private final static Log log = LogFactory.getLog( RemoteCacheServer.class );
75
76
77 protected final static boolean timing = true;
78
79
80 private int puts = 0;
81
82
83 private final Hashtable<String, CacheListeners> cacheListenersMap =
84 new Hashtable<String, CacheListeners>();
85
86
87 private final Hashtable<String, CacheListeners> clusterListenersMap =
88 new Hashtable<String, CacheListeners>();
89
90
91 private CompositeCacheManager cacheManager;
92
93
94 private final Hashtable<Long, Integer> idTypeMap = new Hashtable<Long, Integer>();
95
96
97 private final Hashtable<Long, String> idIPMap = new Hashtable<Long, String>();
98
99
100 private final int[] listenerId = new int[1];
101
102
103 protected IRemoteCacheServerAttributes remoteCacheServerAttributes;
104
105
106 private final int logInterval = 100;
107
108
109 private transient ICacheEventLogger cacheEventLogger;
110
111
112 private static final ICacheEvent EMPTY_ICACHE_EVENT = new CacheEvent();
113
114
115
116
117
118
119
120
121 protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa )
122 throws RemoteException
123 {
124 super( rcsa.getServicePort() );
125 this.remoteCacheServerAttributes = rcsa;
126 init( rcsa.getConfigFileName() );
127 }
128
129
130
131
132
133
134
135
136
137 protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa, RMISocketFactory customRMISocketFactory )
138 throws RemoteException
139 {
140 super( rcsa.getServicePort(), customRMISocketFactory, customRMISocketFactory );
141 this.remoteCacheServerAttributes = rcsa;
142 init( rcsa.getConfigFileName() );
143 }
144
145
146
147
148
149
150
151 private void init( String prop ) throws RemoteException
152 {
153 try
154 {
155 cacheManager = createCacheManager( prop );
156 }
157 catch (CacheException e)
158 {
159 throw new RemoteException(e.getMessage(), e);
160 }
161
162
163
164 String[] list = cacheManager.getCacheNames();
165 for ( int i = 0; i < list.length; i++ )
166 {
167 String name = list[i];
168 cacheListenersMap.put( name, new CacheListeners( cacheManager.getCache( name ) ) );
169 }
170 }
171
172
173
174
175
176
177
178
179
180 private CompositeCacheManager createCacheManager( String prop ) throws CacheException
181 {
182 CompositeCacheManager hub = CompositeCacheManager.getUnconfiguredInstance();
183
184 if ( prop == null )
185 {
186 hub.configure( "/remote.cache.ccf" );
187 }
188 else
189 {
190 hub.configure( prop );
191 }
192 return hub;
193 }
194
195
196
197
198
199
200
201
202
203
204
205 public void put( ICacheElement item )
206 throws IOException
207 {
208 update( item );
209 }
210
211
212
213
214
215 public void update( ICacheElement item )
216 throws IOException
217 {
218 update( item, 0 );
219 }
220
221
222
223
224
225
226
227
228 public void update( ICacheElement item, long requesterId )
229 throws IOException
230 {
231 ICacheEvent cacheEvent = createICacheEvent( item, requesterId, ICacheEventLogger.UPDATE_EVENT );
232 try
233 {
234 processUpdate( item, requesterId );
235 }
236 finally
237 {
238 logICacheEvent( cacheEvent );
239 }
240 }
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265 private void processUpdate( ICacheElement item, long requesterId )
266 {
267 long start = 0;
268 if ( timing )
269 {
270 start = System.currentTimeMillis();
271 }
272
273 logUpdateInfo( item );
274
275 try
276 {
277 CacheListeners cacheDesc = getCacheListeners( item.getCacheName() );
278
279
280 boolean fromCluster = isRequestFromCluster( requesterId );
281
282 if ( log.isDebugEnabled() )
283 {
284 log.debug( "In update, requesterId = [" + requesterId + "] fromCluster = " + fromCluster );
285 }
286
287
288 synchronized ( cacheDesc )
289 {
290 try
291 {
292 CompositeCache c = (CompositeCache) cacheDesc.cache;
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308 if ( fromCluster )
309 {
310 if ( log.isDebugEnabled() )
311 {
312 log.debug( "Put FROM cluster, NOT updating other auxiliaries for region. "
313 + " requesterId [" + requesterId + "]" );
314 }
315 c.localUpdate( item );
316 }
317 else
318 {
319 if ( log.isDebugEnabled() )
320 {
321 log.debug( "Put NOT from cluster, updating other auxiliaries for region. "
322 + " requesterId [" + requesterId + "]" );
323 }
324 c.update( item );
325 }
326 }
327 catch ( Exception ce )
328 {
329
330 if ( log.isInfoEnabled() )
331 {
332 log.info( "Exception caught updating item. requesterId [" + requesterId + "] "
333 + ce.getMessage() );
334 }
335 }
336
337
338
339 if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.getLocalClusterConsistency() ) )
340 {
341 ICacheEventQueue[] qlist = getEventQList( cacheDesc, requesterId );
342
343 if ( qlist != null )
344 {
345 if ( log.isDebugEnabled() )
346 {
347 log.debug( "qlist.length = " + qlist.length );
348 }
349 for ( int i = 0; i < qlist.length; i++ )
350 {
351 qlist[i].addPutEvent( item );
352 }
353 }
354 else
355 {
356 if ( log.isDebugEnabled() )
357 {
358 log.debug( "q list is null" );
359 }
360 }
361 }
362 }
363 }
364 catch ( IOException e )
365 {
366 if ( cacheEventLogger != null )
367 {
368 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.UPDATE_EVENT, e.getMessage()
369 + " REGION: " + item.getCacheName() + " ITEM: " + item );
370 }
371
372 log.error( "Trouble in Update. requesterId [" + requesterId + "]", e );
373 }
374
375
376 if ( timing )
377 {
378 long end = System.currentTimeMillis();
379 if ( log.isDebugEnabled() )
380 {
381 log.debug( "put took " + String.valueOf( end - start ) + " ms." );
382 }
383 }
384 }
385
386
387
388
389
390
391 private void logUpdateInfo( ICacheElement item )
392 {
393 if ( log.isInfoEnabled() )
394 {
395
396 puts++;
397 if ( puts % logInterval == 0 )
398 {
399 log.info( "puts = " + puts );
400 }
401 }
402
403 if ( log.isDebugEnabled() )
404 {
405 log.debug( "In update, put [" + item.getKey() + "] in [" + item.getCacheName() + "]" );
406 }
407 }
408
409
410
411
412
413
414
415
416
417
418 public ICacheElement get( String cacheName, Serializable key )
419 throws IOException
420 {
421 return this.get( cacheName, key, 0 );
422 }
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437 public ICacheElement get( String cacheName, Serializable key, long requesterId )
438 throws IOException
439 {
440 ICacheElement element = null;
441 ICacheEvent cacheEvent = createICacheEvent( cacheName, key, requesterId, ICacheEventLogger.GET_EVENT );
442 try
443 {
444 element = processGet( cacheName, key, requesterId );
445 }
446 finally
447 {
448 logICacheEvent( cacheEvent );
449 }
450 return element;
451 }
452
453
454
455
456
457
458
459
460
461
462
463 private ICacheElement processGet( String cacheName, Serializable key, long requesterId )
464 {
465 boolean fromCluster = isRequestFromCluster( requesterId );
466
467 if ( log.isDebugEnabled() )
468 {
469 log.debug( "get [" + key + "] from cache [" + cacheName + "] requesterId = [" + requesterId
470 + "] fromCluster = " + fromCluster );
471 }
472
473 CacheListeners cacheDesc = null;
474 try
475 {
476 cacheDesc = getCacheListeners( cacheName );
477 }
478 catch ( Exception e )
479 {
480 log.error( "Problem getting listeners.", e );
481
482 if ( cacheEventLogger != null )
483 {
484 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.GET_EVENT, e.getMessage() + cacheName
485 + " KEY: " + key );
486 }
487 }
488
489 ICacheElement element = null;
490
491 element = getFromCacheListeners( key, fromCluster, cacheDesc, element );
492 return element;
493 }
494
495
496
497
498
499
500
501
502
503
504 private ICacheElement getFromCacheListeners( Serializable key, boolean fromCluster, CacheListeners cacheDesc,
505 ICacheElement element )
506 {
507 if ( cacheDesc != null )
508 {
509 CompositeCache c = (CompositeCache) cacheDesc.cache;
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525 if ( !fromCluster && this.remoteCacheServerAttributes.getAllowClusterGet() )
526 {
527 if ( log.isDebugEnabled() )
528 {
529 log.debug( "NonLocalGet. fromCluster [" + fromCluster + "] AllowClusterGet ["
530 + this.remoteCacheServerAttributes.getAllowClusterGet() + "]" );
531 }
532 element = c.get( key );
533 }
534 else
535 {
536
537
538
539
540 if ( log.isDebugEnabled() )
541 {
542 log.debug( "LocalGet. fromCluster [" + fromCluster + "] AllowClusterGet ["
543 + this.remoteCacheServerAttributes.getAllowClusterGet() + "]" );
544 }
545 element = c.localGet( key );
546 }
547 }
548 return element;
549 }
550
551
552
553
554
555
556
557
558
559 public Map<Serializable, ICacheElement> getMatching( String cacheName, String pattern )
560 throws IOException
561 {
562 return getMatching( cacheName, pattern, 0 );
563 }
564
565
566
567
568
569
570
571
572
573
574 public Map<Serializable, ICacheElement> getMatching( String cacheName, String pattern, long requesterId )
575 throws IOException
576 {
577 ICacheEvent cacheEvent = createICacheEvent( cacheName, pattern, requesterId,
578 ICacheEventLogger.GETMATCHING_EVENT );
579 try
580 {
581 return processGetMatching( cacheName, pattern, requesterId );
582 }
583 finally
584 {
585 logICacheEvent( cacheEvent );
586 }
587 }
588
589
590
591
592
593
594
595
596
597 protected Map<Serializable, ICacheElement> processGetMatching( String cacheName, String pattern, long requesterId )
598 {
599 boolean fromCluster = isRequestFromCluster( requesterId );
600
601 if ( log.isDebugEnabled() )
602 {
603 log.debug( "getMatching [" + pattern + "] from cache [" + cacheName + "] requesterId = [" + requesterId
604 + "] fromCluster = " + fromCluster );
605 }
606
607 CacheListeners cacheDesc = null;
608 try
609 {
610 cacheDesc = getCacheListeners( cacheName );
611 }
612 catch ( Exception e )
613 {
614 log.error( "Problem getting listeners.", e );
615
616 if ( cacheEventLogger != null )
617 {
618 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.GETMATCHING_EVENT, e.getMessage()
619 + cacheName + " pattern: " + pattern );
620 }
621 }
622
623 return getMatchingFromCacheListeners( pattern, fromCluster, cacheDesc );
624 }
625
626
627
628
629
630
631
632
633
634 private Map<Serializable, ICacheElement> getMatchingFromCacheListeners( String pattern, boolean fromCluster, CacheListeners cacheDesc )
635 {
636 Map<Serializable, ICacheElement> elements = null;
637 if ( cacheDesc != null )
638 {
639 CompositeCache c = (CompositeCache) cacheDesc.cache;
640
641
642
643
644
645 if ( !fromCluster && this.remoteCacheServerAttributes.getAllowClusterGet() )
646 {
647 if ( log.isDebugEnabled() )
648 {
649 log.debug( "NonLocalGetMatching. fromCluster [" + fromCluster + "] AllowClusterGet ["
650 + this.remoteCacheServerAttributes.getAllowClusterGet() + "]" );
651 }
652 elements = c.getMatching( pattern );
653 }
654 else
655 {
656
657
658
659
660 if ( log.isDebugEnabled() )
661 {
662 log.debug( "LocalGetMatching. fromCluster [" + fromCluster + "] AllowClusterGet ["
663 + this.remoteCacheServerAttributes.getAllowClusterGet() + "]" );
664 }
665 elements = c.localGetMatching( pattern );
666 }
667 }
668 return elements;
669 }
670
671
672
673
674
675
676
677
678
679
680 public Map<Serializable, ICacheElement> getMultiple( String cacheName, Set<Serializable> keys )
681 throws IOException
682 {
683 return this.getMultiple( cacheName, keys, 0 );
684 }
685
686
687
688
689
690
691
692
693
694
695
696
697
698 public Map<Serializable, ICacheElement> getMultiple( String cacheName, Set<Serializable> keys, long requesterId )
699 throws IOException
700 {
701 ICacheEvent cacheEvent = createICacheEvent( cacheName, (Serializable) keys, requesterId,
702 ICacheEventLogger.GETMULTIPLE_EVENT );
703 try
704 {
705 return processGetMultiple( cacheName, keys, requesterId );
706 }
707 finally
708 {
709 logICacheEvent( cacheEvent );
710 }
711 }
712
713
714
715
716
717
718
719
720
721
722 private Map<Serializable, ICacheElement> processGetMultiple( String cacheName, Set<Serializable> keys, long requesterId )
723 {
724 Map<Serializable, ICacheElement> elements = null;
725
726 boolean fromCluster = isRequestFromCluster( requesterId );
727
728 if ( log.isDebugEnabled() )
729 {
730 log.debug( "getMultiple [" + keys + "] from cache [" + cacheName + "] requesterId = [" + requesterId
731 + "] fromCluster = " + fromCluster );
732 }
733
734 CacheListeners cacheDesc = null;
735 try
736 {
737 cacheDesc = getCacheListeners( cacheName );
738 }
739 catch ( Exception e )
740 {
741 log.error( "Problem getting listeners.", e );
742 }
743
744 elements = getMultipleFromCacheListeners( keys, elements, fromCluster, cacheDesc );
745 return elements;
746 }
747
748
749
750
751
752
753
754
755
756 private boolean isRequestFromCluster( long requesterId )
757 {
758 Integer remoteTypeL = idTypeMap.get( Long.valueOf( requesterId ) );
759
760 boolean fromCluster = false;
761 if ( remoteTypeL != null && remoteTypeL.intValue() == IRemoteCacheAttributes.CLUSTER )
762 {
763 fromCluster = true;
764 }
765 return fromCluster;
766 }
767
768
769
770
771
772
773
774
775
776
777 private Map<Serializable, ICacheElement> getMultipleFromCacheListeners( Set<Serializable> keys, Map<Serializable, ICacheElement> elements, boolean fromCluster, CacheListeners cacheDesc )
778 {
779 if ( cacheDesc != null )
780 {
781 CompositeCache c = (CompositeCache) cacheDesc.cache;
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797 if ( !fromCluster && this.remoteCacheServerAttributes.getAllowClusterGet() )
798 {
799 if ( log.isDebugEnabled() )
800 {
801 log.debug( "NonLocalGetMultiple. fromCluster [" + fromCluster + "] AllowClusterGet ["
802 + this.remoteCacheServerAttributes.getAllowClusterGet() + "]" );
803 }
804
805 elements = c.getMultiple( keys );
806 }
807 else
808 {
809
810
811
812
813 if ( log.isDebugEnabled() )
814 {
815 log.debug( "LocalGetMultiple. fromCluster [" + fromCluster + "] AllowClusterGet ["
816 + this.remoteCacheServerAttributes.getAllowClusterGet() + "]" );
817 }
818
819 elements = c.localGetMultiple( keys );
820 }
821 }
822 return elements;
823 }
824
825
826
827
828
829
830
831
832 public Set<Serializable> getGroupKeys( String cacheName, String group )
833 {
834 return processGetGroupKeys( cacheName, group );
835 }
836
837
838
839
840
841
842
843
844 protected Set<Serializable> processGetGroupKeys( String cacheName, String group )
845 {
846 CacheListeners cacheDesc = null;
847 try
848 {
849 cacheDesc = getCacheListeners( cacheName );
850 }
851 catch ( Exception e )
852 {
853 log.error( "Problem getting listeners.", e );
854 }
855
856 if ( cacheDesc == null )
857 {
858 return Collections.emptySet();
859 }
860
861 CompositeCache c = (CompositeCache) cacheDesc.cache;
862 return c.getGroupKeys( group );
863 }
864
865
866
867
868
869
870
871
872 public void remove( String cacheName, Serializable key )
873 throws IOException
874 {
875 remove( cacheName, key, 0 );
876 }
877
878
879
880
881
882
883
884
885
886
887
888 public void remove( String cacheName, Serializable key, long requesterId )
889 throws IOException
890 {
891 ICacheEvent cacheEvent = createICacheEvent( cacheName, key, requesterId, ICacheEventLogger.REMOVE_EVENT );
892 try
893 {
894 processRemove( cacheName, key, requesterId );
895 }
896 finally
897 {
898 logICacheEvent( cacheEvent );
899 }
900 }
901
902
903
904
905
906
907
908
909
910 private void processRemove( String cacheName, Serializable key, long requesterId )
911 throws IOException
912 {
913 if ( log.isDebugEnabled() )
914 {
915 log.debug( "remove [" + key + "] from cache [" + cacheName + "]" );
916 }
917
918 CacheListeners cacheDesc = cacheListenersMap.get( cacheName );
919
920 boolean fromCluster = isRequestFromCluster( requesterId );
921
922 if ( cacheDesc != null )
923 {
924
925
926 synchronized ( cacheDesc )
927 {
928 boolean removeSuccess = false;
929
930
931 CompositeCache c = (CompositeCache) cacheDesc.cache;
932
933 if ( fromCluster )
934 {
935 if ( log.isDebugEnabled() )
936 {
937 log.debug( "Remove FROM cluster, NOT updating other auxiliaries for region" );
938 }
939 removeSuccess = c.localRemove( key );
940 }
941 else
942 {
943 if ( log.isDebugEnabled() )
944 {
945 log.debug( "Remove NOT from cluster, updating other auxiliaries for region" );
946 }
947 removeSuccess = c.remove( key );
948 }
949
950 if ( log.isDebugEnabled() )
951 {
952 log.debug( "remove [" + key + "] from cache [" + cacheName + "] success (was it found) = "
953 + removeSuccess );
954 }
955
956
957
958 if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.getLocalClusterConsistency() ) )
959 {
960 ICacheEventQueue[] qlist = getEventQList( cacheDesc, requesterId );
961
962 for ( int i = 0; i < qlist.length; i++ )
963 {
964 qlist[i].addRemoveEvent( key );
965 }
966 }
967 }
968 }
969 }
970
971
972
973
974
975
976
977 public void removeAll( String cacheName )
978 throws IOException
979 {
980 removeAll( cacheName, 0 );
981 }
982
983
984
985
986
987
988
989
990
991
992 public void removeAll( String cacheName, long requesterId )
993 throws IOException
994 {
995 ICacheEvent cacheEvent = createICacheEvent( cacheName, "all", requesterId, ICacheEventLogger.REMOVEALL_EVENT );
996 try
997 {
998 processRemoveAll( cacheName, requesterId );
999 }
1000 finally
1001 {
1002 logICacheEvent( cacheEvent );
1003 }
1004 }
1005
1006
1007
1008
1009
1010
1011
1012
1013 private void processRemoveAll( String cacheName, long requesterId )
1014 throws IOException
1015 {
1016 CacheListeners cacheDesc = cacheListenersMap.get( cacheName );
1017
1018 boolean fromCluster = isRequestFromCluster( requesterId );
1019
1020 if ( cacheDesc != null )
1021 {
1022
1023
1024 synchronized ( cacheDesc )
1025 {
1026
1027 CompositeCache c = (CompositeCache) cacheDesc.cache;
1028
1029 if ( fromCluster )
1030 {
1031 if ( log.isDebugEnabled() )
1032 {
1033 log.debug( "RemoveALL FROM cluster, NOT updating other auxiliaries for region" );
1034 }
1035 c.localRemoveAll();
1036 }
1037 else
1038 {
1039 if ( log.isDebugEnabled() )
1040 {
1041 log.debug( "RemoveALL NOT from cluster, updating other auxiliaries for region" );
1042 }
1043 c.removeAll();
1044 }
1045
1046
1047 if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.getLocalClusterConsistency() ) )
1048 {
1049 ICacheEventQueue[] qlist = getEventQList( cacheDesc, requesterId );
1050
1051 for ( int i = 0; i < qlist.length; i++ )
1052 {
1053 qlist[i].addRemoveAllEvent();
1054 }
1055 }
1056 }
1057 }
1058 }
1059
1060
1061
1062
1063
1064
1065 protected int getPutCount()
1066 {
1067 return puts;
1068 }
1069
1070
1071
1072
1073
1074
1075
1076 public void dispose( String cacheName )
1077 throws IOException
1078 {
1079 dispose( cacheName, 0 );
1080 }
1081
1082
1083
1084
1085
1086
1087
1088
1089 public void dispose( String cacheName, long requesterId )
1090 throws IOException
1091 {
1092 ICacheEvent cacheEvent = createICacheEvent( cacheName, "none", requesterId, ICacheEventLogger.DISPOSE_EVENT );
1093 try
1094 {
1095 processDispose( cacheName, requesterId );
1096 }
1097 finally
1098 {
1099 logICacheEvent( cacheEvent );
1100 }
1101 }
1102
1103
1104
1105
1106
1107
1108 private void processDispose( String cacheName, long requesterId )
1109 throws IOException
1110 {
1111 if ( log.isInfoEnabled() )
1112 {
1113 log.info( "Dispose request received from listener [" + requesterId + "]" );
1114 }
1115
1116 CacheListeners cacheDesc = cacheListenersMap.get( cacheName );
1117
1118
1119 if ( cacheDesc != null )
1120 {
1121
1122 synchronized ( cacheDesc )
1123 {
1124 ICacheEventQueue[] qlist = getEventQList( cacheDesc, requesterId );
1125
1126 for ( int i = 0; i < qlist.length; i++ )
1127 {
1128 qlist[i].addDisposeEvent();
1129 }
1130 cacheManager.freeCache( cacheName );
1131 }
1132 }
1133 }
1134
1135
1136
1137
1138
1139
1140 public void release()
1141 throws IOException
1142 {
1143 synchronized ( cacheListenersMap )
1144 {
1145 for (CacheListeners cacheDesc : cacheListenersMap.values())
1146 {
1147 ICacheEventQueue[] qlist = getEventQList( cacheDesc, 0 );
1148
1149 for ( int i = 0; i < qlist.length; i++ )
1150 {
1151 qlist[i].addDisposeEvent();
1152 }
1153 }
1154 cacheManager.release();
1155 }
1156 }
1157
1158
1159
1160
1161
1162
1163
1164
1165 protected CacheListeners getCacheListeners( String cacheName )
1166 {
1167 CacheListeners cacheListeners = cacheListenersMap.get( cacheName );
1168 synchronized ( cacheListenersMap )
1169 {
1170 if ( cacheListeners == null )
1171 {
1172 cacheListeners = cacheListenersMap.get( cacheName );
1173 if ( cacheListeners == null )
1174 {
1175 cacheListeners = new CacheListeners( cacheManager.getCache( cacheName ) );
1176 cacheListenersMap.put( cacheName, cacheListeners );
1177 }
1178 }
1179 }
1180 return cacheListeners;
1181 }
1182
1183
1184
1185
1186
1187
1188
1189
1190 protected CacheListeners getClusterListeners( String cacheName )
1191 {
1192 CacheListeners cacheListeners = clusterListenersMap.get( cacheName );
1193 synchronized ( clusterListenersMap )
1194 {
1195 if ( cacheListeners == null )
1196 {
1197 cacheListeners = clusterListenersMap.get( cacheName );
1198 if ( cacheListeners == null )
1199 {
1200 cacheListeners = new CacheListeners( cacheManager.getCache( cacheName ) );
1201 clusterListenersMap.put( cacheName, cacheListeners );
1202 }
1203 }
1204 }
1205 return cacheListeners;
1206 }
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220 private ICacheEventQueue[] getEventQList( CacheListeners cacheListeners, long requesterId )
1221 {
1222 ICacheEventQueue[] list = null;
1223 synchronized ( cacheListeners.eventQMap )
1224 {
1225 list = cacheListeners.eventQMap.values().toArray( new ICacheEventQueue[0] );
1226 }
1227 int count = 0;
1228
1229 for ( int i = 0; i < list.length; i++ )
1230 {
1231 ICacheEventQueue q = list[i];
1232 if ( q.isWorking() && q.getListenerId() != requesterId )
1233 {
1234 count++;
1235 }
1236 else
1237 {
1238 list[i] = null;
1239 }
1240 }
1241 if ( count == list.length )
1242 {
1243
1244 return list;
1245 }
1246
1247
1248 ICacheEventQueue[] qq = new ICacheEventQueue[count];
1249 count = 0;
1250 for ( int i = 0; i < list.length; i++ )
1251 {
1252 if ( list[i] != null )
1253 {
1254 qq[count++] = list[i];
1255 }
1256 }
1257 return qq;
1258 }
1259
1260
1261
1262
1263
1264
1265 private static void cleanupEventQMap( Map<Long, ICacheEventQueue> eventQMap )
1266 {
1267 synchronized ( eventQMap )
1268 {
1269 for (Iterator<Map.Entry<Long, ICacheEventQueue>> itr = eventQMap.entrySet().iterator(); itr.hasNext(); )
1270 {
1271 Map.Entry<Long, ICacheEventQueue> e = itr.next();
1272 ICacheEventQueue q = e.getValue();
1273
1274
1275
1276
1277
1278 if ( !q.isWorking() )
1279 {
1280 itr.remove();
1281 log.warn( "Cache event queue " + q + " is not working and removed from cache server." );
1282 }
1283 }
1284 }
1285 }
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298 public void addCacheListener( String cacheName, ICacheListener listener )
1299 throws IOException
1300 {
1301 if ( cacheName == null || listener == null )
1302 {
1303 throw new IllegalArgumentException( "cacheName and listener must not be null" );
1304 }
1305 CacheListeners cacheListeners;
1306
1307 IRemoteCacheListener ircl = (IRemoteCacheListener) listener;
1308
1309 String listenerAddress = ircl.getLocalHostAddress();
1310
1311 int remoteType = ircl.getRemoteType();
1312 if ( remoteType == IRemoteCacheAttributes.CLUSTER )
1313 {
1314 log.debug( "adding cluster listener, listenerAddress [" + listenerAddress + "]" );
1315 cacheListeners = getClusterListeners( cacheName );
1316 }
1317 else
1318 {
1319 log.debug( "adding normal listener, listenerAddress [" + listenerAddress + "]" );
1320 cacheListeners = getCacheListeners( cacheName );
1321 }
1322 Map<Long, ICacheEventQueue> eventQMap = cacheListeners.eventQMap;
1323 cleanupEventQMap( eventQMap );
1324
1325
1326 synchronized ( ICacheListener.class )
1327 {
1328 long id = 0;
1329 try
1330 {
1331 id = listener.getListenerId();
1332
1333 if ( id == 0 )
1334 {
1335
1336 long listenerIdB = nextListenerId();
1337 if ( log.isDebugEnabled() )
1338 {
1339 log.debug( "listener id=" + ( listenerIdB & 0xff ) + " addded for cache [" + cacheName
1340 + "], listenerAddress [" + listenerAddress + "]" );
1341 }
1342 listener.setListenerId( listenerIdB );
1343 id = listenerIdB;
1344
1345
1346 String message = "Adding vm listener under new id = [" + listenerIdB + "], listenerAddress ["
1347 + listenerAddress + "]";
1348 logApplicationEvent( "RemoteCacheServer", "addCacheListener", message );
1349 if ( log.isInfoEnabled() )
1350 {
1351 log.info( message );
1352 }
1353 }
1354 else
1355 {
1356 String message = "Adding listener under existing id = [" + id + "], listenerAddress ["
1357 + listenerAddress + "]";
1358 logApplicationEvent( "RemoteCacheServer", "addCacheListener", message );
1359 if ( log.isInfoEnabled() )
1360 {
1361 log.info( message );
1362 }
1363
1364
1365 }
1366
1367
1368 this.idTypeMap.put( Long.valueOf( id ), Integer.valueOf( remoteType ) );
1369 if ( listenerAddress != null )
1370 {
1371 this.idIPMap.put( Long.valueOf( id ), listenerAddress );
1372 }
1373 }
1374 catch ( IOException ioe )
1375 {
1376 String message = "Problem setting listener id, listenerAddress [" + listenerAddress + "]";
1377 log.error( message, ioe );
1378
1379 if ( cacheEventLogger != null )
1380 {
1381 cacheEventLogger.logError( "RemoteCacheServer", "addCacheListener", message + " - "
1382 + ioe.getMessage() );
1383 }
1384 }
1385
1386 CacheEventQueueFactory fact = new CacheEventQueueFactory();
1387 ICacheEventQueue q = fact.createCacheEventQueue( listener, id, cacheName, remoteCacheServerAttributes
1388 .getEventQueuePoolName(), remoteCacheServerAttributes.getEventQueueType() );
1389
1390 eventQMap.put(Long.valueOf(listener.getListenerId()), q);
1391
1392 if ( log.isInfoEnabled() )
1393 {
1394 log.info( cacheListeners );
1395 }
1396 }
1397 }
1398
1399
1400
1401
1402
1403
1404
1405 public void addCacheListener( ICacheListener listener )
1406 throws IOException
1407 {
1408 for (String cacheName : cacheListenersMap.keySet())
1409 {
1410 addCacheListener( cacheName, listener );
1411
1412 if ( log.isDebugEnabled() )
1413 {
1414 log.debug( "Adding listener for cache [" + cacheName + "]" );
1415 }
1416 }
1417 }
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427 public void removeCacheListener( String cacheName, ICacheListener listener )
1428 throws IOException
1429 {
1430 removeCacheListener( cacheName, listener.getListenerId() );
1431 }
1432
1433
1434
1435
1436
1437
1438
1439
1440 public void removeCacheListener( String cacheName, long listenerId )
1441 {
1442 String message = "Removing listener for cache region = [" + cacheName + "] and listenerId [" + listenerId + "]";
1443 logApplicationEvent( "RemoteCacheServer", "removeCacheListener", message );
1444 if ( log.isInfoEnabled() )
1445 {
1446 log.info( message );
1447 }
1448
1449 boolean isClusterListener = isRequestFromCluster( listenerId );
1450
1451 CacheListeners cacheDesc = null;
1452
1453 if ( isClusterListener )
1454 {
1455 cacheDesc = getClusterListeners( cacheName );
1456 }
1457 else
1458 {
1459 cacheDesc = getCacheListeners( cacheName );
1460 }
1461 Map<Long, ICacheEventQueue> eventQMap = cacheDesc.eventQMap;
1462 cleanupEventQMap( eventQMap );
1463 ICacheEventQueue q = eventQMap.remove( Long.valueOf( listenerId ) );
1464
1465 if ( q != null )
1466 {
1467 if ( log.isDebugEnabled() )
1468 {
1469 log.debug( "Found queue for cache region = [" + cacheName + "] and listenerId [" + listenerId + "]" );
1470 }
1471 q.destroy();
1472 cleanupEventQMap( eventQMap );
1473 }
1474 else
1475 {
1476 if ( log.isDebugEnabled() )
1477 {
1478 log.debug( "Did not find queue for cache region = [" + cacheName + "] and listenerId [" + listenerId
1479 + "]" );
1480 }
1481 }
1482
1483
1484 idTypeMap.remove( Long.valueOf( listenerId ) );
1485 idIPMap.remove( Long.valueOf( listenerId ) );
1486
1487 if ( log.isInfoEnabled() )
1488 {
1489 log.info( "After removing listener [" + listenerId + "] cache region " + cacheName + "'s listener size ["
1490 + cacheDesc.eventQMap.size() + "]" );
1491 }
1492 }
1493
1494
1495
1496
1497
1498
1499
1500 public void removeCacheListener( ICacheListener listener )
1501 throws IOException
1502 {
1503 for (String cacheName : cacheListenersMap.keySet())
1504 {
1505 removeCacheListener( cacheName, listener );
1506
1507 if ( log.isInfoEnabled() )
1508 {
1509 log.info( "Removing listener for cache [" + cacheName + "]" );
1510 }
1511 }
1512 return;
1513 }
1514
1515
1516
1517
1518
1519
1520 public void shutdown()
1521 throws IOException
1522 {
1523 RemoteCacheServerFactory.shutdownImpl( "", Registry.REGISTRY_PORT );
1524 }
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534 public void shutdown( String host, int port )
1535 throws IOException
1536 {
1537 if ( log.isInfoEnabled() )
1538 {
1539 log.info( "Received shutdown request. Shutting down server." );
1540 }
1541 RemoteCacheServerFactory.shutdownImpl( host, port );
1542 this.cacheManager.shutDown();
1543 }
1544
1545
1546
1547
1548
1549
1550 public void unreferenced()
1551 {
1552 if ( log.isInfoEnabled() )
1553 {
1554 log.info( "*** Server now unreferenced and subject to GC. ***" );
1555 }
1556 }
1557
1558
1559
1560
1561
1562
1563 private long nextListenerId()
1564 {
1565 long id = 0;
1566 if ( listenerId[0] == Long.MAX_VALUE )
1567 {
1568 synchronized ( listenerId )
1569 {
1570 id = listenerId[0];
1571 listenerId[0] = 0;
1572
1573
1574
1575
1576
1577 }
1578 }
1579 else
1580 {
1581 synchronized ( listenerId )
1582 {
1583 id = ++listenerId[0];
1584 }
1585 }
1586 return id;
1587 }
1588
1589
1590
1591
1592
1593
1594
1595 public String getStats()
1596 throws IOException
1597 {
1598 return cacheManager.getStats();
1599 }
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609 private ICacheEvent createICacheEvent( ICacheElement item, long requesterId, String eventName )
1610 {
1611 if ( cacheEventLogger == null )
1612 {
1613 return EMPTY_ICACHE_EVENT;
1614 }
1615 String ipAddress = getExtraInfoForRequesterId( requesterId );
1616 return cacheEventLogger
1617 .createICacheEvent( "RemoteCacheServer", item.getCacheName(), eventName, ipAddress, item );
1618 }
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629 private ICacheEvent createICacheEvent( String cacheName, Serializable key, long requesterId, String eventName )
1630 {
1631 if ( cacheEventLogger == null )
1632 {
1633 return EMPTY_ICACHE_EVENT;
1634 }
1635 String ipAddress = getExtraInfoForRequesterId( requesterId );
1636 return cacheEventLogger.createICacheEvent( "RemoteCacheServer", cacheName, eventName, ipAddress, key );
1637 }
1638
1639
1640
1641
1642
1643
1644
1645
1646 protected void logApplicationEvent( String source, String eventName, String optionalDetails )
1647 {
1648 if ( cacheEventLogger != null )
1649 {
1650 cacheEventLogger.logApplicationEvent( source, eventName, optionalDetails );
1651 }
1652 }
1653
1654
1655
1656
1657
1658
1659 protected void logICacheEvent( ICacheEvent cacheEvent )
1660 {
1661 if ( cacheEventLogger != null )
1662 {
1663 cacheEventLogger.logICacheEvent( cacheEvent );
1664 }
1665 }
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675 protected String getExtraInfoForRequesterId( long requesterId )
1676 {
1677 String ipAddress = idIPMap.get( Long.valueOf( requesterId ) );
1678 return ipAddress;
1679 }
1680
1681
1682
1683
1684
1685
1686 public void setCacheEventLogger( ICacheEventLogger cacheEventLogger )
1687 {
1688 this.cacheEventLogger = cacheEventLogger;
1689 }
1690 }