1 package org.apache.commons.jcs.auxiliary.remote.server;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.io.Serializable;
24 import java.rmi.RemoteException;
25 import java.rmi.registry.Registry;
26 import java.rmi.server.RMISocketFactory;
27 import java.rmi.server.UnicastRemoteObject;
28 import java.rmi.server.Unreferenced;
29 import java.util.Collections;
30 import java.util.Iterator;
31 import java.util.Map;
32 import java.util.Properties;
33 import java.util.Set;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.ConcurrentMap;
36 import java.util.concurrent.locks.ReentrantLock;
37
38 import org.apache.commons.jcs.access.exception.CacheException;
39 import org.apache.commons.jcs.auxiliary.remote.RemoteUtils;
40 import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
41 import org.apache.commons.jcs.auxiliary.remote.server.behavior.IRemoteCacheServer;
42 import org.apache.commons.jcs.auxiliary.remote.server.behavior.IRemoteCacheServerAttributes;
43 import org.apache.commons.jcs.auxiliary.remote.server.behavior.RemoteType;
44 import org.apache.commons.jcs.engine.CacheEventQueueFactory;
45 import org.apache.commons.jcs.engine.CacheListeners;
46 import org.apache.commons.jcs.engine.behavior.ICacheElement;
47 import org.apache.commons.jcs.engine.behavior.ICacheEventQueue;
48 import org.apache.commons.jcs.engine.behavior.ICacheListener;
49 import org.apache.commons.jcs.engine.control.CompositeCache;
50 import org.apache.commons.jcs.engine.control.CompositeCacheManager;
51 import org.apache.commons.jcs.engine.logging.CacheEvent;
52 import org.apache.commons.jcs.engine.logging.behavior.ICacheEvent;
53 import org.apache.commons.jcs.engine.logging.behavior.ICacheEventLogger;
54 import org.apache.commons.logging.Log;
55 import org.apache.commons.logging.LogFactory;
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72 public class RemoteCacheServer<K, V>
73 extends UnicastRemoteObject
74 implements IRemoteCacheServer<K, V>, Unreferenced
75 {
76 public static final String DFEAULT_REMOTE_CONFIGURATION_FILE = "/remote.cache.ccf";
77
78
79 private static final long serialVersionUID = -8072345435941473116L;
80
81
82 private static final Log log = LogFactory.getLog( RemoteCacheServer.class );
83
84
85 private static final boolean timing = true;
86
87
88 private int puts = 0;
89
90
91 private final transient ConcurrentMap<String, CacheListeners<K, V>> cacheListenersMap =
92 new ConcurrentHashMap<String, CacheListeners<K, V>>();
93
94
95 private final transient ConcurrentMap<String, CacheListeners<K, V>> clusterListenersMap =
96 new ConcurrentHashMap<String, CacheListeners<K, V>>();
97
98
99 private transient CompositeCacheManager cacheManager;
100
101
102 private final ConcurrentMap<Long, RemoteType> idTypeMap = new ConcurrentHashMap<Long, RemoteType>();
103
104
105 private final ConcurrentMap<Long, String> idIPMap = new ConcurrentHashMap<Long, String>();
106
107
108 private final int[] listenerId = new int[1];
109
110
111
112 final IRemoteCacheServerAttributes remoteCacheServerAttributes;
113
114
115 private final int logInterval = 100;
116
117
118 private transient ICacheEventLogger cacheEventLogger;
119
120
121 private ReentrantLock cacheListenersLock = new ReentrantLock();
122
123
124 private ReentrantLock clusterListenersLock = new ReentrantLock();
125
126
127
128
129
130
131
132
133
134 protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa, Properties config )
135 throws RemoteException
136 {
137 super( rcsa.getServicePort() );
138 this.remoteCacheServerAttributes = rcsa;
139 init( config );
140 }
141
142
143
144
145
146
147
148
149
150
151 protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa, Properties config, RMISocketFactory customRMISocketFactory )
152 throws RemoteException
153 {
154 super( rcsa.getServicePort(), customRMISocketFactory, customRMISocketFactory );
155 this.remoteCacheServerAttributes = rcsa;
156 init( config );
157 }
158
159
160
161
162
163
164
165
166
167
168 @Deprecated
169 protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa )
170 throws RemoteException
171 {
172 super( rcsa.getServicePort() );
173 this.remoteCacheServerAttributes = rcsa;
174 init( rcsa.getConfigFileName() );
175 }
176
177
178
179
180
181
182
183
184
185
186
187 @Deprecated
188 protected RemoteCacheServer( IRemoteCacheServerAttributes rcsa, RMISocketFactory customRMISocketFactory )
189 throws RemoteException
190 {
191 super( rcsa.getServicePort(), customRMISocketFactory, customRMISocketFactory );
192 this.remoteCacheServerAttributes = rcsa;
193 init( rcsa.getConfigFileName() );
194 }
195
196
197
198
199
200
201
202
203
204 @Deprecated
205 private void init( String propFile ) throws RemoteException
206 {
207 String propFileName = propFile == null ? DFEAULT_REMOTE_CONFIGURATION_FILE : propFile;
208
209 Properties prop = null;
210 try
211 {
212 prop = RemoteUtils.loadProps(propFileName);
213 }
214 catch (IOException e)
215 {
216 throw new RemoteException(e.getMessage(), e);
217 }
218
219 init(prop);
220 }
221
222
223
224
225
226
227
228 private void init( Properties prop ) throws RemoteException
229 {
230 try
231 {
232 cacheManager = createCacheManager( prop );
233 }
234 catch (CacheException e)
235 {
236 throw new RemoteException(e.getMessage(), e);
237 }
238
239
240
241 String[] list = cacheManager.getCacheNames();
242 for ( int i = 0; i < list.length; i++ )
243 {
244 String name = list[i];
245 CompositeCache<K, V> cache = cacheManager.getCache( name );
246 cacheListenersMap.put( name, new CacheListeners<K, V>( cache ) );
247 }
248 }
249
250
251
252
253
254
255
256
257
258 private CompositeCacheManager createCacheManager( Properties prop ) throws CacheException
259 {
260 CompositeCacheManager hub = CompositeCacheManager.getUnconfiguredInstance();
261 hub.configure( prop );
262 return hub;
263 }
264
265
266
267
268
269
270
271
272
273
274
275 public void put( ICacheElement<K, V> item )
276 throws IOException
277 {
278 update( item );
279 }
280
281
282
283
284
285 @Override
286 public void update( ICacheElement<K, V> item )
287 throws IOException
288 {
289 update( item, 0 );
290 }
291
292
293
294
295
296
297
298
299 @Override
300 public void update( ICacheElement<K, V> item, long requesterId )
301 throws IOException
302 {
303 ICacheEvent<ICacheElement<K, V>> cacheEvent = createICacheEvent( item, requesterId, ICacheEventLogger.UPDATE_EVENT );
304 try
305 {
306 processUpdate( item, requesterId );
307 }
308 finally
309 {
310 logICacheEvent( cacheEvent );
311 }
312 }
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337 private void processUpdate( ICacheElement<K, V> item, long requesterId )
338 {
339 long start = 0;
340 if ( timing )
341 {
342 start = System.currentTimeMillis();
343 }
344
345 logUpdateInfo( item );
346
347 try
348 {
349 CacheListeners<K, V> cacheDesc = getCacheListeners( item.getCacheName() );
350 item.getVal();
351
352 boolean fromCluster = isRequestFromCluster( requesterId );
353
354 if ( log.isDebugEnabled() )
355 {
356 log.debug( "In update, requesterId = [" + requesterId + "] fromCluster = " + fromCluster );
357 }
358
359
360 synchronized ( cacheDesc )
361 {
362 try
363 {
364 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380 if ( fromCluster )
381 {
382 if ( log.isDebugEnabled() )
383 {
384 log.debug( "Put FROM cluster, NOT updating other auxiliaries for region. "
385 + " requesterId [" + requesterId + "]" );
386 }
387 c.localUpdate( item );
388 }
389 else
390 {
391 if ( log.isDebugEnabled() )
392 {
393 log.debug( "Put NOT from cluster, updating other auxiliaries for region. "
394 + " requesterId [" + requesterId + "]" );
395 }
396 c.update( item );
397 }
398 }
399 catch ( Exception ce )
400 {
401
402 if ( log.isInfoEnabled() )
403 {
404 log.info( "Exception caught updating item. requesterId [" + requesterId + "] "
405 + ce.getMessage() );
406 }
407 }
408
409
410
411 if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency() ) )
412 {
413 ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
414 if ( log.isDebugEnabled() )
415 {
416 log.debug( "qlist.length = " + qlist.length );
417 }
418 for ( int i = 0; i < qlist.length; i++ )
419 {
420 qlist[i].addPutEvent( item );
421 }
422 }
423 }
424 }
425 catch ( IOException e )
426 {
427 if ( cacheEventLogger != null )
428 {
429 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.UPDATE_EVENT, e.getMessage()
430 + " REGION: " + item.getCacheName() + " ITEM: " + item );
431 }
432
433 log.error( "Trouble in Update. requesterId [" + requesterId + "]", e );
434 }
435
436
437 if ( timing )
438 {
439 long end = System.currentTimeMillis();
440 if ( log.isDebugEnabled() )
441 {
442 log.debug( "put took " + String.valueOf( end - start ) + " ms." );
443 }
444 }
445 }
446
447
448
449
450
451
452 private void logUpdateInfo( ICacheElement<K, V> item )
453 {
454
455 puts++;
456
457 if ( log.isInfoEnabled() )
458 {
459 if ( puts % logInterval == 0 )
460 {
461 log.info( "puts = " + puts );
462 }
463 }
464
465 if ( log.isDebugEnabled() )
466 {
467 log.debug( "In update, put [" + item.getKey() + "] in [" + item.getCacheName() + "]" );
468 }
469 }
470
471
472
473
474
475
476
477
478
479
480 @Override
481 public ICacheElement<K, V> get( String cacheName, K key )
482 throws IOException
483 {
484 return this.get( cacheName, key, 0 );
485 }
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500 @Override
501 public ICacheElement<K, V> get( String cacheName, K key, long requesterId )
502 throws IOException
503 {
504 ICacheElement<K, V> element = null;
505 ICacheEvent<K> cacheEvent = createICacheEvent( cacheName, key, requesterId, ICacheEventLogger.GET_EVENT );
506 try
507 {
508 element = processGet( cacheName, key, requesterId );
509 }
510 finally
511 {
512 logICacheEvent( cacheEvent );
513 }
514 return element;
515 }
516
517
518
519
520
521
522
523
524
525
526
527 private ICacheElement<K, V> processGet( String cacheName, K key, long requesterId )
528 {
529 boolean fromCluster = isRequestFromCluster( requesterId );
530
531 if ( log.isDebugEnabled() )
532 {
533 log.debug( "get [" + key + "] from cache [" + cacheName + "] requesterId = [" + requesterId
534 + "] fromCluster = " + fromCluster );
535 }
536
537 CacheListeners<K, V> cacheDesc = null;
538 try
539 {
540 cacheDesc = getCacheListeners( cacheName );
541 }
542 catch ( Exception e )
543 {
544 log.error( "Problem getting listeners.", e );
545
546 if ( cacheEventLogger != null )
547 {
548 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.GET_EVENT, e.getMessage() + cacheName
549 + " KEY: " + key );
550 }
551 }
552
553 ICacheElement<K, V> element = getFromCacheListeners( key, fromCluster, cacheDesc, null );
554 return element;
555 }
556
557
558
559
560
561
562
563
564
565
566 private ICacheElement<K, V> getFromCacheListeners( K key, boolean fromCluster, CacheListeners<K, V> cacheDesc,
567 ICacheElement<K, V> element )
568 {
569 ICacheElement<K, V> returnElement = element;
570
571 if ( cacheDesc != null )
572 {
573 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589 if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() )
590 {
591 if ( log.isDebugEnabled() )
592 {
593 log.debug( "NonLocalGet. fromCluster [" + fromCluster + "] AllowClusterGet ["
594 + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
595 }
596 returnElement = c.get( key );
597 }
598 else
599 {
600
601
602
603
604 if ( log.isDebugEnabled() )
605 {
606 log.debug( "LocalGet. fromCluster [" + fromCluster + "] AllowClusterGet ["
607 + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
608 }
609 returnElement = c.localGet( key );
610 }
611 }
612
613 return returnElement;
614 }
615
616
617
618
619
620
621
622
623
624 @Override
625 public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern )
626 throws IOException
627 {
628 return getMatching( cacheName, pattern, 0 );
629 }
630
631
632
633
634
635
636
637
638
639
640 @Override
641 public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern, long requesterId )
642 throws IOException
643 {
644 ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, pattern, requesterId,
645 ICacheEventLogger.GETMATCHING_EVENT );
646 try
647 {
648 return processGetMatching( cacheName, pattern, requesterId );
649 }
650 finally
651 {
652 logICacheEvent( cacheEvent );
653 }
654 }
655
656
657
658
659
660
661
662
663
664 protected Map<K, ICacheElement<K, V>> processGetMatching( String cacheName, String pattern, long requesterId )
665 {
666 boolean fromCluster = isRequestFromCluster( requesterId );
667
668 if ( log.isDebugEnabled() )
669 {
670 log.debug( "getMatching [" + pattern + "] from cache [" + cacheName + "] requesterId = [" + requesterId
671 + "] fromCluster = " + fromCluster );
672 }
673
674 CacheListeners<K, V> cacheDesc = null;
675 try
676 {
677 cacheDesc = getCacheListeners( cacheName );
678 }
679 catch ( Exception e )
680 {
681 log.error( "Problem getting listeners.", e );
682
683 if ( cacheEventLogger != null )
684 {
685 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.GETMATCHING_EVENT, e.getMessage()
686 + cacheName + " pattern: " + pattern );
687 }
688 }
689
690 return getMatchingFromCacheListeners( pattern, fromCluster, cacheDesc );
691 }
692
693
694
695
696
697
698
699
700
701 private Map<K, ICacheElement<K, V>> getMatchingFromCacheListeners( String pattern, boolean fromCluster, CacheListeners<K, V> cacheDesc )
702 {
703 Map<K, ICacheElement<K, V>> elements = null;
704 if ( cacheDesc != null )
705 {
706 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
707
708
709
710
711
712 if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() )
713 {
714 if ( log.isDebugEnabled() )
715 {
716 log.debug( "NonLocalGetMatching. fromCluster [" + fromCluster + "] AllowClusterGet ["
717 + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
718 }
719 elements = c.getMatching( pattern );
720 }
721 else
722 {
723
724
725
726
727 if ( log.isDebugEnabled() )
728 {
729 log.debug( "LocalGetMatching. fromCluster [" + fromCluster + "] AllowClusterGet ["
730 + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
731 }
732 elements = c.localGetMatching( pattern );
733 }
734 }
735 return elements;
736 }
737
738
739
740
741
742
743
744
745
746
747 @Override
748 public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys )
749 throws IOException
750 {
751 return this.getMultiple( cacheName, keys, 0 );
752 }
753
754
755
756
757
758
759
760
761
762
763
764
765
766 @Override
767 public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys, long requesterId )
768 throws IOException
769 {
770 ICacheEvent<Serializable> cacheEvent = createICacheEvent( cacheName, (Serializable) keys, requesterId,
771 ICacheEventLogger.GETMULTIPLE_EVENT );
772 try
773 {
774 return processGetMultiple( cacheName, keys, requesterId );
775 }
776 finally
777 {
778 logICacheEvent( cacheEvent );
779 }
780 }
781
782
783
784
785
786
787
788
789
790
791 private Map<K, ICacheElement<K, V>> processGetMultiple( String cacheName, Set<K> keys, long requesterId )
792 {
793 boolean fromCluster = isRequestFromCluster( requesterId );
794
795 if ( log.isDebugEnabled() )
796 {
797 log.debug( "getMultiple [" + keys + "] from cache [" + cacheName + "] requesterId = [" + requesterId
798 + "] fromCluster = " + fromCluster );
799 }
800
801 CacheListeners<K, V> cacheDesc = getCacheListeners( cacheName );
802 Map<K, ICacheElement<K, V>> elements = getMultipleFromCacheListeners( keys, null, fromCluster, cacheDesc );
803 return elements;
804 }
805
806
807
808
809
810
811
812
813
814 private boolean isRequestFromCluster( long requesterId )
815 {
816 RemoteType remoteTypeL = idTypeMap.get( Long.valueOf( requesterId ) );
817 return remoteTypeL == RemoteType.CLUSTER;
818 }
819
820
821
822
823
824
825
826
827
828
829 private Map<K, ICacheElement<K, V>> getMultipleFromCacheListeners( Set<K> keys, Map<K, ICacheElement<K, V>> elements, boolean fromCluster, CacheListeners<K, V> cacheDesc )
830 {
831 Map<K, ICacheElement<K, V>> returnElements = elements;
832
833 if ( cacheDesc != null )
834 {
835 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851 if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() )
852 {
853 if ( log.isDebugEnabled() )
854 {
855 log.debug( "NonLocalGetMultiple. fromCluster [" + fromCluster + "] AllowClusterGet ["
856 + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
857 }
858
859 returnElements = c.getMultiple( keys );
860 }
861 else
862 {
863
864
865
866
867 if ( log.isDebugEnabled() )
868 {
869 log.debug( "LocalGetMultiple. fromCluster [" + fromCluster + "] AllowClusterGet ["
870 + this.remoteCacheServerAttributes.isAllowClusterGet() + "]" );
871 }
872
873 returnElements = c.localGetMultiple( keys );
874 }
875 }
876
877 return returnElements;
878 }
879
880
881
882
883
884
885
886 @Override
887 public Set<K> getKeySet(String cacheName) throws IOException
888 {
889 return processGetKeySet( cacheName );
890 }
891
892
893
894
895
896
897
898 protected Set<K> processGetKeySet( String cacheName )
899 {
900 CacheListeners<K, V> cacheDesc = null;
901 try
902 {
903 cacheDesc = getCacheListeners( cacheName );
904 }
905 catch ( Exception e )
906 {
907 log.error( "Problem getting listeners.", e );
908 }
909
910 if ( cacheDesc == null )
911 {
912 return Collections.emptySet();
913 }
914
915 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
916 return c.getKeySet();
917 }
918
919
920
921
922
923
924
925
926 @Override
927 public void remove( String cacheName, K key )
928 throws IOException
929 {
930 remove( cacheName, key, 0 );
931 }
932
933
934
935
936
937
938
939
940
941
942
943 @Override
944 public void remove( String cacheName, K key, long requesterId )
945 throws IOException
946 {
947 ICacheEvent<K> cacheEvent = createICacheEvent( cacheName, key, requesterId, ICacheEventLogger.REMOVE_EVENT );
948 try
949 {
950 processRemove( cacheName, key, requesterId );
951 }
952 finally
953 {
954 logICacheEvent( cacheEvent );
955 }
956 }
957
958
959
960
961
962
963
964
965
966 private void processRemove( String cacheName, K key, long requesterId )
967 throws IOException
968 {
969 if ( log.isDebugEnabled() )
970 {
971 log.debug( "remove [" + key + "] from cache [" + cacheName + "]" );
972 }
973
974 CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName );
975
976 boolean fromCluster = isRequestFromCluster( requesterId );
977
978 if ( cacheDesc != null )
979 {
980
981
982 synchronized ( cacheDesc )
983 {
984 boolean removeSuccess = false;
985
986
987 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
988
989 if ( fromCluster )
990 {
991 if ( log.isDebugEnabled() )
992 {
993 log.debug( "Remove FROM cluster, NOT updating other auxiliaries for region" );
994 }
995 removeSuccess = c.localRemove( key );
996 }
997 else
998 {
999 if ( log.isDebugEnabled() )
1000 {
1001 log.debug( "Remove NOT from cluster, updating other auxiliaries for region" );
1002 }
1003 removeSuccess = c.remove( key );
1004 }
1005
1006 if ( log.isDebugEnabled() )
1007 {
1008 log.debug( "remove [" + key + "] from cache [" + cacheName + "] success (was it found) = "
1009 + removeSuccess );
1010 }
1011
1012
1013
1014 if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency() ) )
1015 {
1016 ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
1017
1018 for ( int i = 0; i < qlist.length; i++ )
1019 {
1020 qlist[i].addRemoveEvent( key );
1021 }
1022 }
1023 }
1024 }
1025 }
1026
1027
1028
1029
1030
1031
1032
1033 @Override
1034 public void removeAll( String cacheName )
1035 throws IOException
1036 {
1037 removeAll( cacheName, 0 );
1038 }
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049 @Override
1050 public void removeAll( String cacheName, long requesterId )
1051 throws IOException
1052 {
1053 ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, "all", requesterId, ICacheEventLogger.REMOVEALL_EVENT );
1054 try
1055 {
1056 processRemoveAll( cacheName, requesterId );
1057 }
1058 finally
1059 {
1060 logICacheEvent( cacheEvent );
1061 }
1062 }
1063
1064
1065
1066
1067
1068
1069
1070
1071 private void processRemoveAll( String cacheName, long requesterId )
1072 throws IOException
1073 {
1074 CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName );
1075
1076 boolean fromCluster = isRequestFromCluster( requesterId );
1077
1078 if ( cacheDesc != null )
1079 {
1080
1081
1082 synchronized ( cacheDesc )
1083 {
1084
1085 CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
1086
1087 if ( fromCluster )
1088 {
1089 if ( log.isDebugEnabled() )
1090 {
1091 log.debug( "RemoveALL FROM cluster, NOT updating other auxiliaries for region" );
1092 }
1093 c.localRemoveAll();
1094 }
1095 else
1096 {
1097 if ( log.isDebugEnabled() )
1098 {
1099 log.debug( "RemoveALL NOT from cluster, updating other auxiliaries for region" );
1100 }
1101 c.removeAll();
1102 }
1103
1104
1105 if ( !fromCluster || ( fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency() ) )
1106 {
1107 ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
1108
1109 for ( int i = 0; i < qlist.length; i++ )
1110 {
1111 qlist[i].addRemoveAllEvent();
1112 }
1113 }
1114 }
1115 }
1116 }
1117
1118
1119
1120
1121
1122
1123
1124 int getPutCount()
1125 {
1126 return puts;
1127 }
1128
1129
1130
1131
1132
1133
1134
1135 @Override
1136 public void dispose( String cacheName )
1137 throws IOException
1138 {
1139 dispose( cacheName, 0 );
1140 }
1141
1142
1143
1144
1145
1146
1147
1148
1149 public void dispose( String cacheName, long requesterId )
1150 throws IOException
1151 {
1152 ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, "none", requesterId, ICacheEventLogger.DISPOSE_EVENT );
1153 try
1154 {
1155 processDispose( cacheName, requesterId );
1156 }
1157 finally
1158 {
1159 logICacheEvent( cacheEvent );
1160 }
1161 }
1162
1163
1164
1165
1166
1167
1168 private void processDispose( String cacheName, long requesterId )
1169 throws IOException
1170 {
1171 if ( log.isInfoEnabled() )
1172 {
1173 log.info( "Dispose request received from listener [" + requesterId + "]" );
1174 }
1175
1176 CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName );
1177
1178
1179 if ( cacheDesc != null )
1180 {
1181
1182 synchronized ( cacheDesc )
1183 {
1184 ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
1185
1186 for ( int i = 0; i < qlist.length; i++ )
1187 {
1188 qlist[i].addDisposeEvent();
1189 }
1190 cacheManager.freeCache( cacheName );
1191 }
1192 }
1193 }
1194
1195
1196
1197
1198
1199
1200 @Override
1201 public void release()
1202 throws IOException
1203 {
1204 for (CacheListeners<K, V> cacheDesc : cacheListenersMap.values())
1205 {
1206 ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, 0 );
1207
1208 for ( int i = 0; i < qlist.length; i++ )
1209 {
1210 qlist[i].addDisposeEvent();
1211 }
1212 }
1213 cacheManager.release();
1214 }
1215
1216
1217
1218
1219
1220
1221
1222
1223 protected CacheListeners<K, V> getCacheListeners( String cacheName )
1224 {
1225 CacheListeners<K, V> cacheListeners = cacheListenersMap.get( cacheName );
1226
1227 if ( cacheListeners == null )
1228 {
1229 cacheListenersLock.lock();
1230
1231 try
1232 {
1233
1234 cacheListeners = cacheListenersMap.get( cacheName );
1235 if ( cacheListeners == null )
1236 {
1237 CompositeCache<K, V> cache = cacheManager.getCache( cacheName );
1238 cacheListeners = new CacheListeners<K, V>( cache );
1239 cacheListenersMap.put( cacheName, cacheListeners );
1240 }
1241 }
1242 finally
1243 {
1244 cacheListenersLock.unlock();
1245 }
1246 }
1247
1248 return cacheListeners;
1249 }
1250
1251
1252
1253
1254
1255
1256
1257
1258 protected CacheListeners<K, V> getClusterListeners( String cacheName )
1259 {
1260 CacheListeners<K, V> cacheListeners = clusterListenersMap.get( cacheName );
1261
1262 if ( cacheListeners == null )
1263 {
1264 clusterListenersLock.lock();
1265
1266 try
1267 {
1268 cacheListeners = clusterListenersMap.get( cacheName );
1269 if ( cacheListeners == null )
1270 {
1271 CompositeCache<K, V> cache = cacheManager.getCache( cacheName );
1272 cacheListeners = new CacheListeners<K, V>( cache );
1273 clusterListenersMap.put( cacheName, cacheListeners );
1274 }
1275 }
1276 finally
1277 {
1278 clusterListenersLock.unlock();
1279 }
1280 }
1281
1282 return cacheListeners;
1283 }
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297 @SuppressWarnings("unchecked")
1298 private ICacheEventQueue<K, V>[] getEventQList( CacheListeners<K, V> cacheListeners, long requesterId )
1299 {
1300 ICacheEventQueue<K, V>[] list = cacheListeners.eventQMap.values().toArray( new ICacheEventQueue[0] );
1301 int count = 0;
1302
1303 for ( int i = 0; i < list.length; i++ )
1304 {
1305 ICacheEventQueue<K, V> q = list[i];
1306 if ( q.isWorking() && q.getListenerId() != requesterId )
1307 {
1308 count++;
1309 }
1310 else
1311 {
1312 list[i] = null;
1313 }
1314 }
1315 if ( count == list.length )
1316 {
1317
1318 return list;
1319 }
1320
1321
1322 ICacheEventQueue<K, V>[] qq = new ICacheEventQueue[count];
1323 count = 0;
1324 for ( int i = 0; i < list.length; i++ )
1325 {
1326 if ( list[i] != null )
1327 {
1328 qq[count++] = list[i];
1329 }
1330 }
1331 return qq;
1332 }
1333
1334
1335
1336
1337
1338
1339 private static <KK, VV> void cleanupEventQMap( Map<Long, ICacheEventQueue<KK, VV>> eventQMap )
1340 {
1341 synchronized ( eventQMap )
1342 {
1343 for (Iterator<Map.Entry<Long, ICacheEventQueue<KK, VV>>> itr = eventQMap.entrySet().iterator(); itr.hasNext(); )
1344 {
1345 Map.Entry<Long, ICacheEventQueue<KK, VV>> e = itr.next();
1346 ICacheEventQueue<KK, VV> q = e.getValue();
1347
1348
1349
1350
1351
1352 if ( !q.isWorking() )
1353 {
1354 itr.remove();
1355 log.warn( "Cache event queue " + q + " is not working and removed from cache server." );
1356 }
1357 }
1358 }
1359 }
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372 @Override
1373 @SuppressWarnings("unchecked")
1374 public <KK, VV> void addCacheListener( String cacheName, ICacheListener<KK, VV> listener )
1375 throws IOException
1376 {
1377 if ( cacheName == null || listener == null )
1378 {
1379 throw new IllegalArgumentException( "cacheName and listener must not be null" );
1380 }
1381 CacheListeners<KK, VV> cacheListeners;
1382
1383 IRemoteCacheListener<KK, VV> ircl = (IRemoteCacheListener<KK, VV>) listener;
1384
1385 String listenerAddress = ircl.getLocalHostAddress();
1386
1387 RemoteType remoteType = ircl.getRemoteType();
1388 if ( remoteType == RemoteType.CLUSTER )
1389 {
1390 log.debug( "adding cluster listener, listenerAddress [" + listenerAddress + "]" );
1391 cacheListeners = (CacheListeners<KK, VV>)getClusterListeners( cacheName );
1392 }
1393 else
1394 {
1395 log.debug( "adding normal listener, listenerAddress [" + listenerAddress + "]" );
1396 cacheListeners = (CacheListeners<KK, VV>)getCacheListeners( cacheName );
1397 }
1398 Map<Long, ICacheEventQueue<KK, VV>> eventQMap = cacheListeners.eventQMap;
1399 cleanupEventQMap( eventQMap );
1400
1401
1402 synchronized ( ICacheListener.class )
1403 {
1404 long id = 0;
1405 try
1406 {
1407 id = listener.getListenerId();
1408
1409 if ( id == 0 )
1410 {
1411
1412 long listenerIdB = nextListenerId();
1413 if ( log.isDebugEnabled() )
1414 {
1415 log.debug( "listener id=" + ( listenerIdB & 0xff ) + " addded for cache [" + cacheName
1416 + "], listenerAddress [" + listenerAddress + "]" );
1417 }
1418 listener.setListenerId( listenerIdB );
1419 id = listenerIdB;
1420
1421
1422 String message = "Adding vm listener under new id = [" + listenerIdB + "], listenerAddress ["
1423 + listenerAddress + "]";
1424 logApplicationEvent( "RemoteCacheServer", "addCacheListener", message );
1425 if ( log.isInfoEnabled() )
1426 {
1427 log.info( message );
1428 }
1429 }
1430 else
1431 {
1432 String message = "Adding listener under existing id = [" + id + "], listenerAddress ["
1433 + listenerAddress + "]";
1434 logApplicationEvent( "RemoteCacheServer", "addCacheListener", message );
1435 if ( log.isInfoEnabled() )
1436 {
1437 log.info( message );
1438 }
1439
1440
1441 }
1442
1443
1444 this.idTypeMap.put( Long.valueOf( id ), remoteType);
1445 if ( listenerAddress != null )
1446 {
1447 this.idIPMap.put( Long.valueOf( id ), listenerAddress );
1448 }
1449 }
1450 catch ( IOException ioe )
1451 {
1452 String message = "Problem setting listener id, listenerAddress [" + listenerAddress + "]";
1453 log.error( message, ioe );
1454
1455 if ( cacheEventLogger != null )
1456 {
1457 cacheEventLogger.logError( "RemoteCacheServer", "addCacheListener", message + " - "
1458 + ioe.getMessage() );
1459 }
1460 }
1461
1462 CacheEventQueueFactory<KK, VV> fact = new CacheEventQueueFactory<KK, VV>();
1463 ICacheEventQueue<KK, VV> q = fact.createCacheEventQueue( listener, id, cacheName, remoteCacheServerAttributes
1464 .getEventQueuePoolName(), remoteCacheServerAttributes.getEventQueueType() );
1465
1466 eventQMap.put(Long.valueOf(listener.getListenerId()), q);
1467
1468 if ( log.isInfoEnabled() )
1469 {
1470 log.info( cacheListeners );
1471 }
1472 }
1473 }
1474
1475
1476
1477
1478
1479
1480
1481 @Override
1482 public <KK, VV> void addCacheListener( ICacheListener<KK, VV> listener )
1483 throws IOException
1484 {
1485 for (String cacheName : cacheListenersMap.keySet())
1486 {
1487 addCacheListener( cacheName, listener );
1488
1489 if ( log.isDebugEnabled() )
1490 {
1491 log.debug( "Adding listener for cache [" + cacheName + "]" );
1492 }
1493 }
1494 }
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504 @Override
1505 public <KK, VV> void removeCacheListener( String cacheName, ICacheListener<KK, VV> listener )
1506 throws IOException
1507 {
1508 removeCacheListener( cacheName, listener.getListenerId() );
1509 }
1510
1511
1512
1513
1514
1515
1516
1517
1518 public void removeCacheListener( String cacheName, long listenerId )
1519 {
1520 String message = "Removing listener for cache region = [" + cacheName + "] and listenerId [" + listenerId + "]";
1521 logApplicationEvent( "RemoteCacheServer", "removeCacheListener", message );
1522 if ( log.isInfoEnabled() )
1523 {
1524 log.info( message );
1525 }
1526
1527 boolean isClusterListener = isRequestFromCluster( listenerId );
1528
1529 CacheListeners<K, V> cacheDesc = null;
1530
1531 if ( isClusterListener )
1532 {
1533 cacheDesc = getClusterListeners( cacheName );
1534 }
1535 else
1536 {
1537 cacheDesc = getCacheListeners( cacheName );
1538 }
1539 Map<Long, ICacheEventQueue<K, V>> eventQMap = cacheDesc.eventQMap;
1540 cleanupEventQMap( eventQMap );
1541 ICacheEventQueue<K, V> q = eventQMap.remove( Long.valueOf( listenerId ) );
1542
1543 if ( q != null )
1544 {
1545 if ( log.isDebugEnabled() )
1546 {
1547 log.debug( "Found queue for cache region = [" + cacheName + "] and listenerId [" + listenerId + "]" );
1548 }
1549 q.destroy();
1550 cleanupEventQMap( eventQMap );
1551 }
1552 else
1553 {
1554 if ( log.isDebugEnabled() )
1555 {
1556 log.debug( "Did not find queue for cache region = [" + cacheName + "] and listenerId [" + listenerId
1557 + "]" );
1558 }
1559 }
1560
1561
1562 idTypeMap.remove( Long.valueOf( listenerId ) );
1563 idIPMap.remove( Long.valueOf( listenerId ) );
1564
1565 if ( log.isInfoEnabled() )
1566 {
1567 log.info( "After removing listener [" + listenerId + "] cache region " + cacheName + "'s listener size ["
1568 + cacheDesc.eventQMap.size() + "]" );
1569 }
1570 }
1571
1572
1573
1574
1575
1576
1577
1578 @Override
1579 public <KK, VV> void removeCacheListener( ICacheListener<KK, VV> listener )
1580 throws IOException
1581 {
1582 for (String cacheName : cacheListenersMap.keySet())
1583 {
1584 removeCacheListener( cacheName, listener );
1585
1586 if ( log.isInfoEnabled() )
1587 {
1588 log.info( "Removing listener for cache [" + cacheName + "]" );
1589 }
1590 }
1591 }
1592
1593
1594
1595
1596
1597
1598 @Override
1599 public void shutdown()
1600 throws IOException
1601 {
1602 shutdown("", Registry.REGISTRY_PORT);
1603 }
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613 @Override
1614 public void shutdown( String host, int port )
1615 throws IOException
1616 {
1617 if ( log.isInfoEnabled() )
1618 {
1619 log.info( "Received shutdown request. Shutting down server." );
1620 }
1621
1622 synchronized (listenerId)
1623 {
1624 for (String cacheName : cacheListenersMap.keySet())
1625 {
1626 for (int i = 0; i <= listenerId[0]; i++)
1627 {
1628 removeCacheListener( cacheName, i );
1629 }
1630
1631 if ( log.isInfoEnabled() )
1632 {
1633 log.info( "Removing listener for cache [" + cacheName + "]" );
1634 }
1635 }
1636
1637 cacheListenersMap.clear();
1638 clusterListenersMap.clear();
1639 }
1640 RemoteCacheServerFactory.shutdownImpl( host, port );
1641 this.cacheManager.shutDown();
1642 }
1643
1644
1645
1646
1647
1648
1649 @Override
1650 public void unreferenced()
1651 {
1652 if ( log.isInfoEnabled() )
1653 {
1654 log.info( "*** Server now unreferenced and subject to GC. ***" );
1655 }
1656 }
1657
1658
1659
1660
1661
1662
1663 private long nextListenerId()
1664 {
1665 long id = 0;
1666 if ( listenerId[0] == Integer.MAX_VALUE )
1667 {
1668 synchronized ( listenerId )
1669 {
1670 id = listenerId[0];
1671 listenerId[0] = 0;
1672
1673
1674
1675
1676
1677 }
1678 }
1679 else
1680 {
1681 synchronized ( listenerId )
1682 {
1683 id = ++listenerId[0];
1684 }
1685 }
1686 return id;
1687 }
1688
1689
1690
1691
1692
1693
1694
1695 @Override
1696 public String getStats()
1697 throws IOException
1698 {
1699 return cacheManager.getStats();
1700 }
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710 private ICacheEvent<ICacheElement<K, V>> createICacheEvent( ICacheElement<K, V> item, long requesterId, String eventName )
1711 {
1712 if ( cacheEventLogger == null )
1713 {
1714 return new CacheEvent<ICacheElement<K, V>>();
1715 }
1716 String ipAddress = getExtraInfoForRequesterId( requesterId );
1717 return cacheEventLogger
1718 .createICacheEvent( "RemoteCacheServer", item.getCacheName(), eventName, ipAddress, item );
1719 }
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730 private <T> ICacheEvent<T> createICacheEvent( String cacheName, T key, long requesterId, String eventName )
1731 {
1732 if ( cacheEventLogger == null )
1733 {
1734 return new CacheEvent<T>();
1735 }
1736 String ipAddress = getExtraInfoForRequesterId( requesterId );
1737 return cacheEventLogger.createICacheEvent( "RemoteCacheServer", cacheName, eventName, ipAddress, key );
1738 }
1739
1740
1741
1742
1743
1744
1745
1746
1747 protected void logApplicationEvent( String source, String eventName, String optionalDetails )
1748 {
1749 if ( cacheEventLogger != null )
1750 {
1751 cacheEventLogger.logApplicationEvent( source, eventName, optionalDetails );
1752 }
1753 }
1754
1755
1756
1757
1758
1759
1760 protected <T> void logICacheEvent( ICacheEvent<T> cacheEvent )
1761 {
1762 if ( cacheEventLogger != null )
1763 {
1764 cacheEventLogger.logICacheEvent( cacheEvent );
1765 }
1766 }
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776 protected String getExtraInfoForRequesterId( long requesterId )
1777 {
1778 String ipAddress = idIPMap.get( Long.valueOf( requesterId ) );
1779 return ipAddress;
1780 }
1781
1782
1783
1784
1785
1786
1787 public void setCacheEventLogger( ICacheEventLogger cacheEventLogger )
1788 {
1789 this.cacheEventLogger = cacheEventLogger;
1790 }
1791 }