001    package org.apache.jcs.engine;
002    
003    /*
004     * Licensed to the Apache Software Foundation (ASF) under one
005     * or more contributor license agreements.  See the NOTICE file
006     * distributed with this work for additional information
007     * regarding copyright ownership.  The ASF licenses this file
008     * to you under the Apache License, Version 2.0 (the
009     * "License"); you may not use this file except in compliance
010     * with the License.  You may obtain a copy of the License at
011     *
012     *   http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing,
015     * software distributed under the License is distributed on an
016     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017     * KIND, either express or implied.  See the License for the
018     * specific language governing permissions and limitations
019     * under the License.
020     */
021    
022    import java.io.IOException;
023    import java.io.Serializable;
024    import java.util.HashMap;
025    import java.util.HashSet;
026    import java.util.Map;
027    import java.util.Set;
028    
029    import org.apache.commons.logging.Log;
030    import org.apache.commons.logging.LogFactory;
031    import org.apache.jcs.engine.behavior.ICacheListener;
032    import org.apache.jcs.engine.behavior.ICacheObserver;
033    
034    /**
035     * Intercepts the requests to the underlying ICacheObserver object so that the listeners can be
036     * recorded locally for remote connection recovery purposes. (Durable subscription like those in JMS
037     * is not implemented at this stage for it can be too expensive.)
038     */
039    public class CacheWatchRepairable
040        implements ICacheObserver
041    {
042        /** The logger */
043        private final static Log log = LogFactory.getLog( CacheWatchRepairable.class );
044    
045        /** the underlying ICacheObserver. */
046        private ICacheObserver cacheWatch;
047    
048        /** Map of cache regions. */
049        private final Map<String, Set<ICacheListener<? extends Serializable, ? extends Serializable>>> cacheMap =
050            new HashMap<String, Set<ICacheListener<? extends Serializable, ? extends Serializable>>>();
051    
052        /**
053         * Replaces the underlying cache watch service and reattached all existing listeners to the new
054         * cache watch.
055         * <p>
056         * @param cacheWatch The new cacheWatch value
057         */
058        public void setCacheWatch( ICacheObserver cacheWatch )
059        {
060            this.cacheWatch = cacheWatch;
061            synchronized ( cacheMap )
062            {
063                for (Map.Entry<String, Set<ICacheListener<? extends Serializable, ? extends Serializable>>> entry : cacheMap.entrySet())
064                {
065                    String cacheName = entry.getKey();
066                    for (ICacheListener<? extends Serializable, ? extends Serializable> listener : entry.getValue())
067                    {
068                        try
069                        {
070                            if ( log.isInfoEnabled() )
071                            {
072                                log.info( "Adding listener to cache watch. ICacheListener = " + listener
073                                    + " | ICacheObserver = " + cacheWatch );
074                            }
075                            cacheWatch.addCacheListener( cacheName, listener );
076                        }
077                        catch ( IOException ex )
078                        {
079                            log.error( "Problem adding listener. ICacheListener = " + listener + " | ICacheObserver = "
080                                + cacheWatch, ex );
081                        }
082                    }
083                }
084            }
085        }
086    
087        /**
088         * Adds a feature to the CacheListener attribute of the CacheWatchRepairable object
089         * <p>
090         * @param cacheName The feature to be added to the CacheListener attribute
091         * @param obj The feature to be added to the CacheListener attribute
092         * @throws IOException
093         */
094        public <K extends Serializable, V extends Serializable> void addCacheListener( String cacheName, ICacheListener<K, V> obj )
095            throws IOException
096        {
097            // Record the added cache listener locally, regardless of whether the
098            // remote add-listener operation succeeds or fails.
099            synchronized ( cacheMap )
100            {
101                Set<ICacheListener<? extends Serializable, ? extends Serializable>> listenerSet = cacheMap.get( cacheName );
102                if ( listenerSet == null )
103                {
104                    listenerSet = new HashSet<ICacheListener<? extends Serializable, ? extends Serializable>>();
105                    cacheMap.put( cacheName, listenerSet );
106                }
107                listenerSet.add( obj );
108            }
109            if ( log.isInfoEnabled() )
110            {
111                log.info( "Adding listener to cache watch. ICacheListener = " + obj
112                    + " | ICacheObserver = " + cacheWatch + " | cacheName = " + cacheName );
113            }
114            cacheWatch.addCacheListener( cacheName, obj );
115        }
116    
117        /**
118         * Adds a feature to the CacheListener attribute of the CacheWatchRepairable object
119         * <p>
120         * @param obj The feature to be added to the CacheListener attribute
121         * @throws IOException
122         */
123        public <K extends Serializable, V extends Serializable> void addCacheListener( ICacheListener<K, V> obj )
124            throws IOException
125        {
126            // Record the added cache listener locally, regardless of whether the
127            // remote add-listener operation succeeds or fails.
128            synchronized ( cacheMap )
129            {
130                for (Set<ICacheListener<? extends Serializable, ? extends Serializable>> listenerSet : cacheMap.values())
131                {
132                    listenerSet.add( obj );
133                }
134            }
135            if ( log.isInfoEnabled() )
136            {
137                log.info( "Adding listener to cache watch. ICacheListener = " + obj
138                    + " | ICacheObserver = " + cacheWatch );
139            }
140            cacheWatch.addCacheListener( obj );
141        }
142    
143        /**
144         * Tell the server to release us.
145         * <p>
146         * @param cacheName
147         * @param obj
148         * @throws IOException
149         */
150        public <K extends Serializable, V extends Serializable> void removeCacheListener( String cacheName, ICacheListener<K, V> obj )
151            throws IOException
152        {
153            if ( log.isInfoEnabled() )
154            {
155                log.info( "removeCacheListener, cacheName [" + cacheName + "]" );
156            }
157            // Record the removal locally, regardless of whether the remote
158            // remove-listener operation succeeds or fails.
159            synchronized ( cacheMap )
160            {
161                Set<ICacheListener<? extends Serializable, ? extends Serializable>> listenerSet = cacheMap.get( cacheName );
162                if ( listenerSet != null )
163                {
164                    listenerSet.remove( obj );
165                }
166            }
167            cacheWatch.removeCacheListener( cacheName, obj );
168        }
169    
170        /**
171         * @param obj
172         * @throws IOException
173         */
174        public <K extends Serializable, V extends Serializable> void removeCacheListener( ICacheListener<K, V> obj )
175            throws IOException
176        {
177            if ( log.isInfoEnabled() )
178            {
179                log.info( "removeCacheListener, ICacheListener [" + obj + "]" );
180            }
181    
182            // Record the removal locally, regardless of whether the remote
183            // remove-listener operation succeeds or fails.
184            synchronized ( cacheMap )
185            {
186                for (Set<ICacheListener<? extends Serializable, ? extends Serializable>> listenerSet : cacheMap.values())
187                {
188                    if ( log.isDebugEnabled() )
189                    {
190                        log.debug( "Before removing [" + obj + "] the listenerSet = " + listenerSet );
191                    }
192                    listenerSet.remove( obj );
193                }
194            }
195            cacheWatch.removeCacheListener( obj );
196        }
197    }