001package org.apache.commons.jcs.auxiliary.lateral.socket.tcp;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import org.apache.commons.jcs.auxiliary.AuxiliaryCache;
023import org.apache.commons.jcs.auxiliary.AuxiliaryCacheAttributes;
024import org.apache.commons.jcs.auxiliary.lateral.LateralCacheAttributes;
025import org.apache.commons.jcs.auxiliary.lateral.LateralCacheNoWait;
026import org.apache.commons.jcs.auxiliary.lateral.LateralCacheNoWaitFacade;
027import org.apache.commons.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
028import org.apache.commons.jcs.engine.behavior.ICompositeCacheManager;
029import org.apache.commons.jcs.utils.discovery.DiscoveredService;
030import org.apache.commons.jcs.utils.discovery.behavior.IDiscoveryListener;
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033
034import java.util.ArrayList;
035import java.util.Collections;
036import java.util.HashMap;
037import java.util.HashSet;
038import java.util.Map;
039import java.util.Set;
040
041/**
042 * This knows how to add and remove discovered services. It observes UDP discovery events.
043 * <p>
044 * We can have one listener per region, or one shared by all regions.
045 */
046public class LateralTCPDiscoveryListener
047    implements IDiscoveryListener
048{
049    /** The log factory */
050    private static final Log log = LogFactory.getLog( LateralTCPDiscoveryListener.class );
051
052    /**
053     * Map of no wait facades. these are used to determine which regions are locally configured to
054     * use laterals.
055     */
056    private final Map<String, LateralCacheNoWaitFacade<?, ?>> facades =
057        Collections.synchronizedMap( new HashMap<String, LateralCacheNoWaitFacade<?, ?>>() );
058
059    /**
060     * List of regions that are configured differently here than on another server. We keep track of
061     * this to limit the amount of info logging.
062     */
063    private final Set<String> knownDifferentlyConfiguredRegions =
064        Collections.synchronizedSet( new HashSet<String>() );
065
066    /** The name of the cache factory */
067    private String factoryName;
068
069    /** Reference to the cache manager for auxiliary cache access */
070    private ICompositeCacheManager cacheManager;
071
072    /**
073     * This plugs into the udp discovery system. It will receive add and remove events.
074     * <p>
075     * @param factoryName the name of the related cache factory
076     * @param cacheManager the global cache manager
077     */
078    protected LateralTCPDiscoveryListener( String factoryName, ICompositeCacheManager cacheManager )
079    {
080        this.factoryName = factoryName;
081        this.cacheManager = cacheManager;
082    }
083
084    /**
085     * Adds a nowait facade under this cachename. If one already existed, it will be overridden.
086     * <p>
087     * This adds nowaits to a facade for the region name. If the region has no facade, then it is
088     * not configured to use the lateral cache, and no facade will be created.
089     * <p>
090     * @param cacheName - the region name
091     * @param facade - facade (for region) =&gt; multiple lateral clients.
092     * @return true if the facade was not already registered.
093     */
094    public synchronized boolean addNoWaitFacade( String cacheName, LateralCacheNoWaitFacade<?, ?> facade )
095    {
096        boolean isNew = !containsNoWaitFacade( cacheName );
097
098        // override or put anew, it doesn't matter
099        facades.put( cacheName, facade );
100        knownDifferentlyConfiguredRegions.remove( cacheName );
101
102        return isNew;
103    }
104
105    /**
106     * Allows us to see if the facade is present.
107     * <p>
108     * @param cacheName - facades are for a region
109     * @return do we contain the no wait. true if so
110     */
111    public boolean containsNoWaitFacade( String cacheName )
112    {
113        return facades.containsKey( cacheName );
114    }
115
116    /**
117     * Allows us to see if the facade is present and if it has the no wait.
118     * <p>
119     * @param cacheName - facades are for a region
120     * @param noWait - is this no wait in the facade
121     * @return do we contain the no wait. true if so
122     */
123    public <K, V> boolean containsNoWait( String cacheName, LateralCacheNoWait<K, V> noWait )
124    {
125        @SuppressWarnings("unchecked") // Need to cast because of common map for all facades
126        LateralCacheNoWaitFacade<K, V> facade = (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
127        if ( facade == null )
128        {
129            return false;
130        }
131
132        return facade.containsNoWait( noWait );
133    }
134
135    /**
136     * When a broadcast is received from the UDP Discovery receiver, for each cacheName in the
137     * message, the add no wait will be called here. To add a no wait, the facade is looked up for
138     * this cache name.
139     * <p>
140     * Each region has a facade. The facade contains a list of end points--the other tcp lateral
141     * services.
142     * <p>
143     * @param noWait
144     * @return true if we found the no wait and added it. False if the no wait was not present or it
145     *         we already had it.
146     */
147    protected <K, V> boolean addNoWait( LateralCacheNoWait<K, V> noWait )
148    {
149        @SuppressWarnings("unchecked") // Need to cast because of common map for all facades
150        LateralCacheNoWaitFacade<K, V> facade = (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
151        if ( log.isDebugEnabled() )
152        {
153            log.debug( "addNoWait > Got facade for " + noWait.getCacheName() + " = " + facade );
154        }
155
156        if ( facade != null )
157        {
158            boolean isNew = facade.addNoWait( noWait );
159            if ( log.isDebugEnabled() )
160            {
161                log.debug( "Called addNoWait, isNew = " + isNew );
162            }
163            return isNew;
164        }
165        else
166        {
167            if ( !knownDifferentlyConfiguredRegions.contains( noWait.getCacheName() ) )
168            {
169                if ( log.isInfoEnabled() )
170                {
171                    log.info( "addNoWait > Different nodes are configured differently or region ["
172                        + noWait.getCacheName() + "] is not yet used on this side.  " );
173                }
174                knownDifferentlyConfiguredRegions.add( noWait.getCacheName() );
175            }
176            return false;
177        }
178    }
179
180    /**
181     * Look up the facade for the name. If it doesn't exist, then the region is not configured for
182     * use with the lateral cache. If it is present, remove the item from the no wait list.
183     * <p>
184     * @param noWait
185     * @return true if we found the no wait and removed it. False if the no wait was not present.
186     */
187    protected <K, V> boolean removeNoWait( LateralCacheNoWait<K, V> noWait )
188    {
189        @SuppressWarnings("unchecked") // Need to cast because of common map for all facades
190        LateralCacheNoWaitFacade<K, V> facade = (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
191        if ( log.isDebugEnabled() )
192        {
193            log.debug( "removeNoWait > Got facade for " + noWait.getCacheName() + " = " + facade );
194        }
195
196        if ( facade != null )
197        {
198            boolean removed = facade.removeNoWait( noWait );
199            if ( log.isDebugEnabled() )
200            {
201                log.debug( "Called removeNoWait, removed " + removed );
202            }
203            return removed;
204        }
205        else
206        {
207            if ( !knownDifferentlyConfiguredRegions.contains( noWait.getCacheName() ) )
208            {
209                if ( log.isInfoEnabled() )
210                {
211                    log.info( "removeNoWait > Different nodes are configured differently or region ["
212                        + noWait.getCacheName() + "] is not yet used on this side.  " );
213                }
214                knownDifferentlyConfiguredRegions.add( noWait.getCacheName() );
215            }
216            return false;
217        }
218    }
219
220    /**
221     * Creates the lateral cache if needed.
222     * <p>
223     * We could go to the composite cache manager and get the the cache for the region. This would
224     * force a full configuration of the region. One advantage of this would be that the creation of
225     * the later would go through the factory, which would add the item to the no wait list. But we
226     * don't want to do this. This would force this client to have all the regions as the other.
227     * This might not be desired. We don't want to send or receive for a region here that is either
228     * not used or not configured to use the lateral.
229     * <p>
230     * Right now, I'm afraid that the region will get puts if another instance has the region
231     * configured to use the lateral and our address is configured. This might be a bug, but it
232     * shouldn't happen with discovery.
233     * <p>
234     * @param service
235     */
236    @Override
237    public void addDiscoveredService( DiscoveredService service )
238    {
239        // get a cache and add it to the no waits
240        // the add method should not add the same.
241        // we need the listener port from the original config.
242        ArrayList<String> regions = service.getCacheNames();
243        String serverAndPort = service.getServiceAddress() + ":" + service.getServicePort();
244
245        if ( regions != null )
246        {
247            // for each region get the cache
248            for (String cacheName : regions)
249            {
250                AuxiliaryCache<?, ?> ic = cacheManager.getAuxiliaryCache(factoryName, cacheName);
251
252                if ( log.isDebugEnabled() )
253                {
254                    log.debug( "Got cache, ic = " + ic );
255                }
256
257                // add this to the nowaits for this cachename
258                if ( ic != null )
259                {
260                    AuxiliaryCacheAttributes aca = ic.getAuxiliaryCacheAttributes();
261                    if (aca instanceof ITCPLateralCacheAttributes)
262                    {
263                        ITCPLateralCacheAttributes lca = (ITCPLateralCacheAttributes)aca;
264                        if (lca.getTransmissionType() != LateralCacheAttributes.Type.TCP
265                            || !serverAndPort.equals(lca.getTcpServer()) )
266                        {
267                            // skip caches not belonging to this service
268                            continue;
269                        }
270                    }
271
272                    addNoWait( (LateralCacheNoWait<?, ?>) ic );
273                    if ( log.isDebugEnabled() )
274                    {
275                        log.debug( "Called addNoWait for cacheName [" + cacheName + "]" );
276                    }
277                }
278            }
279        }
280        else
281        {
282            log.warn( "No cache names found in message " + service );
283        }
284    }
285
286    /**
287     * Removes the lateral cache.
288     * <p>
289     * We need to tell the manager that this instance is bad, so it will reconnect the sender if it
290     * comes back.
291     * <p>
292     * @param service
293     */
294    @Override
295    public void removeDiscoveredService( DiscoveredService service )
296    {
297        // get a cache and add it to the no waits
298        // the add method should not add the same.
299        // we need the listener port from the original config.
300        ArrayList<String> regions = service.getCacheNames();
301        String serverAndPort = service.getServiceAddress() + ":" + service.getServicePort();
302
303        if ( regions != null )
304        {
305            // for each region get the cache
306            for (String cacheName : regions)
307            {
308                AuxiliaryCache<?, ?> ic = cacheManager.getAuxiliaryCache(factoryName, cacheName);
309
310                if ( log.isDebugEnabled() )
311                {
312                    log.debug( "Got cache, ic = " + ic );
313                }
314
315                // remove this to the nowaits for this cachename
316                if ( ic != null )
317                {
318                    AuxiliaryCacheAttributes aca = ic.getAuxiliaryCacheAttributes();
319                    if (aca instanceof ITCPLateralCacheAttributes)
320                    {
321                        ITCPLateralCacheAttributes lca = (ITCPLateralCacheAttributes)aca;
322                        if (lca.getTransmissionType() != LateralCacheAttributes.Type.TCP
323                            || !serverAndPort.equals(lca.getTcpServer()) )
324                        {
325                            // skip caches not belonging to this service
326                            continue;
327                        }
328                    }
329
330                    removeNoWait( (LateralCacheNoWait<?, ?>) ic );
331                    if ( log.isDebugEnabled() )
332                    {
333                        log.debug( "Called removeNoWait for cacheName [" + cacheName + "]" );
334                    }
335                }
336            }
337        }
338        else
339        {
340            log.warn( "No cache names found in message " + service );
341        }
342    }
343}