View Javadoc
1   package org.apache.commons.jcs.engine;
2   
3   import java.io.IOException;
4   import java.util.Map;
5   import java.util.Set;
6   import java.util.concurrent.ConcurrentHashMap;
7   import java.util.concurrent.ConcurrentMap;
8   import java.util.concurrent.CopyOnWriteArraySet;
9   
10  /*
11   * Licensed to the Apache Software Foundation (ASF) under one
12   * or more contributor license agreements.  See the NOTICE file
13   * distributed with this work for additional information
14   * regarding copyright ownership.  The ASF licenses this file
15   * to you under the Apache License, Version 2.0 (the
16   * "License"); you may not use this file except in compliance
17   * with the License.  You may obtain a copy of the License at
18   *
19   *   http://www.apache.org/licenses/LICENSE-2.0
20   *
21   * Unless required by applicable law or agreed to in writing,
22   * software distributed under the License is distributed on an
23   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
24   * KIND, either express or implied.  See the License for the
25   * specific language governing permissions and limitations
26   * under the License.
27   */
28  
29  import org.apache.commons.jcs.engine.behavior.ICacheListener;
30  import org.apache.commons.jcs.engine.behavior.ICacheObserver;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  
34  /**
35   * Intercepts the requests to the underlying ICacheObserver object so that the listeners can be
36   * recorded locally for remote connection recovery purposes. (Durable subscription like those in JMS
37   * is not implemented at this stage for it can be too expensive.)
38   */
39  public class CacheWatchRepairable
40      implements ICacheObserver
41  {
42      /** The logger */
43      private static final Log log = LogFactory.getLog( CacheWatchRepairable.class );
44  
45      /** the underlying ICacheObserver. */
46      private ICacheObserver cacheWatch;
47  
48      /** Map of cache regions. */
49      private final ConcurrentMap<String, Set<ICacheListener<?, ?>>> cacheMap =
50          new ConcurrentHashMap<String, Set<ICacheListener<?, ?>>>();
51  
52      /**
53       * Replaces the underlying cache watch service and re-attaches all existing listeners to the new
54       * cache watch.
55       * <p>
56       * @param cacheWatch The new cacheWatch value
57       */
58      public void setCacheWatch( ICacheObserver cacheWatch )
59      {
60          this.cacheWatch = cacheWatch;
61          for (Map.Entry<String, Set<ICacheListener<?, ?>>> entry : cacheMap.entrySet())
62          {
63              String cacheName = entry.getKey();
64              for (ICacheListener<?, ?> listener : entry.getValue())
65              {
66                  try
67                  {
68                      if ( log.isInfoEnabled() )
69                      {
70                          log.info( "Adding listener to cache watch. ICacheListener = " + listener
71                              + " | ICacheObserver = " + cacheWatch );
72                      }
73                      cacheWatch.addCacheListener( cacheName, listener );
74                  }
75                  catch ( IOException ex )
76                  {
77                      log.error( "Problem adding listener. ICacheListener = " + listener + " | ICacheObserver = "
78                          + cacheWatch, ex );
79                  }
80              }
81          }
82      }
83  
84      /**
85       * Adds a feature to the CacheListener attribute of the CacheWatchRepairable object
86       * <p>
87       * @param cacheName The feature to be added to the CacheListener attribute
88       * @param obj The feature to be added to the CacheListener attribute
89       * @throws IOException
90       */
91      @Override
92      public <K, V> void addCacheListener( String cacheName, ICacheListener<K, V> obj )
93          throws IOException
94      {
95          // Record the added cache listener locally, regardless of whether the
96          // remote add-listener operation succeeds or fails.
97          Set<ICacheListener<?, ?>> listenerSet = cacheMap.get( cacheName );
98          if ( listenerSet == null )
99          {
100             Set<ICacheListener<?, ?>> newListenerSet = new CopyOnWriteArraySet<ICacheListener<?, ?>>();
101             listenerSet = cacheMap.putIfAbsent( cacheName, newListenerSet );
102 
103             if (listenerSet == null)
104             {
105                 listenerSet = newListenerSet;
106             }
107         }
108 
109         listenerSet.add( obj );
110 
111         if ( log.isInfoEnabled() )
112         {
113             log.info( "Adding listener to cache watch. ICacheListener = " + obj
114                 + " | ICacheObserver = " + cacheWatch + " | cacheName = " + cacheName );
115         }
116         cacheWatch.addCacheListener( cacheName, obj );
117     }
118 
119     /**
120      * Adds a feature to the CacheListener attribute of the CacheWatchRepairable object
121      * <p>
122      * @param obj The feature to be added to the CacheListener attribute
123      * @throws IOException
124      */
125     @Override
126     public <K, V> void addCacheListener( ICacheListener<K, V> obj )
127         throws IOException
128     {
129         // Record the added cache listener locally, regardless of whether the
130         // remote add-listener operation succeeds or fails.
131         for (Set<ICacheListener<?, ?>> listenerSet : cacheMap.values())
132         {
133             listenerSet.add( obj );
134         }
135 
136         if ( log.isInfoEnabled() )
137         {
138             log.info( "Adding listener to cache watch. ICacheListener = " + obj
139                 + " | ICacheObserver = " + cacheWatch );
140         }
141         cacheWatch.addCacheListener( obj );
142     }
143 
144     /**
145      * Tell the server to release us.
146      * <p>
147      * @param cacheName
148      * @param obj
149      * @throws IOException
150      */
151     @Override
152     public <K, V> void removeCacheListener( String cacheName, ICacheListener<K, V> obj )
153         throws IOException
154     {
155         if ( log.isInfoEnabled() )
156         {
157             log.info( "removeCacheListener, cacheName [" + cacheName + "]" );
158         }
159         // Record the removal locally, regardless of whether the remote
160         // remove-listener operation succeeds or fails.
161         Set<ICacheListener<?, ?>> listenerSet = cacheMap.get( cacheName );
162         if ( listenerSet != null )
163         {
164             listenerSet.remove( obj );
165         }
166         cacheWatch.removeCacheListener( cacheName, obj );
167     }
168 
169     /**
170      * @param obj
171      * @throws IOException
172      */
173     @Override
174     public <K, V> void removeCacheListener( ICacheListener<K, V> obj )
175         throws IOException
176     {
177         if ( log.isInfoEnabled() )
178         {
179             log.info( "removeCacheListener, ICacheListener [" + obj + "]" );
180         }
181 
182         // Record the removal locally, regardless of whether the remote
183         // remove-listener operation succeeds or fails.
184         for (Set<ICacheListener<?, ?>> listenerSet : cacheMap.values())
185         {
186             if ( log.isDebugEnabled() )
187             {
188                 log.debug( "Before removing [" + obj + "] the listenerSet = " + listenerSet );
189             }
190             listenerSet.remove( obj );
191         }
192         cacheWatch.removeCacheListener( obj );
193     }
194 }