View Javadoc
1   package org.apache.commons.jcs.engine;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.Map;
26  import java.util.Set;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  
29  import org.apache.commons.jcs.engine.behavior.ICacheElement;
30  import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal;
31  import org.apache.commons.jcs.utils.timing.ElapsedTimer;
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  
35  /**
36   * Zombie adapter for the non local cache services. It just balks if there is no queue configured.
37   * <p>
38   * If a queue is configured, then events will be added to the queue. The idea is that when proper
39   * operation is restored, the non local cache will walk the queue. The queue must be bounded so it
40   * does not eat memory.
41   * <p>
42   * This originated in the remote cache.
43   */
44  public class ZombieCacheServiceNonLocal<K, V>
45      extends ZombieCacheService<K, V>
46      implements ICacheServiceNonLocal<K, V>
47  {
48      /** The logger */
49      private static final Log log = LogFactory.getLog( ZombieCacheServiceNonLocal.class );
50  
51      /** How big can the queue grow. */
52      private int maxQueueSize = 0;
53  
54      /** The queue */
55      private final ConcurrentLinkedQueue<ZombieEvent> queue;
56  
57      /**
58       * Default.
59       */
60      public ZombieCacheServiceNonLocal()
61      {
62          queue = new ConcurrentLinkedQueue<ZombieEvent>();
63      }
64  
65      /**
66       * Sets the maximum number of items that will be allowed on the queue.
67       * <p>
68       * @param maxQueueSize
69       */
70      public ZombieCacheServiceNonLocal( int maxQueueSize )
71      {
72          this.maxQueueSize = maxQueueSize;
73          queue = new ConcurrentLinkedQueue<ZombieEvent>();
74      }
75  
76      /**
77       * Gets the number of items on the queue.
78       * <p>
79       * @return size of the queue.
80       */
81      public int getQueueSize()
82      {
83          return queue.size();
84      }
85  
86      private void addQueue(ZombieEvent event)
87      {
88          queue.add(event);
89          if (queue.size() > maxQueueSize)
90          {
91              queue.poll(); // drop oldest entry
92          }
93      }
94  
95      /**
96       * Adds an update event to the queue if the maxSize is greater than 0;
97       * <p>
98       * @param item ICacheElement
99       * @param listenerId - identifies the caller.
100      */
101     @Override
102     public void update( ICacheElement<K, V> item, long listenerId )
103     {
104         if ( maxQueueSize > 0 )
105         {
106             PutEvent<K, V> event = new PutEvent<K, V>( item, listenerId );
107             addQueue( event );
108         }
109         // Zombies have no inner life
110     }
111 
112     /**
113      * Adds a removeAll event to the queue if the maxSize is greater than 0;
114      * <p>
115      * @param cacheName - region name
116      * @param key - item key
117      * @param listenerId - identifies the caller.
118      */
119     @Override
120     public void remove( String cacheName, K key, long listenerId )
121     {
122         if ( maxQueueSize > 0 )
123         {
124             RemoveEvent<K> event = new RemoveEvent<K>( cacheName, key, listenerId );
125             addQueue( event );
126         }
127         // Zombies have no inner life
128     }
129 
130     /**
131      * Adds a removeAll event to the queue if the maxSize is greater than 0;
132      * <p>
133      * @param cacheName - name of the region
134      * @param listenerId - identifies the caller.
135      */
136     @Override
137     public void removeAll( String cacheName, long listenerId )
138     {
139         if ( maxQueueSize > 0 )
140         {
141             RemoveAllEvent event = new RemoveAllEvent( cacheName, listenerId );
142             addQueue( event );
143         }
144         // Zombies have no inner life
145     }
146 
147     /**
148      * Does nothing. Gets are synchronous and cannot be added to a queue.
149      * <p>
150      * @param cacheName - region name
151      * @param key - item key
152      * @param requesterId - identifies the caller.
153      * @return null
154      * @throws IOException
155      */
156     @Override
157     public ICacheElement<K, V> get( String cacheName, K key, long requesterId )
158         throws IOException
159     {
160         // Zombies have no inner life
161         return null;
162     }
163 
164     /**
165      * Does nothing.
166      * <p>
167      * @param cacheName
168      * @param pattern
169      * @param requesterId
170      * @return empty map
171      * @throws IOException
172      */
173     @Override
174     public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern, long requesterId )
175         throws IOException
176     {
177         return Collections.emptyMap();
178     }
179 
180     /**
181      * @param cacheName - region name
182      * @param keys - item key
183      * @param requesterId - identity of the caller
184      * @return an empty map. zombies have no internal data
185      */
186     @Override
187     public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys, long requesterId )
188     {
189         return new HashMap<K, ICacheElement<K, V>>();
190     }
191 
192     /**
193      * Does nothing.
194      * <p>
195      * @param cacheName - region name
196      * @return empty set
197      */
198     @Override
199     public Set<K> getKeySet( String cacheName )
200     {
201         return Collections.emptySet();
202     }
203 
204     /**
205      * Walk the queue, calling the service for each queue operation.
206      * <p>
207      * @param service
208      * @throws Exception
209      */
210     public synchronized void propagateEvents( ICacheServiceNonLocal<K, V> service )
211         throws Exception
212     {
213         int cnt = 0;
214         if ( log.isInfoEnabled() )
215         {
216             log.info( "Propagating events to the new ICacheServiceNonLocal." );
217         }
218         ElapsedTimer timer = new ElapsedTimer();
219         while ( !queue.isEmpty() )
220         {
221             cnt++;
222 
223             // for each item, call the appropriate service method
224             ZombieEvent event = queue.poll();
225 
226             if ( event instanceof PutEvent )
227             {
228                 @SuppressWarnings("unchecked") // Type checked by instanceof
229                 PutEvent<K, V> putEvent = (PutEvent<K, V>) event;
230                 service.update( putEvent.element, event.requesterId );
231             }
232             else if ( event instanceof RemoveEvent )
233             {
234                 @SuppressWarnings("unchecked") // Type checked by instanceof
235                 RemoveEvent<K> removeEvent = (RemoveEvent<K>) event;
236                 service.remove( event.cacheName, removeEvent.key, event.requesterId );
237             }
238             else if ( event instanceof RemoveAllEvent )
239             {
240                 service.removeAll( event.cacheName, event.requesterId );
241             }
242         }
243         if ( log.isInfoEnabled() )
244         {
245             log.info( "Propagated " + cnt + " events to the new ICacheServiceNonLocal in "
246                 + timer.getElapsedTimeString() );
247         }
248     }
249 
250     /**
251      * Base of the other events.
252      */
253     protected static abstract class ZombieEvent
254     {
255         /** The name of the region. */
256         String cacheName;
257 
258         /** The id of the requester */
259         long requesterId;
260     }
261 
262     /**
263      * A basic put event.
264      */
265     private static class PutEvent<K, V>
266         extends ZombieEvent
267     {
268         /** The element to put */
269         ICacheElement<K, V> element;
270 
271         /**
272          * Set the element
273          * @param element
274          * @param requesterId
275          */
276         public PutEvent( ICacheElement<K, V> element, long requesterId )
277         {
278             this.requesterId = requesterId;
279             this.element = element;
280         }
281     }
282 
283     /**
284      * A basic Remove event.
285      */
286     private static class RemoveEvent<K>
287         extends ZombieEvent
288     {
289         /** The key to remove */
290         K key;
291 
292         /**
293          * Set the element
294          * @param cacheName
295          * @param key
296          * @param requesterId
297          */
298         public RemoveEvent( String cacheName, K key, long requesterId )
299         {
300             this.cacheName = cacheName;
301             this.requesterId = requesterId;
302             this.key = key;
303         }
304     }
305 
306     /**
307      * A basic RemoveAll event.
308      */
309     private static class RemoveAllEvent
310         extends ZombieEvent
311     {
312         /**
313          * @param cacheName
314          * @param requesterId
315          */
316         public RemoveAllEvent( String cacheName, long requesterId )
317         {
318             this.cacheName = cacheName;
319             this.requesterId = requesterId;
320         }
321     }
322 }