001    package org.apache.jcs.auxiliary.lateral.socket.tcp;
002    
003    import java.io.Serializable;
004    import java.util.ArrayList;
005    import java.util.Collections;
006    import java.util.HashMap;
007    import java.util.HashSet;
008    import java.util.Map;
009    import java.util.Set;
010    
011    import org.apache.commons.logging.Log;
012    import org.apache.commons.logging.LogFactory;
013    import org.apache.jcs.auxiliary.lateral.LateralCacheAttributes;
014    import org.apache.jcs.auxiliary.lateral.LateralCacheNoWait;
015    import org.apache.jcs.auxiliary.lateral.LateralCacheNoWaitFacade;
016    import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
017    import org.apache.jcs.engine.behavior.ICache;
018    import org.apache.jcs.engine.behavior.ICompositeCacheManager;
019    import org.apache.jcs.engine.behavior.IElementSerializer;
020    import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
021    import org.apache.jcs.utils.discovery.DiscoveredService;
022    import org.apache.jcs.utils.discovery.behavior.IDiscoveryListener;
023    
024    /**
025     * This knows how to add and remove discovered services. It observes UDP discovery events.
026     * <p>
027     * We can have one listener per region, or one shared by all regions.
028     */
029    public class LateralTCPDiscoveryListener
030        implements IDiscoveryListener
031    {
032        /** The log factory */
033        private final static Log log = LogFactory.getLog( LateralTCPDiscoveryListener.class );
034    
035        /**
036         * Map of no wait facades. these are used to determine which regions are locally configured to
037         * use laterals.
038         */
039        private final Map<String, LateralCacheNoWaitFacade<? extends Serializable, ? extends Serializable>> facades =
040            Collections.synchronizedMap( new HashMap<String, LateralCacheNoWaitFacade<? extends Serializable, ? extends Serializable>>() );
041    
042        /**
043         * List of regions that are configured differently here than on another server. We keep track of
044         * this to limit the amount of info logging.
045         */
046        private final Set<String> knownDifferentlyConfiguredRegions =
047            Collections.synchronizedSet( new HashSet<String>() );
048    
049        /** The cache manager. */
050        private final ICompositeCacheManager cacheMgr;
051    
052        /** The event logger. */
053        protected ICacheEventLogger cacheEventLogger;
054    
055        /** The serializer. */
056        protected IElementSerializer elementSerializer;
057    
058        /**
059         * This plugs into the udp discovery system. It will receive add and remove events.
060         * <p>
061         * @param cacheMgr
062         * @param cacheEventLogger
063         * @param elementSerializer
064         */
065        protected LateralTCPDiscoveryListener( ICompositeCacheManager cacheMgr, ICacheEventLogger cacheEventLogger,
066                                               IElementSerializer elementSerializer )
067        {
068            this.cacheMgr = cacheMgr;
069            this.cacheEventLogger = cacheEventLogger;
070            this.elementSerializer = elementSerializer;
071        }
072    
073        /**
074         * Adds a nowait facade under this cachename. If one already existed, it will be overridden.
075         * <p>
076         * This adds nowaits to a facade for the region name. If the region has no facade, then it is
077         * not configured to use the lateral cache, and no facade will be created.
078         * <p>
079         * @param cacheName - the region name
080         * @param facade - facade (for region) => multiple lateral clients.
081         * @return true if the facade was not already registered.
082         */
083        public synchronized boolean addNoWaitFacade( String cacheName, LateralCacheNoWaitFacade<? extends Serializable, ? extends Serializable> facade )
084        {
085            boolean isNew = !containsNoWaitFacade( cacheName );
086    
087            // override or put anew, it doesn't matter
088            facades.put( cacheName, facade );
089            knownDifferentlyConfiguredRegions.remove( cacheName );
090    
091            return isNew;
092        }
093    
094        /**
095         * Allows us to see if the facade is present.
096         * <p>
097         * @param cacheName - facades are for a region
098         * @return do we contain the no wait. true if so
099         */
100        public boolean containsNoWaitFacade( String cacheName )
101        {
102            return facades.containsKey( cacheName );
103        }
104    
105        /**
106         * Allows us to see if the facade is present and if it has the no wait.
107         * <p>
108         * @param cacheName - facades are for a region
109         * @param noWait - is this no wait in the facade
110         * @return do we contain the no wait. true if so
111         */
112        public <K extends Serializable, V extends Serializable> boolean containsNoWait( String cacheName, LateralCacheNoWait<K, V> noWait )
113        {
114            @SuppressWarnings("unchecked") // Need to cast because of common map for all facades
115            LateralCacheNoWaitFacade<K, V> facade = (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
116            if ( facade == null )
117            {
118                return false;
119            }
120    
121            return facade.containsNoWait( noWait );
122        }
123    
124        /**
125         * When a broadcast is received from the UDP Discovery receiver, for each cacheName in the
126         * message, the add no wait will be called here. To add a no wait, the facade is looked up for
127         * this cache name.
128         * <p>
129         * Each region has a facade. The facade contains a list of end points--the other tcp lateral
130         * services.
131         * <p>
132         * @param noWait
133         * @return true if we found the no wait and added it. False if the no wait was not present or it
134         *         we already had it.
135         */
136        protected <K extends Serializable, V extends Serializable> boolean addNoWait( LateralCacheNoWait<K, V> noWait )
137        {
138            @SuppressWarnings("unchecked") // Need to cast because of common map for all facades
139            LateralCacheNoWaitFacade<K, V> facade = (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
140            if ( log.isDebugEnabled() )
141            {
142                log.debug( "addNoWait > Got facade for " + noWait.getCacheName() + " = " + facade );
143            }
144    
145            if ( facade != null )
146            {
147                boolean isNew = facade.addNoWait( noWait );
148                if ( log.isDebugEnabled() )
149                {
150                    log.debug( "Called addNoWait, isNew = " + isNew );
151                }
152                return isNew;
153            }
154            else
155            {
156                if ( !knownDifferentlyConfiguredRegions.contains( noWait.getCacheName() ) )
157                {
158                    if ( log.isInfoEnabled() )
159                    {
160                        log.info( "addNoWait > Different nodes are configured differently or region ["
161                            + noWait.getCacheName() + "] is not yet used on this side.  " );
162                    }
163                    knownDifferentlyConfiguredRegions.add( noWait.getCacheName() );
164                }
165                return false;
166            }
167        }
168    
169        /**
170         * Look up the facade for the name. If it doesn't exist, then the region is not configured for
171         * use with the lateral cache. If it is present, remove the item from the no wait list.
172         * <p>
173         * @param noWait
174         * @return true if we found the no wait and removed it. False if the no wait was not present.
175         */
176        protected <K extends Serializable, V extends Serializable> boolean removeNoWait( LateralCacheNoWait<K, V> noWait )
177        {
178            @SuppressWarnings("unchecked") // Need to cast because of common map for all facades
179            LateralCacheNoWaitFacade<K, V> facade = (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
180            if ( log.isDebugEnabled() )
181            {
182                log.debug( "removeNoWait > Got facade for " + noWait.getCacheName() + " = " + facade );
183            }
184    
185            if ( facade != null )
186            {
187                boolean removed = facade.removeNoWait( noWait );
188                if ( log.isDebugEnabled() )
189                {
190                    log.debug( "Called removeNoWait, removed " + removed );
191                }
192                return removed;
193            }
194            else
195            {
196                if ( !knownDifferentlyConfiguredRegions.contains( noWait.getCacheName() ) )
197                {
198                    if ( log.isInfoEnabled() )
199                    {
200                        log.info( "removeNoWait > Different nodes are configured differently or region ["
201                            + noWait.getCacheName() + "] is not yet used on this side.  " );
202                    }
203                    knownDifferentlyConfiguredRegions.add( noWait.getCacheName() );
204                }
205                return false;
206            }
207        }
208    
209        /**
210         * Creates the lateral cache if needed.
211         * <p>
212         * We could go to the composite cache manager and get the the cache for the region. This would
213         * force a full configuration of the region. One advantage of this would be that the creation of
214         * the later would go through the factory, which would add the item to the no wait list. But we
215         * don't want to do this. This would force this client to have all the regions as the other.
216         * This might not be desired. We don't want to send or receive for a region here that is either
217         * not used or not configured to use the lateral.
218         * <p>
219         * Right now, I'm afraid that the region will get puts if another instance has the region
220         * configured to use the lateral and our address is configured. This might be a bug, but it
221         * shouldn't happen with discovery.
222         * <p>
223         * @param service
224         */
225        public void addDiscoveredService( DiscoveredService service )
226        {
227            // get a cache and add it to the no waits
228            // the add method should not add the same.
229            // we need the listener port from the original config.
230            LateralTCPCacheManager lcm = findManagerForServiceEndPoint( service );
231    
232            ArrayList<String> regions = service.getCacheNames();
233            if ( regions != null )
234            {
235                // for each region get the cache
236                for (String cacheName : regions)
237                {
238                    try
239                    {
240                        ICache<? extends Serializable, ? extends Serializable> ic = lcm.getCache( cacheName );
241    
242                        if ( log.isDebugEnabled() )
243                        {
244                            log.debug( "Got cache, ic = " + ic );
245                        }
246    
247                        // add this to the nowaits for this cachename
248                        if ( ic != null )
249                        {
250                            addNoWait( (LateralCacheNoWait<? extends Serializable, ? extends Serializable>) ic );
251                            if ( log.isDebugEnabled() )
252                            {
253                                log.debug( "Called addNoWait for cacheName [" + cacheName + "]" );
254                            }
255                        }
256                    }
257                    catch ( Exception e )
258                    {
259                        log.error( "Problem creating no wait", e );
260                    }
261                }
262            }
263            else
264            {
265                log.warn( "No cache names found in message " + service );
266            }
267        }
268    
269        /**
270         * Removes the lateral cache.
271         * <p>
272         * We need to tell the manager that this instance is bad, so it will reconnect the sender if it
273         * comes back.
274         * <p>
275         * @param service
276         */
277        public void removeDiscoveredService( DiscoveredService service )
278        {
279            // get a cache and add it to the no waits
280            // the add method should not add the same.
281            // we need the listener port from the original config.
282            LateralTCPCacheManager lcm = findManagerForServiceEndPoint( service );
283    
284            ArrayList<String> regions = service.getCacheNames();
285            if ( regions != null )
286            {
287                // for each region get the cache
288                for (String cacheName : regions)
289                {
290                    try
291                    {
292                        ICache<? extends Serializable, ? extends Serializable> ic = lcm.getCache( cacheName );
293    
294                        if ( log.isDebugEnabled() )
295                        {
296                            log.debug( "Got cache, ic = " + ic );
297                        }
298    
299                        // remove this to the nowaits for this cachename
300                        if ( ic != null )
301                        {
302                            removeNoWait( (LateralCacheNoWait<? extends Serializable, ? extends Serializable>) ic );
303                            if ( log.isDebugEnabled() )
304                            {
305                                log.debug( "Called removeNoWait for cacheName [" + cacheName + "]" );
306                            }
307                        }
308                    }
309                    catch ( Exception e )
310                    {
311                        log.error( "Problem removing no wait", e );
312                    }
313                }
314            }
315            else
316            {
317                log.warn( "No cache names found in message " + service );
318            }
319        }
320    
321        /**
322         * Gets the appropriate manager.
323         * <p>
324         * @param service
325         * @return LateralTCPCacheManager configured for that end point.
326         */
327        private LateralTCPCacheManager findManagerForServiceEndPoint( DiscoveredService service )
328        {
329            ITCPLateralCacheAttributes lca = new TCPLateralCacheAttributes();
330            lca.setTransmissionType( LateralCacheAttributes.TCP );
331            lca.setTcpServer( service.getServiceAddress() + ":" + service.getServicePort() );
332            LateralTCPCacheManager lcm = LateralTCPCacheManager.getInstance( lca, cacheMgr, cacheEventLogger,
333                                                                             elementSerializer );
334            return lcm;
335        }
336    }