001package org.apache.commons.jcs.engine;
002
003import java.io.IOException;
004import java.util.Map;
005import java.util.Set;
006import java.util.concurrent.ConcurrentHashMap;
007import java.util.concurrent.ConcurrentMap;
008import java.util.concurrent.CopyOnWriteArraySet;
009
010/*
011 * Licensed to the Apache Software Foundation (ASF) under one
012 * or more contributor license agreements.  See the NOTICE file
013 * distributed with this work for additional information
014 * regarding copyright ownership.  The ASF licenses this file
015 * to you under the Apache License, Version 2.0 (the
016 * "License"); you may not use this file except in compliance
017 * with the License.  You may obtain a copy of the License at
018 *
019 *   http://www.apache.org/licenses/LICENSE-2.0
020 *
021 * Unless required by applicable law or agreed to in writing,
022 * software distributed under the License is distributed on an
023 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
024 * KIND, either express or implied.  See the License for the
025 * specific language governing permissions and limitations
026 * under the License.
027 */
028
029import org.apache.commons.jcs.engine.behavior.ICacheListener;
030import org.apache.commons.jcs.engine.behavior.ICacheObserver;
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033
034/**
035 * Intercepts the requests to the underlying ICacheObserver object so that the listeners can be
036 * recorded locally for remote connection recovery purposes. (Durable subscription like those in JMS
037 * is not implemented at this stage for it can be too expensive.)
038 */
039public class CacheWatchRepairable
040    implements ICacheObserver
041{
042    /** The logger */
043    private static final Log log = LogFactory.getLog( CacheWatchRepairable.class );
044
045    /** the underlying ICacheObserver. */
046    private ICacheObserver cacheWatch;
047
048    /** Map of cache regions. */
049    private final ConcurrentMap<String, Set<ICacheListener<?, ?>>> cacheMap =
050        new ConcurrentHashMap<String, Set<ICacheListener<?, ?>>>();
051
052    /**
053     * Replaces the underlying cache watch service and re-attaches all existing listeners to the new
054     * cache watch.
055     * <p>
056     * @param cacheWatch The new cacheWatch value
057     */
058    public void setCacheWatch( ICacheObserver cacheWatch )
059    {
060        this.cacheWatch = cacheWatch;
061        for (Map.Entry<String, Set<ICacheListener<?, ?>>> entry : cacheMap.entrySet())
062        {
063            String cacheName = entry.getKey();
064            for (ICacheListener<?, ?> listener : entry.getValue())
065            {
066                try
067                {
068                    if ( log.isInfoEnabled() )
069                    {
070                        log.info( "Adding listener to cache watch. ICacheListener = " + listener
071                            + " | ICacheObserver = " + cacheWatch );
072                    }
073                    cacheWatch.addCacheListener( cacheName, listener );
074                }
075                catch ( IOException ex )
076                {
077                    log.error( "Problem adding listener. ICacheListener = " + listener + " | ICacheObserver = "
078                        + cacheWatch, ex );
079                }
080            }
081        }
082    }
083
084    /**
085     * Adds a feature to the CacheListener attribute of the CacheWatchRepairable object
086     * <p>
087     * @param cacheName The feature to be added to the CacheListener attribute
088     * @param obj The feature to be added to the CacheListener attribute
089     * @throws IOException
090     */
091    @Override
092    public <K, V> void addCacheListener( String cacheName, ICacheListener<K, V> obj )
093        throws IOException
094    {
095        // Record the added cache listener locally, regardless of whether the
096        // remote add-listener operation succeeds or fails.
097        Set<ICacheListener<?, ?>> listenerSet = cacheMap.get( cacheName );
098        if ( listenerSet == null )
099        {
100            Set<ICacheListener<?, ?>> newListenerSet = new CopyOnWriteArraySet<ICacheListener<?, ?>>();
101            listenerSet = cacheMap.putIfAbsent( cacheName, newListenerSet );
102
103            if (listenerSet == null)
104            {
105                listenerSet = newListenerSet;
106            }
107        }
108
109        listenerSet.add( obj );
110
111        if ( log.isInfoEnabled() )
112        {
113            log.info( "Adding listener to cache watch. ICacheListener = " + obj
114                + " | ICacheObserver = " + cacheWatch + " | cacheName = " + cacheName );
115        }
116        cacheWatch.addCacheListener( cacheName, obj );
117    }
118
119    /**
120     * Adds a feature to the CacheListener attribute of the CacheWatchRepairable object
121     * <p>
122     * @param obj The feature to be added to the CacheListener attribute
123     * @throws IOException
124     */
125    @Override
126    public <K, V> void addCacheListener( ICacheListener<K, V> obj )
127        throws IOException
128    {
129        // Record the added cache listener locally, regardless of whether the
130        // remote add-listener operation succeeds or fails.
131        for (Set<ICacheListener<?, ?>> listenerSet : cacheMap.values())
132        {
133            listenerSet.add( obj );
134        }
135
136        if ( log.isInfoEnabled() )
137        {
138            log.info( "Adding listener to cache watch. ICacheListener = " + obj
139                + " | ICacheObserver = " + cacheWatch );
140        }
141        cacheWatch.addCacheListener( obj );
142    }
143
144    /**
145     * Tell the server to release us.
146     * <p>
147     * @param cacheName
148     * @param obj
149     * @throws IOException
150     */
151    @Override
152    public <K, V> void removeCacheListener( String cacheName, ICacheListener<K, V> obj )
153        throws IOException
154    {
155        if ( log.isInfoEnabled() )
156        {
157            log.info( "removeCacheListener, cacheName [" + cacheName + "]" );
158        }
159        // Record the removal locally, regardless of whether the remote
160        // remove-listener operation succeeds or fails.
161        Set<ICacheListener<?, ?>> listenerSet = cacheMap.get( cacheName );
162        if ( listenerSet != null )
163        {
164            listenerSet.remove( obj );
165        }
166        cacheWatch.removeCacheListener( cacheName, obj );
167    }
168
169    /**
170     * @param obj
171     * @throws IOException
172     */
173    @Override
174    public <K, V> void removeCacheListener( ICacheListener<K, V> obj )
175        throws IOException
176    {
177        if ( log.isInfoEnabled() )
178        {
179            log.info( "removeCacheListener, ICacheListener [" + obj + "]" );
180        }
181
182        // Record the removal locally, regardless of whether the remote
183        // remove-listener operation succeeds or fails.
184        for (Set<ICacheListener<?, ?>> listenerSet : cacheMap.values())
185        {
186            if ( log.isDebugEnabled() )
187            {
188                log.debug( "Before removing [" + obj + "] the listenerSet = " + listenerSet );
189            }
190            listenerSet.remove( obj );
191        }
192        cacheWatch.removeCacheListener( obj );
193    }
194}