View Javadoc
1   package org.apache.commons.jcs.auxiliary.lateral.socket.tcp;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.commons.jcs.auxiliary.AuxiliaryCache;
23  import org.apache.commons.jcs.auxiliary.AuxiliaryCacheAttributes;
24  import org.apache.commons.jcs.auxiliary.lateral.LateralCacheAttributes;
25  import org.apache.commons.jcs.auxiliary.lateral.LateralCacheNoWait;
26  import org.apache.commons.jcs.auxiliary.lateral.LateralCacheNoWaitFacade;
27  import org.apache.commons.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
28  import org.apache.commons.jcs.engine.behavior.ICompositeCacheManager;
29  import org.apache.commons.jcs.utils.discovery.DiscoveredService;
30  import org.apache.commons.jcs.utils.discovery.behavior.IDiscoveryListener;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  
34  import java.util.ArrayList;
35  import java.util.Collections;
36  import java.util.HashMap;
37  import java.util.HashSet;
38  import java.util.Map;
39  import java.util.Set;
40  
41  /**
42   * This knows how to add and remove discovered services. It observes UDP discovery events.
43   * <p>
44   * We can have one listener per region, or one shared by all regions.
45   */
46  public class LateralTCPDiscoveryListener
47      implements IDiscoveryListener
48  {
49      /** The log factory */
50      private static final Log log = LogFactory.getLog( LateralTCPDiscoveryListener.class );
51  
52      /**
53       * Map of no wait facades. these are used to determine which regions are locally configured to
54       * use laterals.
55       */
56      private final Map<String, LateralCacheNoWaitFacade<?, ?>> facades =
57          Collections.synchronizedMap( new HashMap<String, LateralCacheNoWaitFacade<?, ?>>() );
58  
59      /**
60       * List of regions that are configured differently here than on another server. We keep track of
61       * this to limit the amount of info logging.
62       */
63      private final Set<String> knownDifferentlyConfiguredRegions =
64          Collections.synchronizedSet( new HashSet<String>() );
65  
66      /** The name of the cache factory */
67      private String factoryName;
68  
69      /** Reference to the cache manager for auxiliary cache access */
70      private ICompositeCacheManager cacheManager;
71  
72      /**
73       * This plugs into the udp discovery system. It will receive add and remove events.
74       * <p>
75       * @param factoryName the name of the related cache factory
76       * @param cacheManager the global cache manager
77       */
78      protected LateralTCPDiscoveryListener( String factoryName, ICompositeCacheManager cacheManager )
79      {
80          this.factoryName = factoryName;
81          this.cacheManager = cacheManager;
82      }
83  
84      /**
85       * Adds a nowait facade under this cachename. If one already existed, it will be overridden.
86       * <p>
87       * This adds nowaits to a facade for the region name. If the region has no facade, then it is
88       * not configured to use the lateral cache, and no facade will be created.
89       * <p>
90       * @param cacheName - the region name
91       * @param facade - facade (for region) =&gt; multiple lateral clients.
92       * @return true if the facade was not already registered.
93       */
94      public synchronized boolean addNoWaitFacade( String cacheName, LateralCacheNoWaitFacade<?, ?> facade )
95      {
96          boolean isNew = !containsNoWaitFacade( cacheName );
97  
98          // override or put anew, it doesn't matter
99          facades.put( cacheName, facade );
100         knownDifferentlyConfiguredRegions.remove( cacheName );
101 
102         return isNew;
103     }
104 
105     /**
106      * Allows us to see if the facade is present.
107      * <p>
108      * @param cacheName - facades are for a region
109      * @return do we contain the no wait. true if so
110      */
111     public boolean containsNoWaitFacade( String cacheName )
112     {
113         return facades.containsKey( cacheName );
114     }
115 
116     /**
117      * Allows us to see if the facade is present and if it has the no wait.
118      * <p>
119      * @param cacheName - facades are for a region
120      * @param noWait - is this no wait in the facade
121      * @return do we contain the no wait. true if so
122      */
123     public <K, V> boolean containsNoWait( String cacheName, LateralCacheNoWait<K, V> noWait )
124     {
125         @SuppressWarnings("unchecked") // Need to cast because of common map for all facades
126         LateralCacheNoWaitFacade<K, V> facade = (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
127         if ( facade == null )
128         {
129             return false;
130         }
131 
132         return facade.containsNoWait( noWait );
133     }
134 
135     /**
136      * When a broadcast is received from the UDP Discovery receiver, for each cacheName in the
137      * message, the add no wait will be called here. To add a no wait, the facade is looked up for
138      * this cache name.
139      * <p>
140      * Each region has a facade. The facade contains a list of end points--the other tcp lateral
141      * services.
142      * <p>
143      * @param noWait
144      * @return true if we found the no wait and added it. False if the no wait was not present or it
145      *         we already had it.
146      */
147     protected <K, V> boolean addNoWait( LateralCacheNoWait<K, V> noWait )
148     {
149         @SuppressWarnings("unchecked") // Need to cast because of common map for all facades
150         LateralCacheNoWaitFacade<K, V> facade = (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
151         if ( log.isDebugEnabled() )
152         {
153             log.debug( "addNoWait > Got facade for " + noWait.getCacheName() + " = " + facade );
154         }
155 
156         if ( facade != null )
157         {
158             boolean isNew = facade.addNoWait( noWait );
159             if ( log.isDebugEnabled() )
160             {
161                 log.debug( "Called addNoWait, isNew = " + isNew );
162             }
163             return isNew;
164         }
165         else
166         {
167             if ( !knownDifferentlyConfiguredRegions.contains( noWait.getCacheName() ) )
168             {
169                 if ( log.isInfoEnabled() )
170                 {
171                     log.info( "addNoWait > Different nodes are configured differently or region ["
172                         + noWait.getCacheName() + "] is not yet used on this side.  " );
173                 }
174                 knownDifferentlyConfiguredRegions.add( noWait.getCacheName() );
175             }
176             return false;
177         }
178     }
179 
180     /**
181      * Look up the facade for the name. If it doesn't exist, then the region is not configured for
182      * use with the lateral cache. If it is present, remove the item from the no wait list.
183      * <p>
184      * @param noWait
185      * @return true if we found the no wait and removed it. False if the no wait was not present.
186      */
187     protected <K, V> boolean removeNoWait( LateralCacheNoWait<K, V> noWait )
188     {
189         @SuppressWarnings("unchecked") // Need to cast because of common map for all facades
190         LateralCacheNoWaitFacade<K, V> facade = (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
191         if ( log.isDebugEnabled() )
192         {
193             log.debug( "removeNoWait > Got facade for " + noWait.getCacheName() + " = " + facade );
194         }
195 
196         if ( facade != null )
197         {
198             boolean removed = facade.removeNoWait( noWait );
199             if ( log.isDebugEnabled() )
200             {
201                 log.debug( "Called removeNoWait, removed " + removed );
202             }
203             return removed;
204         }
205         else
206         {
207             if ( !knownDifferentlyConfiguredRegions.contains( noWait.getCacheName() ) )
208             {
209                 if ( log.isInfoEnabled() )
210                 {
211                     log.info( "removeNoWait > Different nodes are configured differently or region ["
212                         + noWait.getCacheName() + "] is not yet used on this side.  " );
213                 }
214                 knownDifferentlyConfiguredRegions.add( noWait.getCacheName() );
215             }
216             return false;
217         }
218     }
219 
220     /**
221      * Creates the lateral cache if needed.
222      * <p>
223      * We could go to the composite cache manager and get the the cache for the region. This would
224      * force a full configuration of the region. One advantage of this would be that the creation of
225      * the later would go through the factory, which would add the item to the no wait list. But we
226      * don't want to do this. This would force this client to have all the regions as the other.
227      * This might not be desired. We don't want to send or receive for a region here that is either
228      * not used or not configured to use the lateral.
229      * <p>
230      * Right now, I'm afraid that the region will get puts if another instance has the region
231      * configured to use the lateral and our address is configured. This might be a bug, but it
232      * shouldn't happen with discovery.
233      * <p>
234      * @param service
235      */
236     @Override
237     public void addDiscoveredService( DiscoveredService service )
238     {
239         // get a cache and add it to the no waits
240         // the add method should not add the same.
241         // we need the listener port from the original config.
242         ArrayList<String> regions = service.getCacheNames();
243         String serverAndPort = service.getServiceAddress() + ":" + service.getServicePort();
244 
245         if ( regions != null )
246         {
247             // for each region get the cache
248             for (String cacheName : regions)
249             {
250                 AuxiliaryCache<?, ?> ic = cacheManager.getAuxiliaryCache(factoryName, cacheName);
251 
252                 if ( log.isDebugEnabled() )
253                 {
254                     log.debug( "Got cache, ic = " + ic );
255                 }
256 
257                 // add this to the nowaits for this cachename
258                 if ( ic != null )
259                 {
260                     AuxiliaryCacheAttributes aca = ic.getAuxiliaryCacheAttributes();
261                     if (aca instanceof ITCPLateralCacheAttributes)
262                     {
263                         ITCPLateralCacheAttributes lca = (ITCPLateralCacheAttributes)aca;
264                         if (lca.getTransmissionType() != LateralCacheAttributes.Type.TCP
265                             || !serverAndPort.equals(lca.getTcpServer()) )
266                         {
267                             // skip caches not belonging to this service
268                             continue;
269                         }
270                     }
271 
272                     addNoWait( (LateralCacheNoWait<?, ?>) ic );
273                     if ( log.isDebugEnabled() )
274                     {
275                         log.debug( "Called addNoWait for cacheName [" + cacheName + "]" );
276                     }
277                 }
278             }
279         }
280         else
281         {
282             log.warn( "No cache names found in message " + service );
283         }
284     }
285 
286     /**
287      * Removes the lateral cache.
288      * <p>
289      * We need to tell the manager that this instance is bad, so it will reconnect the sender if it
290      * comes back.
291      * <p>
292      * @param service
293      */
294     @Override
295     public void removeDiscoveredService( DiscoveredService service )
296     {
297         // get a cache and add it to the no waits
298         // the add method should not add the same.
299         // we need the listener port from the original config.
300         ArrayList<String> regions = service.getCacheNames();
301         String serverAndPort = service.getServiceAddress() + ":" + service.getServicePort();
302 
303         if ( regions != null )
304         {
305             // for each region get the cache
306             for (String cacheName : regions)
307             {
308                 AuxiliaryCache<?, ?> ic = cacheManager.getAuxiliaryCache(factoryName, cacheName);
309 
310                 if ( log.isDebugEnabled() )
311                 {
312                     log.debug( "Got cache, ic = " + ic );
313                 }
314 
315                 // remove this to the nowaits for this cachename
316                 if ( ic != null )
317                 {
318                     AuxiliaryCacheAttributes aca = ic.getAuxiliaryCacheAttributes();
319                     if (aca instanceof ITCPLateralCacheAttributes)
320                     {
321                         ITCPLateralCacheAttributes lca = (ITCPLateralCacheAttributes)aca;
322                         if (lca.getTransmissionType() != LateralCacheAttributes.Type.TCP
323                             || !serverAndPort.equals(lca.getTcpServer()) )
324                         {
325                             // skip caches not belonging to this service
326                             continue;
327                         }
328                     }
329 
330                     removeNoWait( (LateralCacheNoWait<?, ?>) ic );
331                     if ( log.isDebugEnabled() )
332                     {
333                         log.debug( "Called removeNoWait for cacheName [" + cacheName + "]" );
334                     }
335                 }
336             }
337         }
338         else
339         {
340             log.warn( "No cache names found in message " + service );
341         }
342     }
343 }