001package org.apache.commons.jcs.utils.discovery;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.IOException;
023import java.net.UnknownHostException;
024import java.util.ArrayList;
025import java.util.HashSet;
026import java.util.Set;
027import java.util.concurrent.CopyOnWriteArraySet;
028import java.util.concurrent.ScheduledExecutorService;
029import java.util.concurrent.TimeUnit;
030
031import org.apache.commons.jcs.engine.behavior.IRequireScheduler;
032import org.apache.commons.jcs.engine.behavior.IShutdownObserver;
033import org.apache.commons.jcs.utils.discovery.behavior.IDiscoveryListener;
034import org.apache.commons.jcs.utils.net.HostNameUtil;
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037
038/**
039 * This service creates a listener that can create lateral caches and add them to the no wait list.
040 * <p>
041 * It also creates a sender that periodically broadcasts its availability.
042 * <p>
043 * The sender also broadcasts a request for other caches to broadcast their addresses.
044 * <p>
045 * @author Aaron Smuts
046 */
047public class UDPDiscoveryService
048    implements IShutdownObserver, IRequireScheduler
049{
050    /** The logger */
051    private static final Log log = LogFactory.getLog( UDPDiscoveryService.class );
052
053    /** thread that listens for messages */
054    private Thread udpReceiverThread;
055
056    /** the runnable that the receiver thread runs */
057    private UDPDiscoveryReceiver receiver;
058
059    /** the runnable that sends messages via the clock daemon */
060    private UDPDiscoverySenderThread sender = null;
061
062    /** attributes */
063    private UDPDiscoveryAttributes udpDiscoveryAttributes = null;
064
065    /** is this shut down? */
066    private boolean shutdown = false;
067
068    /** This is a set of services that have been discovered. */
069    private Set<DiscoveredService> discoveredServices = new CopyOnWriteArraySet<DiscoveredService>();
070
071    /** This a list of regions that are configured to use discovery. */
072    private final Set<String> cacheNames = new CopyOnWriteArraySet<String>();
073
074    /** Set of listeners. */
075    private final Set<IDiscoveryListener> discoveryListeners = new CopyOnWriteArraySet<IDiscoveryListener>();
076
077    /**
078     * @param attributes
079     */
080    public UDPDiscoveryService( UDPDiscoveryAttributes attributes)
081    {
082        udpDiscoveryAttributes = (UDPDiscoveryAttributes) attributes.clone();
083
084        try
085        {
086            // todo, you should be able to set this
087            udpDiscoveryAttributes.setServiceAddress( HostNameUtil.getLocalHostAddress() );
088        }
089        catch ( UnknownHostException e )
090        {
091            log.error( "Couldn't get localhost address", e );
092        }
093
094        try
095        {
096            // todo need some kind of recovery here.
097            receiver = new UDPDiscoveryReceiver( this, getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
098                                                 getUdpDiscoveryAttributes().getUdpDiscoveryPort() );
099        }
100        catch ( IOException e )
101        {
102            log.error( "Problem creating UDPDiscoveryReceiver, address ["
103                + getUdpDiscoveryAttributes().getUdpDiscoveryAddr() + "] port ["
104                + getUdpDiscoveryAttributes().getUdpDiscoveryPort() + "] we won't be able to find any other caches", e );
105        }
106
107        // create a sender thread
108        sender = new UDPDiscoverySenderThread( getUdpDiscoveryAttributes(), getCacheNames() );
109    }
110
111    /**
112     * @see org.apache.commons.jcs.engine.behavior.IRequireScheduler#setScheduledExecutorService(java.util.concurrent.ScheduledExecutorService)
113     */
114    @Override
115    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutor)
116    {
117        if (sender != null)
118        {
119            scheduledExecutor.scheduleAtFixedRate(sender, 0, 15, TimeUnit.SECONDS);
120        }
121
122        /** removes things that have been idle for too long */
123        UDPCleanupRunner cleanup = new UDPCleanupRunner( this );
124        // I'm going to use this as both, but it could happen
125        // that something could hang around twice the time using this as the
126        // delay and the idle time.
127        scheduledExecutor.scheduleAtFixedRate(cleanup, 0, getUdpDiscoveryAttributes().getMaxIdleTimeSec(), TimeUnit.SECONDS);
128    }
129
130    /**
131     * Send a passive broadcast in response to a request broadcast. Never send a request for a
132     * request. We can respond to our own requests, since a request broadcast is not intended as a
133     * connection request. We might want to only send messages, so we would send a request, but
134     * never a passive broadcast.
135     */
136    protected void serviceRequestBroadcast()
137    {
138        UDPDiscoverySender sender1 = null;
139        try
140        {
141            // create this connection each time.
142            // more robust
143            sender1 = new UDPDiscoverySender( getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
144                                             getUdpDiscoveryAttributes().getUdpDiscoveryPort() );
145
146            sender1.passiveBroadcast( getUdpDiscoveryAttributes().getServiceAddress(), getUdpDiscoveryAttributes()
147                .getServicePort(), this.getCacheNames() );
148
149            // todo we should consider sending a request broadcast every so
150            // often.
151
152            if ( log.isDebugEnabled() )
153            {
154                log.debug( "Called sender to issue a passive broadcast" );
155            }
156        }
157        catch ( Exception e )
158        {
159            log.error( "Problem calling the UDP Discovery Sender. address ["
160                + getUdpDiscoveryAttributes().getUdpDiscoveryAddr() + "] port ["
161                + getUdpDiscoveryAttributes().getUdpDiscoveryPort() + "]", e );
162        }
163        finally
164        {
165            try
166            {
167                if ( sender1 != null )
168                {
169                    sender1.destroy();
170                }
171            }
172            catch ( Exception e )
173            {
174                log.error( "Problem closing Passive Broadcast sender, while servicing a request broadcast.", e );
175            }
176        }
177    }
178
179    /**
180     * Adds a region to the list that is participating in discovery.
181     * <p>
182     * @param cacheName
183     */
184    public void addParticipatingCacheName( String cacheName )
185    {
186        cacheNames.add( cacheName );
187        sender.setCacheNames( getCacheNames() );
188    }
189
190    /**
191     * Removes the discovered service from the list and calls the discovery listener.
192     * <p>
193     * @param service
194     */
195    public void removeDiscoveredService( DiscoveredService service )
196    {
197        boolean contained = getDiscoveredServices().remove( service );
198
199        if ( contained )
200        {
201            if ( log.isInfoEnabled() )
202            {
203                log.info( "Removing " + service );
204            }
205        }
206
207        for (IDiscoveryListener listener : getDiscoveryListeners())
208        {
209            listener.removeDiscoveredService( service );
210        }
211    }
212
213    /**
214     * Add a service to the list. Update the held copy if we already know about it.
215     * <p>
216     * @param discoveredService discovered service
217     */
218    protected void addOrUpdateService( DiscoveredService discoveredService )
219    {
220        synchronized ( getDiscoveredServices() )
221        {
222            // Since this is a set we can add it over an over.
223            // We want to replace the old one, since we may add info that is not part of the equals.
224            // The equals method on the object being added is intentionally restricted.
225            if ( !getDiscoveredServices().contains( discoveredService ) )
226            {
227                if ( log.isInfoEnabled() )
228                {
229                    log.info( "Set does not contain service. I discovered " + discoveredService );
230                }
231                if ( log.isDebugEnabled() )
232                {
233                    log.debug( "Adding service in the set " + discoveredService );
234                }
235                getDiscoveredServices().add( discoveredService );
236            }
237            else
238            {
239                if ( log.isDebugEnabled() )
240                {
241                    log.debug( "Set contains service." );
242                }
243                if ( log.isDebugEnabled() )
244                {
245                    log.debug( "Updating service in the set " + discoveredService );
246                }
247
248                // Update the list of cache names if it has changed.
249                DiscoveredService theOldServiceInformation = null;
250                // need to update the time this sucks. add has no effect convert to a map
251                for (DiscoveredService service1 : getDiscoveredServices())
252                {
253                    if ( discoveredService.equals( service1 ) )
254                    {
255                        theOldServiceInformation = service1;
256                        break;
257                    }
258                }
259                if ( theOldServiceInformation != null )
260                {
261                    if ( !theOldServiceInformation.getCacheNames().equals( discoveredService.getCacheNames() ) )
262                    {
263                        if ( log.isInfoEnabled() )
264                        {
265                            log.info( "List of cache names changed for service: " + discoveredService );
266                        }
267                    }
268                }
269
270                // replace it, we want to reset the payload and the last heard from time.
271                getDiscoveredServices().remove( discoveredService );
272                getDiscoveredServices().add( discoveredService );
273            }
274        }
275        // Always Notify the listeners
276        // If we don't do this, then if a region using the default config is initialized after notification,
277        // it will never get the service in it's no wait list.
278        // Leave it to the listeners to decide what to do.
279        for (IDiscoveryListener listener : getDiscoveryListeners())
280        {
281            listener.addDiscoveredService( discoveredService );
282        }
283
284    }
285
286    /**
287     * Get all the cache names we have facades for.
288     * <p>
289     * @return ArrayList
290     */
291    protected ArrayList<String> getCacheNames()
292    {
293        ArrayList<String> names = new ArrayList<String>();
294        names.addAll( cacheNames );
295        return names;
296    }
297
298    /**
299     * @param attr The UDPDiscoveryAttributes to set.
300     */
301    public void setUdpDiscoveryAttributes( UDPDiscoveryAttributes attr )
302    {
303        this.udpDiscoveryAttributes = attr;
304    }
305
306    /**
307     * @return Returns the lca.
308     */
309    public UDPDiscoveryAttributes getUdpDiscoveryAttributes()
310    {
311        return this.udpDiscoveryAttributes;
312    }
313
314    /**
315     * Start necessary receiver thread
316     */
317    public void startup()
318    {
319        udpReceiverThread = new Thread( receiver );
320        udpReceiverThread.setDaemon( true );
321        // udpReceiverThread.setName( t.getName() + "--UDPReceiver" );
322        udpReceiverThread.start();
323    }
324
325    /**
326     * Shuts down the receiver.
327     */
328    @Override
329    public void shutdown()
330    {
331        if ( !shutdown )
332        {
333            shutdown = true;
334
335            if ( log.isInfoEnabled() )
336            {
337                log.info( "Shutting down UDP discovery service receiver." );
338            }
339
340            try
341            {
342                // no good way to do this right now.
343                receiver.shutdown();
344                udpReceiverThread.interrupt();
345            }
346            catch ( Exception e )
347            {
348                log.error( "Problem interrupting UDP receiver thread." );
349            }
350
351            if ( log.isInfoEnabled() )
352            {
353                log.info( "Shutting down UDP discovery service sender." );
354            }
355
356            // also call the shutdown on the sender thread itself, which
357            // will result in a remove command.
358            try
359            {
360                sender.shutdown();
361            }
362            catch ( Exception e )
363            {
364                log.error( "Problem issuing remove broadcast via UDP sender." );
365            }
366        }
367        else
368        {
369            if ( log.isDebugEnabled() )
370            {
371                log.debug( "Shutdown already called." );
372            }
373        }
374    }
375
376    /**
377     * @return Returns the discoveredServices.
378     */
379    public synchronized Set<DiscoveredService> getDiscoveredServices()
380    {
381        return discoveredServices;
382    }
383
384    /**
385     * @return the discoveryListeners
386     */
387    private Set<IDiscoveryListener> getDiscoveryListeners()
388    {
389        return discoveryListeners;
390    }
391
392    /**
393     * @return the discoveryListeners
394     */
395    public Set<IDiscoveryListener> getCopyOfDiscoveryListeners()
396    {
397        Set<IDiscoveryListener> copy = new HashSet<IDiscoveryListener>();
398        copy.addAll( getDiscoveryListeners() );
399        return copy;
400    }
401
402    /**
403     * Adds a listener.
404     * <p>
405     * @param listener
406     * @return true if it wasn't already in the set
407     */
408    public boolean addDiscoveryListener( IDiscoveryListener listener )
409    {
410        return getDiscoveryListeners().add( listener );
411    }
412
413    /**
414     * Removes a listener.
415     * <p>
416     * @param listener
417     * @return true if it was in the set
418     */
419    public boolean removeDiscoveryListener( IDiscoveryListener listener )
420    {
421        return getDiscoveryListeners().remove( listener );
422    }
423}