View Javadoc
1   package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.concurrent.ConcurrentHashMap;
24  import java.util.concurrent.ConcurrentMap;
25  import java.util.concurrent.CopyOnWriteArrayList;
26  
27  import org.apache.commons.jcs3.auxiliary.lateral.LateralCacheNoWait;
28  import org.apache.commons.jcs3.auxiliary.lateral.LateralCacheNoWaitFacade;
29  import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
30  import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
31  import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
32  import org.apache.commons.jcs3.engine.control.CompositeCacheManager;
33  import org.apache.commons.jcs3.engine.logging.behavior.ICacheEventLogger;
34  import org.apache.commons.jcs3.log.Log;
35  import org.apache.commons.jcs3.log.LogManager;
36  import org.apache.commons.jcs3.utils.discovery.DiscoveredService;
37  import org.apache.commons.jcs3.utils.discovery.behavior.IDiscoveryListener;
38  
39  /**
40   * This knows how to add and remove discovered services. It observes UDP discovery events.
41   * <p>
42   * We can have one listener per region, or one shared by all regions.
43   */
44  public class LateralTCPDiscoveryListener
45      implements IDiscoveryListener
46  {
47      /** The log factory */
48      private static final Log log = LogManager.getLog( LateralTCPDiscoveryListener.class );
49  
50      /**
51       * Map of no wait facades. these are used to determine which regions are locally configured to
52       * use laterals.
53       */
54      private final ConcurrentMap<String, LateralCacheNoWaitFacade<?, ?>> facades =
55          new ConcurrentHashMap<>();
56  
57      /**
58       * List of regions that are configured differently here than on another server. We keep track of
59       * this to limit the amount of info logging.
60       */
61      private final CopyOnWriteArrayList<String> knownDifferentlyConfiguredRegions =
62          new CopyOnWriteArrayList<>();
63  
64      /** The name of the cache factory */
65      private final String factoryName;
66  
67      /** Reference to the cache manager for auxiliary cache access */
68      private final CompositeCacheManager cacheManager;
69  
70      /** Reference to the cache event logger for auxiliary cache creation */
71      private final ICacheEventLogger cacheEventLogger;
72  
73      /** Reference to the cache element serializer for auxiliary cache creation */
74      private final IElementSerializer elementSerializer;
75  
76      /**
77       * This plugs into the udp discovery system. It will receive add and remove events.
78       * <p>
79       * @param factoryName the name of the related cache factory
80       * @param cacheManager the global cache manager
81       * @deprecated Use constructor with four parameters
82       */
83      @Deprecated
84      protected LateralTCPDiscoveryListener( final String factoryName, final ICompositeCacheManager cacheManager )
85      {
86          this(factoryName, (CompositeCacheManager) cacheManager, null, null);
87      }
88  
89      /**
90       * This plugs into the udp discovery system. It will receive add and remove events.
91       * <p>
92       * @param factoryName the name of the related cache factory
93       * @param cacheManager the global cache manager
94       * @param cacheEventLogger Reference to the cache event logger for auxiliary cache creation
95       * @param elementSerializer Reference to the cache element serializer for auxiliary cache
96       * creation
97       * @since 3.1
98       */
99      protected LateralTCPDiscoveryListener( final String factoryName,
100             final CompositeCacheManager cacheManager,
101             final ICacheEventLogger cacheEventLogger,
102             final IElementSerializer elementSerializer)
103     {
104         this.factoryName = factoryName;
105         this.cacheManager = cacheManager;
106         this.cacheEventLogger = cacheEventLogger;
107         this.elementSerializer = elementSerializer;
108     }
109 
110     /**
111      * Adds a nowait facade under this cachename. If one already existed, it will be overridden.
112      * <p>
113      * This adds nowaits to a facade for the region name. If the region has no facade, then it is
114      * not configured to use the lateral cache, and no facade will be created.
115      * <p>
116      * @param cacheName - the region name
117      * @param facade - facade (for region) =&gt; multiple lateral clients.
118      * @return true if the facade was not already registered.
119      */
120     public boolean addNoWaitFacade( final String cacheName, final LateralCacheNoWaitFacade<?, ?> facade )
121     {
122         final boolean isNew = !containsNoWaitFacade( cacheName );
123 
124         // override or put anew, it doesn't matter
125         facades.put( cacheName, facade );
126         knownDifferentlyConfiguredRegions.remove( cacheName );
127 
128         return isNew;
129     }
130 
131     /**
132      * Allows us to see if the facade is present.
133      * <p>
134      * @param cacheName - facades are for a region
135      * @return do we contain the no wait. true if so
136      */
137     public boolean containsNoWaitFacade( final String cacheName )
138     {
139         return facades.containsKey( cacheName );
140     }
141 
142     /**
143      * Allows us to see if the facade is present and if it has the no wait.
144      * <p>
145      * @param cacheName - facades are for a region
146      * @param noWait - is this no wait in the facade
147      * @return do we contain the no wait. true if so
148      */
149     public <K, V> boolean containsNoWait( final String cacheName, final LateralCacheNoWait<K, V> noWait )
150     {
151         @SuppressWarnings("unchecked") // Need to cast because of common map for all facades
152         final
153         LateralCacheNoWaitFacade<K, V> facade =
154             (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
155 
156         if ( facade == null )
157         {
158             return false;
159         }
160 
161         return facade.containsNoWait( noWait );
162     }
163 
164     /**
165      * When a broadcast is received from the UDP Discovery receiver, for each cacheName in the
166      * message, the add no wait will be called here. To add a no wait, the facade is looked up for
167      * this cache name.
168      * <p>
169      * Each region has a facade. The facade contains a list of end points--the other tcp lateral
170      * services.
171      * <p>
172      * @param noWait
173      * @return true if we found the no wait and added it. False if the no wait was not present or if
174      *         we already had it.
175      */
176     protected <K, V> boolean addNoWait( final LateralCacheNoWait<K, V> noWait )
177     {
178         @SuppressWarnings("unchecked") // Need to cast because of common map for all facades
179         final
180         LateralCacheNoWaitFacade<K, V> facade =
181             (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
182         log.debug( "addNoWait > Got facade for {0} = {1}", noWait.getCacheName(), facade );
183 
184         return addNoWait(noWait, facade);
185     }
186 
187     /**
188      * When a broadcast is received from the UDP Discovery receiver, for each cacheName in the
189      * message, the add no wait will be called here.
190      * <p>
191      * @param noWait the no wait
192      * @param facade the related facade
193      * @return true if we found the no wait and added it. False if the no wait was not present or if
194      *         we already had it.
195      * @since 3.1
196      */
197     protected <K, V> boolean addNoWait(final LateralCacheNoWait<K, V> noWait,
198             final LateralCacheNoWaitFacade<K, V> facade)
199     {
200         if ( facade != null )
201         {
202             final boolean isNew = facade.addNoWait( noWait );
203             log.debug( "Called addNoWait, isNew = {0}", isNew );
204             return isNew;
205         }
206         if ( knownDifferentlyConfiguredRegions.addIfAbsent( noWait.getCacheName() ) )
207         {
208             log.info( "addNoWait > Different nodes are configured differently "
209                     + "or region [{0}] is not yet used on this side.",
210                     noWait::getCacheName);
211         }
212         return false;
213     }
214 
215     /**
216      * Look up the facade for the name. If it doesn't exist, then the region is not configured for
217      * use with the lateral cache. If it is present, remove the item from the no wait list.
218      * <p>
219      * @param noWait
220      * @return true if we found the no wait and removed it. False if the no wait was not present.
221      */
222     protected <K, V> boolean removeNoWait( final LateralCacheNoWait<K, V> noWait )
223     {
224         @SuppressWarnings("unchecked") // Need to cast because of common map for all facades
225         final
226         LateralCacheNoWaitFacade<K, V> facade =
227             (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
228         log.debug( "removeNoWait > Got facade for {0} = {1}", noWait.getCacheName(), facade);
229 
230         return removeNoWait(facade, noWait.getCacheName(), noWait.getIdentityKey());
231     }
232 
233     /**
234      * Remove the item from the no wait list.
235      * <p>
236      * @param facade
237      * @param cacheName
238      * @param tcpServer
239      * @return true if we found the no wait and removed it. False if the no wait was not present.
240      * @since 3.1
241      */
242     protected <K, V> boolean removeNoWait(final LateralCacheNoWaitFacade<K, V> facade,
243             final String cacheName, final String tcpServer)
244     {
245         if ( facade != null )
246         {
247             final boolean removed = facade.removeNoWait(tcpServer);
248             log.debug( "Called removeNoWait, removed {0}", removed );
249             return removed;
250         }
251         if (knownDifferentlyConfiguredRegions.addIfAbsent(cacheName))
252         {
253             log.info( "addNoWait > Different nodes are configured differently "
254                     + "or region [{0}] is not yet used on this side.",
255                     cacheName);
256         }
257         return false;
258     }
259 
260     /**
261      * Creates the lateral cache if needed.
262      * <p>
263      * We could go to the composite cache manager and get the cache for the region. This would
264      * force a full configuration of the region. One advantage of this would be that the creation of
265      * the later would go through the factory, which would add the item to the no wait list. But we
266      * don't want to do this. This would force this client to have all the regions as the other.
267      * This might not be desired. We don't want to send or receive for a region here that is either
268      * not used or not configured to use the lateral.
269      * <p>
270      * Right now, I'm afraid that the region will get puts if another instance has the region
271      * configured to use the lateral and our address is configured. This might be a bug, but it
272      * shouldn't happen with discovery.
273      * <p>
274      * @param service
275      */
276     @Override
277     public void addDiscoveredService( final DiscoveredService service )
278     {
279         // get a cache and add it to the no waits
280         // the add method should not add the same.
281         // we need the listener port from the original config.
282         final ArrayList<String> regions = service.getCacheNames();
283         final String serverAndPort = service.getServiceAddress() + ":" + service.getServicePort();
284 
285         if ( regions != null )
286         {
287             // for each region get the cache
288             for (final String cacheName : regions)
289             {
290                 final LateralCacheNoWaitFacade<?, ?> facade = facades.get(cacheName);
291                 log.debug( "Got cache facade {0}", facade );
292 
293                 // add this to the nowaits for this cachename
294                 if (facade != null)
295                 {
296                     // skip caches already there
297                     if (facade.containsNoWait(serverAndPort))
298                     {
299                         continue;
300                     }
301 
302                     final ITCPLateralCacheAttributes lca =
303                             (ITCPLateralCacheAttributes) facade.getAuxiliaryCacheAttributes().clone();
304                     lca.setTcpServer(serverAndPort);
305 
306                     LateralTCPCacheFactory factory =
307                             (LateralTCPCacheFactory) cacheManager.registryFacGet(factoryName);
308 
309                     LateralCacheNoWait<?, ?> noWait =
310                             factory.createCacheNoWait(lca, cacheEventLogger, elementSerializer);
311                     factory.monitorCache(noWait);
312 
313                     if (addNoWait(noWait))
314                     {
315                         log.debug("Added NoWait for cacheName [{0}] at {1}", cacheName, serverAndPort);
316                     }
317                 }
318             }
319         }
320         else
321         {
322             log.warn( "No cache names found in message {0}", service );
323         }
324     }
325 
326     /**
327      * Removes the lateral cache.
328      * <p>
329      * We need to tell the manager that this instance is bad, so it will reconnect the sender if it
330      * comes back.
331      * <p>
332      * @param service
333      */
334     @Override
335     public void removeDiscoveredService( final DiscoveredService service )
336     {
337         // get a cache and add it to the no waits
338         // the add method should not add the same.
339         // we need the listener port from the original config.
340         final ArrayList<String> regions = service.getCacheNames();
341         final String serverAndPort = service.getServiceAddress() + ":" + service.getServicePort();
342 
343         if ( regions != null )
344         {
345             // for each region get the cache
346             for (final String cacheName : regions)
347             {
348                 final LateralCacheNoWaitFacade<?, ?> facade = facades.get(cacheName);
349                 log.debug( "Got cache facade {0}", facade );
350 
351                 // remove this from the nowaits for this cachename
352                 if (facade != null && removeNoWait(facade, cacheName, serverAndPort))
353                 {
354                     log.debug("Removed NoWait for cacheName [{0}] at {1}", cacheName, serverAndPort);
355                 }
356             }
357         }
358         else
359         {
360             log.warn( "No cache names found in message {0}", service );
361         }
362     }
363 }