1 package org.apache.commons.jcs.auxiliary.lateral.socket.tcp;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.commons.jcs.auxiliary.AuxiliaryCache;
23 import org.apache.commons.jcs.auxiliary.AuxiliaryCacheAttributes;
24 import org.apache.commons.jcs.auxiliary.lateral.LateralCacheAttributes;
25 import org.apache.commons.jcs.auxiliary.lateral.LateralCacheNoWait;
26 import org.apache.commons.jcs.auxiliary.lateral.LateralCacheNoWaitFacade;
27 import org.apache.commons.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
28 import org.apache.commons.jcs.engine.behavior.ICompositeCacheManager;
29 import org.apache.commons.jcs.utils.discovery.DiscoveredService;
30 import org.apache.commons.jcs.utils.discovery.behavior.IDiscoveryListener;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33
34 import java.util.ArrayList;
35 import java.util.Collections;
36 import java.util.HashMap;
37 import java.util.HashSet;
38 import java.util.Map;
39 import java.util.Set;
40
41
42
43
44
45
46 public class LateralTCPDiscoveryListener
47 implements IDiscoveryListener
48 {
49
50 private static final Log log = LogFactory.getLog( LateralTCPDiscoveryListener.class );
51
52
53
54
55
56 private final Map<String, LateralCacheNoWaitFacade<?, ?>> facades =
57 Collections.synchronizedMap( new HashMap<String, LateralCacheNoWaitFacade<?, ?>>() );
58
59
60
61
62
63 private final Set<String> knownDifferentlyConfiguredRegions =
64 Collections.synchronizedSet( new HashSet<String>() );
65
66
67 private String factoryName;
68
69
70 private ICompositeCacheManager cacheManager;
71
72
73
74
75
76
77
78 protected LateralTCPDiscoveryListener( String factoryName, ICompositeCacheManager cacheManager )
79 {
80 this.factoryName = factoryName;
81 this.cacheManager = cacheManager;
82 }
83
84
85
86
87
88
89
90
91
92
93
94 public synchronized boolean addNoWaitFacade( String cacheName, LateralCacheNoWaitFacade<?, ?> facade )
95 {
96 boolean isNew = !containsNoWaitFacade( cacheName );
97
98
99 facades.put( cacheName, facade );
100 knownDifferentlyConfiguredRegions.remove( cacheName );
101
102 return isNew;
103 }
104
105
106
107
108
109
110
111 public boolean containsNoWaitFacade( String cacheName )
112 {
113 return facades.containsKey( cacheName );
114 }
115
116
117
118
119
120
121
122
123 public <K, V> boolean containsNoWait( String cacheName, LateralCacheNoWait<K, V> noWait )
124 {
125 @SuppressWarnings("unchecked")
126 LateralCacheNoWaitFacade<K, V> facade = (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
127 if ( facade == null )
128 {
129 return false;
130 }
131
132 return facade.containsNoWait( noWait );
133 }
134
135
136
137
138
139
140
141
142
143
144
145
146
147 protected <K, V> boolean addNoWait( LateralCacheNoWait<K, V> noWait )
148 {
149 @SuppressWarnings("unchecked")
150 LateralCacheNoWaitFacade<K, V> facade = (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
151 if ( log.isDebugEnabled() )
152 {
153 log.debug( "addNoWait > Got facade for " + noWait.getCacheName() + " = " + facade );
154 }
155
156 if ( facade != null )
157 {
158 boolean isNew = facade.addNoWait( noWait );
159 if ( log.isDebugEnabled() )
160 {
161 log.debug( "Called addNoWait, isNew = " + isNew );
162 }
163 return isNew;
164 }
165 else
166 {
167 if ( !knownDifferentlyConfiguredRegions.contains( noWait.getCacheName() ) )
168 {
169 if ( log.isInfoEnabled() )
170 {
171 log.info( "addNoWait > Different nodes are configured differently or region ["
172 + noWait.getCacheName() + "] is not yet used on this side. " );
173 }
174 knownDifferentlyConfiguredRegions.add( noWait.getCacheName() );
175 }
176 return false;
177 }
178 }
179
180
181
182
183
184
185
186
187 protected <K, V> boolean removeNoWait( LateralCacheNoWait<K, V> noWait )
188 {
189 @SuppressWarnings("unchecked")
190 LateralCacheNoWaitFacade<K, V> facade = (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
191 if ( log.isDebugEnabled() )
192 {
193 log.debug( "removeNoWait > Got facade for " + noWait.getCacheName() + " = " + facade );
194 }
195
196 if ( facade != null )
197 {
198 boolean removed = facade.removeNoWait( noWait );
199 if ( log.isDebugEnabled() )
200 {
201 log.debug( "Called removeNoWait, removed " + removed );
202 }
203 return removed;
204 }
205 else
206 {
207 if ( !knownDifferentlyConfiguredRegions.contains( noWait.getCacheName() ) )
208 {
209 if ( log.isInfoEnabled() )
210 {
211 log.info( "removeNoWait > Different nodes are configured differently or region ["
212 + noWait.getCacheName() + "] is not yet used on this side. " );
213 }
214 knownDifferentlyConfiguredRegions.add( noWait.getCacheName() );
215 }
216 return false;
217 }
218 }
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236 @Override
237 public void addDiscoveredService( DiscoveredService service )
238 {
239
240
241
242 ArrayList<String> regions = service.getCacheNames();
243 String serverAndPort = service.getServiceAddress() + ":" + service.getServicePort();
244
245 if ( regions != null )
246 {
247
248 for (String cacheName : regions)
249 {
250 AuxiliaryCache<?, ?> ic = cacheManager.getAuxiliaryCache(factoryName, cacheName);
251
252 if ( log.isDebugEnabled() )
253 {
254 log.debug( "Got cache, ic = " + ic );
255 }
256
257
258 if ( ic != null )
259 {
260 AuxiliaryCacheAttributes aca = ic.getAuxiliaryCacheAttributes();
261 if (aca instanceof ITCPLateralCacheAttributes)
262 {
263 ITCPLateralCacheAttributes lca = (ITCPLateralCacheAttributes)aca;
264 if (lca.getTransmissionType() != LateralCacheAttributes.Type.TCP
265 || !serverAndPort.equals(lca.getTcpServer()) )
266 {
267
268 continue;
269 }
270 }
271
272 addNoWait( (LateralCacheNoWait<?, ?>) ic );
273 if ( log.isDebugEnabled() )
274 {
275 log.debug( "Called addNoWait for cacheName [" + cacheName + "]" );
276 }
277 }
278 }
279 }
280 else
281 {
282 log.warn( "No cache names found in message " + service );
283 }
284 }
285
286
287
288
289
290
291
292
293
294 @Override
295 public void removeDiscoveredService( DiscoveredService service )
296 {
297
298
299
300 ArrayList<String> regions = service.getCacheNames();
301 String serverAndPort = service.getServiceAddress() + ":" + service.getServicePort();
302
303 if ( regions != null )
304 {
305
306 for (String cacheName : regions)
307 {
308 AuxiliaryCache<?, ?> ic = cacheManager.getAuxiliaryCache(factoryName, cacheName);
309
310 if ( log.isDebugEnabled() )
311 {
312 log.debug( "Got cache, ic = " + ic );
313 }
314
315
316 if ( ic != null )
317 {
318 AuxiliaryCacheAttributes aca = ic.getAuxiliaryCacheAttributes();
319 if (aca instanceof ITCPLateralCacheAttributes)
320 {
321 ITCPLateralCacheAttributes lca = (ITCPLateralCacheAttributes)aca;
322 if (lca.getTransmissionType() != LateralCacheAttributes.Type.TCP
323 || !serverAndPort.equals(lca.getTcpServer()) )
324 {
325
326 continue;
327 }
328 }
329
330 removeNoWait( (LateralCacheNoWait<?, ?>) ic );
331 if ( log.isDebugEnabled() )
332 {
333 log.debug( "Called removeNoWait for cacheName [" + cacheName + "]" );
334 }
335 }
336 }
337 }
338 else
339 {
340 log.warn( "No cache names found in message " + service );
341 }
342 }
343 }