View Javadoc
1   package org.apache.commons.jcs3.utils.discovery;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.concurrent.ConcurrentHashMap;
23  import java.util.concurrent.ConcurrentMap;
24  
25  import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
26  import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
27  import org.apache.commons.jcs3.engine.behavior.IProvideScheduler;
28  import org.apache.commons.jcs3.log.Log;
29  import org.apache.commons.jcs3.log.LogManager;
30  import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
31  
32  /**
33   * This manages UDPDiscovery Services. We should end up with one service per Lateral Cache Manager
34   * Instance. One service works for multiple regions. We don't want a connection for each region.
35   */
36  public class UDPDiscoveryManager
37  {
38      /** The logger */
39      private static final Log log = LogManager.getLog( UDPDiscoveryManager.class );
40  
41      /** Singleton instance */
42      private static final UDPDiscoveryManager INSTANCE = new UDPDiscoveryManager();
43  
44      /** Known services */
45      private final ConcurrentMap<String, UDPDiscoveryService> services = new ConcurrentHashMap<>();
46  
47      /** private for singleton */
48      private UDPDiscoveryManager()
49      {
50          // noopt
51      }
52  
53      /**
54       * Singleton
55       * <p>
56       * @return UDPDiscoveryManager
57       */
58      public static UDPDiscoveryManager getInstance()
59      {
60          return INSTANCE;
61      }
62  
63  
64      /**
65       * Creates a service for the address and port if one doesn't exist already.
66       * <p>
67       * We need to key this using the listener port too.
68       * TODO think of making one discovery service work for multiple types of clients.
69       * <p>
70       * @param discoveryAddress
71       * @param discoveryPort
72       * @param servicePort
73       * @param cacheMgr
74       * @return UDPDiscoveryService
75       * @deprecated Specify serializer implementation explicitly, allow to specify udpTTL
76       */
77      @Deprecated
78      public UDPDiscoveryService getService( final String discoveryAddress, final int discoveryPort, final int servicePort,
79                                                          final ICompositeCacheManager cacheMgr )
80      {
81          return getService(discoveryAddress, discoveryPort, null, servicePort, 0,
82                  cacheMgr, new StandardSerializer());
83      }
84  
85      /**
86       * Creates a service for the address and port if one doesn't exist already.
87       * <p>
88       * We need to key this using the listener port too.
89       * TODO think of making one discovery service work for multiple types of clients.
90       * <p>
91       * @param discoveryAddress
92       * @param discoveryPort
93       * @param serviceAddress
94       * @param servicePort
95       * @param updTTL
96       * @param cacheMgr
97       * @param serializer
98       *
99       * @return UDPDiscoveryService
100      * @since 3.1
101      */
102     public UDPDiscoveryService getService( final String discoveryAddress, final int discoveryPort,
103             final String serviceAddress, final int servicePort, final int updTTL,
104             final ICompositeCacheManager cacheMgr, final IElementSerializer serializer )
105     {
106         final String key = String.join(":", discoveryAddress, String.valueOf(discoveryPort), String.valueOf(servicePort));
107 
108         final UDPDiscoveryService service = services.computeIfAbsent(key, k -> {
109             log.info( "Creating service for address:port:servicePort [{0}]", key );
110 
111             final UDPDiscoveryAttributes attributes = new UDPDiscoveryAttributes();
112             attributes.setUdpDiscoveryAddr(discoveryAddress);
113             attributes.setUdpDiscoveryPort(discoveryPort);
114             attributes.setServiceAddress(serviceAddress);
115             attributes.setServicePort(servicePort);
116             attributes.setUdpTTL(updTTL);
117 
118             final UDPDiscoveryService newService = new UDPDiscoveryService(attributes, serializer);
119 
120             // register for shutdown notification
121             cacheMgr.registerShutdownObserver( newService );
122 
123             // inject scheduler
124             if ( cacheMgr instanceof IProvideScheduler)
125             {
126                 newService.setScheduledExecutorService(((IProvideScheduler)cacheMgr)
127                         .getScheduledExecutorService());
128             }
129 
130             newService.startup();
131             return newService;
132         });
133 
134         log.debug( "Returning service [{0}] for key [{1}]", service, key );
135 
136         return service;
137     }
138 }