1 package org.apache.jcs.utils.discovery;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.net.UnknownHostException;
23 import java.util.ArrayList;
24 import java.util.HashSet;
25 import java.util.Set;
26 import java.util.concurrent.CopyOnWriteArraySet;
27 import java.util.concurrent.ScheduledExecutorService;
28 import java.util.concurrent.TimeUnit;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.jcs.engine.behavior.IRequireScheduler;
33 import org.apache.jcs.engine.behavior.IShutdownObserver;
34 import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
35 import org.apache.jcs.utils.discovery.behavior.IDiscoveryListener;
36 import org.apache.jcs.utils.net.HostNameUtil;
37
38
39
40
41
42
43
44
45
46
47 public class UDPDiscoveryService
48 implements IShutdownObserver, IRequireScheduler
49 {
50
51 private final static Log log = LogFactory.getLog( UDPDiscoveryService.class );
52
53
54 private Thread udpReceiverThread;
55
56
57 private UDPDiscoveryReceiver receiver;
58
59
60 private UDPDiscoverySenderThread sender = null;
61
62
63 private UDPDiscoveryAttributes udpDiscoveryAttributes = null;
64
65
66 private boolean shutdown = false;
67
68
69 private Set<DiscoveredService> discoveredServices = new CopyOnWriteArraySet<DiscoveredService>();
70
71
72 private final Set<String> cacheNames = new CopyOnWriteArraySet<String>();
73
74
75 private final Set<IDiscoveryListener> discoveryListeners = new CopyOnWriteArraySet<IDiscoveryListener>();
76
77
78
79
80
81 public UDPDiscoveryService( UDPDiscoveryAttributes attributes, ICacheEventLogger cacheEventLogger )
82 {
83 udpDiscoveryAttributes = (UDPDiscoveryAttributes) attributes.clone();
84
85 try
86 {
87
88 udpDiscoveryAttributes.setServiceAddress( HostNameUtil.getLocalHostAddress() );
89 }
90 catch ( UnknownHostException e1 )
91 {
92 log.error( "Couldn't get localhost address", e1 );
93 }
94
95 try
96 {
97
98 receiver = new UDPDiscoveryReceiver( this, getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
99 getUdpDiscoveryAttributes().getUdpDiscoveryPort() );
100 udpReceiverThread = new Thread( receiver );
101 udpReceiverThread.setDaemon( true );
102
103 udpReceiverThread.start();
104 }
105 catch ( Exception e )
106 {
107 log.error( "Problem creating UDPDiscoveryReceiver, address ["
108 + getUdpDiscoveryAttributes().getUdpDiscoveryAddr() + "] port ["
109 + getUdpDiscoveryAttributes().getUdpDiscoveryPort() + "] we won't be able to find any other caches", e );
110 }
111
112
113 sender = new UDPDiscoverySenderThread( getUdpDiscoveryAttributes(), getCacheNames() );
114 }
115
116
117
118
119 public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutor)
120 {
121 if (sender != null)
122 {
123 scheduledExecutor.scheduleAtFixedRate(sender, 0, 15, TimeUnit.SECONDS);
124 }
125
126
127 UDPCleanupRunner cleanup = new UDPCleanupRunner( this );
128
129
130
131 scheduledExecutor.scheduleAtFixedRate(cleanup, 0, getUdpDiscoveryAttributes().getMaxIdleTimeSec(), TimeUnit.SECONDS);
132 }
133
134
135
136
137
138
139
140 protected void serviceRequestBroadcast()
141 {
142 UDPDiscoverySender sender = null;
143 try
144 {
145
146
147 sender = new UDPDiscoverySender( getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
148 getUdpDiscoveryAttributes().getUdpDiscoveryPort() );
149
150 sender.passiveBroadcast( getUdpDiscoveryAttributes().getServiceAddress(), getUdpDiscoveryAttributes()
151 .getServicePort(), this.getCacheNames() );
152
153
154
155
156 if ( log.isDebugEnabled() )
157 {
158 log.debug( "Called sender to issue a passive broadcast" );
159 }
160 }
161 catch ( Exception e )
162 {
163 log.error( "Problem calling the UDP Discovery Sender. address ["
164 + getUdpDiscoveryAttributes().getUdpDiscoveryAddr() + "] port ["
165 + getUdpDiscoveryAttributes().getUdpDiscoveryPort() + "]", e );
166 }
167 finally
168 {
169 try
170 {
171 if ( sender != null )
172 {
173 sender.destroy();
174 }
175 }
176 catch ( Exception e )
177 {
178 log.error( "Problem closing Passive Broadcast sender, while servicing a request broadcast.", e );
179 }
180 }
181 }
182
183
184
185
186
187
188 public void addParticipatingCacheName( String cacheName )
189 {
190 cacheNames.add( cacheName );
191 sender.setCacheNames( getCacheNames() );
192 }
193
194
195
196
197
198
199 public void removeDiscoveredService( DiscoveredService service )
200 {
201 boolean contained = getDiscoveredServices().remove( service );
202
203 if ( contained )
204 {
205 if ( log.isInfoEnabled() )
206 {
207 log.info( "Removing " + service );
208 }
209 }
210
211 for (IDiscoveryListener listener : getDiscoveryListeners())
212 {
213 listener.removeDiscoveredService( service );
214 }
215 }
216
217
218
219
220
221
222 protected void addOrUpdateService( DiscoveredService discoveredService )
223 {
224 synchronized ( getDiscoveredServices() )
225 {
226
227
228
229 if ( !getDiscoveredServices().contains( discoveredService ) )
230 {
231 if ( log.isInfoEnabled() )
232 {
233 log.info( "Set does not contain service. I discovered " + discoveredService );
234 }
235 if ( log.isDebugEnabled() )
236 {
237 log.debug( "Adding service in the set " + discoveredService );
238 }
239 getDiscoveredServices().add( discoveredService );
240 }
241 else
242 {
243 if ( log.isDebugEnabled() )
244 {
245 log.debug( "Set contains service." );
246 }
247 if ( log.isDebugEnabled() )
248 {
249 log.debug( "Updating service in the set " + discoveredService );
250 }
251
252
253 DiscoveredService theOldServiceInformation = null;
254
255 for (DiscoveredService service1 : getDiscoveredServices())
256 {
257 if ( discoveredService.equals( service1 ) )
258 {
259 theOldServiceInformation = service1;
260 break;
261 }
262 }
263 if ( theOldServiceInformation != null )
264 {
265 if ( !theOldServiceInformation.getCacheNames().equals( discoveredService.getCacheNames() ) )
266 {
267 if ( log.isInfoEnabled() )
268 {
269 log.info( "List of cache names changed for service: " + discoveredService );
270 }
271 }
272 }
273
274
275 getDiscoveredServices().remove( discoveredService );
276 getDiscoveredServices().add( discoveredService );
277 }
278 }
279
280
281
282
283 for (IDiscoveryListener listener : getDiscoveryListeners())
284 {
285 listener.addDiscoveredService( discoveredService );
286 }
287
288 }
289
290
291
292
293
294
295 protected ArrayList<String> getCacheNames()
296 {
297 ArrayList<String> names = new ArrayList<String>();
298 names.addAll( cacheNames );
299 return names;
300 }
301
302
303
304
305 public void setUdpDiscoveryAttributes( UDPDiscoveryAttributes attr )
306 {
307 this.udpDiscoveryAttributes = attr;
308 }
309
310
311
312
313 public UDPDiscoveryAttributes getUdpDiscoveryAttributes()
314 {
315 return this.udpDiscoveryAttributes;
316 }
317
318
319
320
321 public void shutdown()
322 {
323 if ( !shutdown )
324 {
325 shutdown = true;
326
327 if ( log.isInfoEnabled() )
328 {
329 log.info( "Shutting down UDP discovery service receiver." );
330 }
331
332 try
333 {
334
335 receiver.shutdown();
336 udpReceiverThread.interrupt();
337 }
338 catch ( Exception e )
339 {
340 log.error( "Problem interrupting UDP receiver thread." );
341 }
342
343 if ( log.isInfoEnabled() )
344 {
345 log.info( "Shutting down UDP discovery service sender." );
346 }
347
348
349
350 try
351 {
352 sender.shutdown();
353 }
354 catch ( Exception e )
355 {
356 log.error( "Problem issuing remove broadcast via UDP sender." );
357 }
358 }
359 else
360 {
361 if ( log.isDebugEnabled() )
362 {
363 log.debug( "Shutdown already called." );
364 }
365 }
366 }
367
368
369
370
371
372
373 @Override
374 protected void finalize()
375 throws Throwable
376 {
377 super.finalize();
378
379
380 shutdown();
381 }
382
383
384
385
386 public synchronized void setDiscoveredServices( Set<DiscoveredService> discoveredServices )
387 {
388 this.discoveredServices = discoveredServices;
389 }
390
391
392
393
394 public synchronized Set<DiscoveredService> getDiscoveredServices()
395 {
396 return discoveredServices;
397 }
398
399
400
401
402 private Set<IDiscoveryListener> getDiscoveryListeners()
403 {
404 return discoveryListeners;
405 }
406
407
408
409
410 public Set<IDiscoveryListener> getCopyOfDiscoveryListeners()
411 {
412 Set<IDiscoveryListener> copy = new HashSet<IDiscoveryListener>();
413 copy.addAll( getDiscoveryListeners() );
414 return copy;
415 }
416
417
418
419
420
421
422
423 public boolean addDiscoveryListener( IDiscoveryListener listener )
424 {
425 return getDiscoveryListeners().add( listener );
426 }
427
428
429
430
431
432
433
434 public boolean removeDiscoveryListener( IDiscoveryListener listener )
435 {
436 return getDiscoveryListeners().remove( listener );
437 }
438 }