1 package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.CopyOnWriteArrayList;
26
27 import org.apache.commons.jcs3.auxiliary.lateral.LateralCacheNoWait;
28 import org.apache.commons.jcs3.auxiliary.lateral.LateralCacheNoWaitFacade;
29 import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
30 import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
31 import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
32 import org.apache.commons.jcs3.engine.control.CompositeCacheManager;
33 import org.apache.commons.jcs3.engine.logging.behavior.ICacheEventLogger;
34 import org.apache.commons.jcs3.log.Log;
35 import org.apache.commons.jcs3.log.LogManager;
36 import org.apache.commons.jcs3.utils.discovery.DiscoveredService;
37 import org.apache.commons.jcs3.utils.discovery.behavior.IDiscoveryListener;
38
39
40
41
42
43
44 public class LateralTCPDiscoveryListener
45 implements IDiscoveryListener
46 {
47
48 private static final Log log = LogManager.getLog( LateralTCPDiscoveryListener.class );
49
50
51
52
53
54 private final ConcurrentMap<String, LateralCacheNoWaitFacade<?, ?>> facades =
55 new ConcurrentHashMap<>();
56
57
58
59
60
61 private final CopyOnWriteArrayList<String> knownDifferentlyConfiguredRegions =
62 new CopyOnWriteArrayList<>();
63
64
65 private final String factoryName;
66
67
68 private final CompositeCacheManager cacheManager;
69
70
71 private final ICacheEventLogger cacheEventLogger;
72
73
74 private final IElementSerializer elementSerializer;
75
76
77
78
79
80
81
82
83 @Deprecated
84 protected LateralTCPDiscoveryListener( final String factoryName, final ICompositeCacheManager cacheManager )
85 {
86 this(factoryName, (CompositeCacheManager) cacheManager, null, null);
87 }
88
89
90
91
92
93
94
95
96
97
98
99 protected LateralTCPDiscoveryListener( final String factoryName,
100 final CompositeCacheManager cacheManager,
101 final ICacheEventLogger cacheEventLogger,
102 final IElementSerializer elementSerializer)
103 {
104 this.factoryName = factoryName;
105 this.cacheManager = cacheManager;
106 this.cacheEventLogger = cacheEventLogger;
107 this.elementSerializer = elementSerializer;
108 }
109
110
111
112
113
114
115
116
117
118
119
120 public boolean addNoWaitFacade( final String cacheName, final LateralCacheNoWaitFacade<?, ?> facade )
121 {
122 final boolean isNew = !containsNoWaitFacade( cacheName );
123
124
125 facades.put( cacheName, facade );
126 knownDifferentlyConfiguredRegions.remove( cacheName );
127
128 return isNew;
129 }
130
131
132
133
134
135
136
137 public boolean containsNoWaitFacade( final String cacheName )
138 {
139 return facades.containsKey( cacheName );
140 }
141
142
143
144
145
146
147
148
149 public <K, V> boolean containsNoWait( final String cacheName, final LateralCacheNoWait<K, V> noWait )
150 {
151 @SuppressWarnings("unchecked")
152 final
153 LateralCacheNoWaitFacade<K, V> facade =
154 (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
155
156 if ( facade == null )
157 {
158 return false;
159 }
160
161 return facade.containsNoWait( noWait );
162 }
163
164
165
166
167
168
169
170
171
172
173
174
175
176 protected <K, V> boolean addNoWait( final LateralCacheNoWait<K, V> noWait )
177 {
178 @SuppressWarnings("unchecked")
179 final
180 LateralCacheNoWaitFacade<K, V> facade =
181 (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
182 log.debug( "addNoWait > Got facade for {0} = {1}", noWait.getCacheName(), facade );
183
184 return addNoWait(noWait, facade);
185 }
186
187
188
189
190
191
192
193
194
195
196
197 protected <K, V> boolean addNoWait(final LateralCacheNoWait<K, V> noWait,
198 final LateralCacheNoWaitFacade<K, V> facade)
199 {
200 if ( facade != null )
201 {
202 final boolean isNew = facade.addNoWait( noWait );
203 log.debug( "Called addNoWait, isNew = {0}", isNew );
204 return isNew;
205 }
206 if ( knownDifferentlyConfiguredRegions.addIfAbsent( noWait.getCacheName() ) )
207 {
208 log.info( "addNoWait > Different nodes are configured differently "
209 + "or region [{0}] is not yet used on this side.",
210 noWait::getCacheName);
211 }
212 return false;
213 }
214
215
216
217
218
219
220
221
222 protected <K, V> boolean removeNoWait( final LateralCacheNoWait<K, V> noWait )
223 {
224 @SuppressWarnings("unchecked")
225 final
226 LateralCacheNoWaitFacade<K, V> facade =
227 (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
228 log.debug( "removeNoWait > Got facade for {0} = {1}", noWait.getCacheName(), facade);
229
230 return removeNoWait(facade, noWait.getCacheName(), noWait.getIdentityKey());
231 }
232
233
234
235
236
237
238
239
240
241
242 protected <K, V> boolean removeNoWait(final LateralCacheNoWaitFacade<K, V> facade,
243 final String cacheName, final String tcpServer)
244 {
245 if ( facade != null )
246 {
247 final boolean removed = facade.removeNoWait(tcpServer);
248 log.debug( "Called removeNoWait, removed {0}", removed );
249 return removed;
250 }
251 if (knownDifferentlyConfiguredRegions.addIfAbsent(cacheName))
252 {
253 log.info( "addNoWait > Different nodes are configured differently "
254 + "or region [{0}] is not yet used on this side.",
255 cacheName);
256 }
257 return false;
258 }
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276 @Override
277 public void addDiscoveredService( final DiscoveredService service )
278 {
279
280
281
282 final ArrayList<String> regions = service.getCacheNames();
283 final String serverAndPort = service.getServiceAddress() + ":" + service.getServicePort();
284
285 if ( regions != null )
286 {
287
288 for (final String cacheName : regions)
289 {
290 final LateralCacheNoWaitFacade<?, ?> facade = facades.get(cacheName);
291 log.debug( "Got cache facade {0}", facade );
292
293
294 if (facade != null)
295 {
296
297 if (facade.containsNoWait(serverAndPort))
298 {
299 continue;
300 }
301
302 final ITCPLateralCacheAttributes lca =
303 (ITCPLateralCacheAttributes) facade.getAuxiliaryCacheAttributes().clone();
304 lca.setTcpServer(serverAndPort);
305
306 LateralTCPCacheFactory factory =
307 (LateralTCPCacheFactory) cacheManager.registryFacGet(factoryName);
308
309 LateralCacheNoWait<?, ?> noWait =
310 factory.createCacheNoWait(lca, cacheEventLogger, elementSerializer);
311 factory.monitorCache(noWait);
312
313 if (addNoWait(noWait))
314 {
315 log.debug("Added NoWait for cacheName [{0}] at {1}", cacheName, serverAndPort);
316 }
317 }
318 }
319 }
320 else
321 {
322 log.warn( "No cache names found in message {0}", service );
323 }
324 }
325
326
327
328
329
330
331
332
333
334 @Override
335 public void removeDiscoveredService( final DiscoveredService service )
336 {
337
338
339
340 final ArrayList<String> regions = service.getCacheNames();
341 final String serverAndPort = service.getServiceAddress() + ":" + service.getServicePort();
342
343 if ( regions != null )
344 {
345
346 for (final String cacheName : regions)
347 {
348 final LateralCacheNoWaitFacade<?, ?> facade = facades.get(cacheName);
349 log.debug( "Got cache facade {0}", facade );
350
351
352 if (facade != null && removeNoWait(facade, cacheName, serverAndPort))
353 {
354 log.debug("Removed NoWait for cacheName [{0}] at {1}", cacheName, serverAndPort);
355 }
356 }
357 }
358 else
359 {
360 log.warn( "No cache names found in message {0}", service );
361 }
362 }
363 }