001package org.apache.commons.jcs3.utils.discovery;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.util.concurrent.ConcurrentHashMap;
023import java.util.concurrent.ConcurrentMap;
024
025import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
026import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
027import org.apache.commons.jcs3.engine.behavior.IProvideScheduler;
028import org.apache.commons.jcs3.log.Log;
029import org.apache.commons.jcs3.log.LogManager;
030import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
031
032/**
033 * This manages UDPDiscovery Services. We should end up with one service per Lateral Cache Manager
034 * Instance. One service works for multiple regions. We don't want a connection for each region.
035 */
036public class UDPDiscoveryManager
037{
038    /** The logger */
039    private static final Log log = LogManager.getLog( UDPDiscoveryManager.class );
040
041    /** Singleton instance */
042    private static final UDPDiscoveryManager INSTANCE = new UDPDiscoveryManager();
043
044    /** Known services */
045    private final ConcurrentMap<String, UDPDiscoveryService> services = new ConcurrentHashMap<>();
046
047    /** private for singleton */
048    private UDPDiscoveryManager()
049    {
050        // noopt
051    }
052
053    /**
054     * Singleton
055     * <p>
056     * @return UDPDiscoveryManager
057     */
058    public static UDPDiscoveryManager getInstance()
059    {
060        return INSTANCE;
061    }
062
063
064    /**
065     * Creates a service for the address and port if one doesn't exist already.
066     * <p>
067     * We need to key this using the listener port too.
068     * TODO think of making one discovery service work for multiple types of clients.
069     * <p>
070     * @param discoveryAddress
071     * @param discoveryPort
072     * @param servicePort
073     * @param cacheMgr
074     * @return UDPDiscoveryService
075     * @deprecated Specify serializer implementation explicitly, allow to specify udpTTL
076     */
077    @Deprecated
078    public UDPDiscoveryService getService( final String discoveryAddress, final int discoveryPort, final int servicePort,
079                                                        final ICompositeCacheManager cacheMgr )
080    {
081        return getService(discoveryAddress, discoveryPort, null, servicePort, 0,
082                cacheMgr, new StandardSerializer());
083    }
084
085    /**
086     * Creates a service for the address and port if one doesn't exist already.
087     * <p>
088     * We need to key this using the listener port too.
089     * TODO think of making one discovery service work for multiple types of clients.
090     * <p>
091     * @param discoveryAddress
092     * @param discoveryPort
093     * @param serviceAddress
094     * @param servicePort
095     * @param updTTL
096     * @param cacheMgr
097     * @param serializer
098     *
099     * @return UDPDiscoveryService
100     * @since 3.1
101     */
102    public UDPDiscoveryService getService( final String discoveryAddress, final int discoveryPort,
103            final String serviceAddress, final int servicePort, final int updTTL,
104            final ICompositeCacheManager cacheMgr, final IElementSerializer serializer )
105    {
106        final String key = String.join(":", discoveryAddress, String.valueOf(discoveryPort), String.valueOf(servicePort));
107
108        final UDPDiscoveryService service = services.computeIfAbsent(key, k -> {
109            log.info( "Creating service for address:port:servicePort [{0}]", key );
110
111            final UDPDiscoveryAttributes attributes = new UDPDiscoveryAttributes();
112            attributes.setUdpDiscoveryAddr(discoveryAddress);
113            attributes.setUdpDiscoveryPort(discoveryPort);
114            attributes.setServiceAddress(serviceAddress);
115            attributes.setServicePort(servicePort);
116            attributes.setUdpTTL(updTTL);
117
118            final UDPDiscoveryService newService = new UDPDiscoveryService(attributes, serializer);
119
120            // register for shutdown notification
121            cacheMgr.registerShutdownObserver( newService );
122
123            // inject scheduler
124            if ( cacheMgr instanceof IProvideScheduler)
125            {
126                newService.setScheduledExecutorService(((IProvideScheduler)cacheMgr)
127                        .getScheduledExecutorService());
128            }
129
130            newService.startup();
131            return newService;
132        });
133
134        log.debug( "Returning service [{0}] for key [{1}]", service, key );
135
136        return service;
137    }
138}