1 package org.apache.commons.jcs.utils.discovery;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.net.UnknownHostException;
24 import java.util.ArrayList;
25 import java.util.HashSet;
26 import java.util.Set;
27 import java.util.concurrent.CopyOnWriteArraySet;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.concurrent.TimeUnit;
30
31 import org.apache.commons.jcs.engine.behavior.IRequireScheduler;
32 import org.apache.commons.jcs.engine.behavior.IShutdownObserver;
33 import org.apache.commons.jcs.utils.discovery.behavior.IDiscoveryListener;
34 import org.apache.commons.jcs.utils.net.HostNameUtil;
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37
38
39
40
41
42
43
44
45
46
47 public class UDPDiscoveryService
48 implements IShutdownObserver, IRequireScheduler
49 {
50
51 private static final Log log = LogFactory.getLog( UDPDiscoveryService.class );
52
53
54 private Thread udpReceiverThread;
55
56
57 private UDPDiscoveryReceiver receiver;
58
59
60 private UDPDiscoverySenderThread sender = null;
61
62
63 private UDPDiscoveryAttributes udpDiscoveryAttributes = null;
64
65
66 private boolean shutdown = false;
67
68
69 private Set<DiscoveredService> discoveredServices = new CopyOnWriteArraySet<DiscoveredService>();
70
71
72 private final Set<String> cacheNames = new CopyOnWriteArraySet<String>();
73
74
75 private final Set<IDiscoveryListener> discoveryListeners = new CopyOnWriteArraySet<IDiscoveryListener>();
76
77
78
79
80 public UDPDiscoveryService( UDPDiscoveryAttributes attributes)
81 {
82 udpDiscoveryAttributes = (UDPDiscoveryAttributes) attributes.clone();
83
84 try
85 {
86
87 udpDiscoveryAttributes.setServiceAddress( HostNameUtil.getLocalHostAddress() );
88 }
89 catch ( UnknownHostException e )
90 {
91 log.error( "Couldn't get localhost address", e );
92 }
93
94 try
95 {
96
97 receiver = new UDPDiscoveryReceiver( this, getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
98 getUdpDiscoveryAttributes().getUdpDiscoveryPort() );
99 }
100 catch ( IOException e )
101 {
102 log.error( "Problem creating UDPDiscoveryReceiver, address ["
103 + getUdpDiscoveryAttributes().getUdpDiscoveryAddr() + "] port ["
104 + getUdpDiscoveryAttributes().getUdpDiscoveryPort() + "] we won't be able to find any other caches", e );
105 }
106
107
108 sender = new UDPDiscoverySenderThread( getUdpDiscoveryAttributes(), getCacheNames() );
109 }
110
111
112
113
114 @Override
115 public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutor)
116 {
117 if (sender != null)
118 {
119 scheduledExecutor.scheduleAtFixedRate(sender, 0, 15, TimeUnit.SECONDS);
120 }
121
122
123 UDPCleanupRunner cleanup = new UDPCleanupRunner( this );
124
125
126
127 scheduledExecutor.scheduleAtFixedRate(cleanup, 0, getUdpDiscoveryAttributes().getMaxIdleTimeSec(), TimeUnit.SECONDS);
128 }
129
130
131
132
133
134
135
136 protected void serviceRequestBroadcast()
137 {
138 UDPDiscoverySender sender1 = null;
139 try
140 {
141
142
143 sender1 = new UDPDiscoverySender( getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
144 getUdpDiscoveryAttributes().getUdpDiscoveryPort() );
145
146 sender1.passiveBroadcast( getUdpDiscoveryAttributes().getServiceAddress(), getUdpDiscoveryAttributes()
147 .getServicePort(), this.getCacheNames() );
148
149
150
151
152 if ( log.isDebugEnabled() )
153 {
154 log.debug( "Called sender to issue a passive broadcast" );
155 }
156 }
157 catch ( Exception e )
158 {
159 log.error( "Problem calling the UDP Discovery Sender. address ["
160 + getUdpDiscoveryAttributes().getUdpDiscoveryAddr() + "] port ["
161 + getUdpDiscoveryAttributes().getUdpDiscoveryPort() + "]", e );
162 }
163 finally
164 {
165 try
166 {
167 if ( sender1 != null )
168 {
169 sender1.destroy();
170 }
171 }
172 catch ( Exception e )
173 {
174 log.error( "Problem closing Passive Broadcast sender, while servicing a request broadcast.", e );
175 }
176 }
177 }
178
179
180
181
182
183
184 public void addParticipatingCacheName( String cacheName )
185 {
186 cacheNames.add( cacheName );
187 sender.setCacheNames( getCacheNames() );
188 }
189
190
191
192
193
194
195 public void removeDiscoveredService( DiscoveredService service )
196 {
197 boolean contained = getDiscoveredServices().remove( service );
198
199 if ( contained )
200 {
201 if ( log.isInfoEnabled() )
202 {
203 log.info( "Removing " + service );
204 }
205 }
206
207 for (IDiscoveryListener listener : getDiscoveryListeners())
208 {
209 listener.removeDiscoveredService( service );
210 }
211 }
212
213
214
215
216
217
218 protected void addOrUpdateService( DiscoveredService discoveredService )
219 {
220 synchronized ( getDiscoveredServices() )
221 {
222
223
224
225 if ( !getDiscoveredServices().contains( discoveredService ) )
226 {
227 if ( log.isInfoEnabled() )
228 {
229 log.info( "Set does not contain service. I discovered " + discoveredService );
230 }
231 if ( log.isDebugEnabled() )
232 {
233 log.debug( "Adding service in the set " + discoveredService );
234 }
235 getDiscoveredServices().add( discoveredService );
236 }
237 else
238 {
239 if ( log.isDebugEnabled() )
240 {
241 log.debug( "Set contains service." );
242 }
243 if ( log.isDebugEnabled() )
244 {
245 log.debug( "Updating service in the set " + discoveredService );
246 }
247
248
249 DiscoveredService theOldServiceInformation = null;
250
251 for (DiscoveredService service1 : getDiscoveredServices())
252 {
253 if ( discoveredService.equals( service1 ) )
254 {
255 theOldServiceInformation = service1;
256 break;
257 }
258 }
259 if ( theOldServiceInformation != null )
260 {
261 if ( !theOldServiceInformation.getCacheNames().equals( discoveredService.getCacheNames() ) )
262 {
263 if ( log.isInfoEnabled() )
264 {
265 log.info( "List of cache names changed for service: " + discoveredService );
266 }
267 }
268 }
269
270
271 getDiscoveredServices().remove( discoveredService );
272 getDiscoveredServices().add( discoveredService );
273 }
274 }
275
276
277
278
279 for (IDiscoveryListener listener : getDiscoveryListeners())
280 {
281 listener.addDiscoveredService( discoveredService );
282 }
283
284 }
285
286
287
288
289
290
291 protected ArrayList<String> getCacheNames()
292 {
293 ArrayList<String> names = new ArrayList<String>();
294 names.addAll( cacheNames );
295 return names;
296 }
297
298
299
300
301 public void setUdpDiscoveryAttributes( UDPDiscoveryAttributes attr )
302 {
303 this.udpDiscoveryAttributes = attr;
304 }
305
306
307
308
309 public UDPDiscoveryAttributes getUdpDiscoveryAttributes()
310 {
311 return this.udpDiscoveryAttributes;
312 }
313
314
315
316
317 public void startup()
318 {
319 udpReceiverThread = new Thread( receiver );
320 udpReceiverThread.setDaemon( true );
321
322 udpReceiverThread.start();
323 }
324
325
326
327
328 @Override
329 public void shutdown()
330 {
331 if ( !shutdown )
332 {
333 shutdown = true;
334
335 if ( log.isInfoEnabled() )
336 {
337 log.info( "Shutting down UDP discovery service receiver." );
338 }
339
340 try
341 {
342
343 receiver.shutdown();
344 udpReceiverThread.interrupt();
345 }
346 catch ( Exception e )
347 {
348 log.error( "Problem interrupting UDP receiver thread." );
349 }
350
351 if ( log.isInfoEnabled() )
352 {
353 log.info( "Shutting down UDP discovery service sender." );
354 }
355
356
357
358 try
359 {
360 sender.shutdown();
361 }
362 catch ( Exception e )
363 {
364 log.error( "Problem issuing remove broadcast via UDP sender." );
365 }
366 }
367 else
368 {
369 if ( log.isDebugEnabled() )
370 {
371 log.debug( "Shutdown already called." );
372 }
373 }
374 }
375
376
377
378
379 public synchronized Set<DiscoveredService> getDiscoveredServices()
380 {
381 return discoveredServices;
382 }
383
384
385
386
387 private Set<IDiscoveryListener> getDiscoveryListeners()
388 {
389 return discoveryListeners;
390 }
391
392
393
394
395 public Set<IDiscoveryListener> getCopyOfDiscoveryListeners()
396 {
397 Set<IDiscoveryListener> copy = new HashSet<IDiscoveryListener>();
398 copy.addAll( getDiscoveryListeners() );
399 return copy;
400 }
401
402
403
404
405
406
407
408 public boolean addDiscoveryListener( IDiscoveryListener listener )
409 {
410 return getDiscoveryListeners().add( listener );
411 }
412
413
414
415
416
417
418
419 public boolean removeDiscoveryListener( IDiscoveryListener listener )
420 {
421 return getDiscoveryListeners().remove( listener );
422 }
423 }