001package org.apache.commons.jcs3.engine;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.IOException;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.Map;
026import java.util.Set;
027import java.util.concurrent.ConcurrentLinkedQueue;
028
029import org.apache.commons.jcs3.engine.behavior.ICacheElement;
030import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal;
031import org.apache.commons.jcs3.log.Log;
032import org.apache.commons.jcs3.log.LogManager;
033import org.apache.commons.jcs3.utils.timing.ElapsedTimer;
034
035/**
036 * Zombie adapter for the non local cache services. It just balks if there is no queue configured.
037 * <p>
038 * If a queue is configured, then events will be added to the queue. The idea is that when proper
039 * operation is restored, the non local cache will walk the queue. The queue must be bounded so it
040 * does not eat memory.
041 * <p>
042 * This originated in the remote cache.
043 */
044public class ZombieCacheServiceNonLocal<K, V>
045    extends ZombieCacheService<K, V>
046    implements ICacheServiceNonLocal<K, V>
047{
048    /** The logger */
049    private static final Log log = LogManager.getLog( ZombieCacheServiceNonLocal.class );
050
051    /** How big can the queue grow. */
052    private int maxQueueSize;
053
054    /** The queue */
055    private final ConcurrentLinkedQueue<ZombieEvent> queue;
056
057    /**
058     * Default.
059     */
060    public ZombieCacheServiceNonLocal()
061    {
062        queue = new ConcurrentLinkedQueue<>();
063    }
064
065    /**
066     * Sets the maximum number of items that will be allowed on the queue.
067     * <p>
068     * @param maxQueueSize
069     */
070    public ZombieCacheServiceNonLocal( final int maxQueueSize )
071    {
072        this();
073        this.maxQueueSize = maxQueueSize;
074    }
075
076    /**
077     * Gets the number of items on the queue.
078     * <p>
079     * @return size of the queue.
080     */
081    public int getQueueSize()
082    {
083        return queue.size();
084    }
085
086    private void addQueue(final ZombieEvent event)
087    {
088        queue.add(event);
089        if (queue.size() > maxQueueSize)
090        {
091            queue.poll(); // drop oldest entry
092        }
093    }
094
095    /**
096     * Adds an update event to the queue if the maxSize is greater than 0;
097     * <p>
098     * @param item ICacheElement
099     * @param listenerId - identifies the caller.
100     */
101    @Override
102    public void update( final ICacheElement<K, V> item, final long listenerId )
103    {
104        if ( maxQueueSize > 0 )
105        {
106            final PutEvent<K, V> event = new PutEvent<>( item, listenerId );
107            addQueue( event );
108        }
109        // Zombies have no inner life
110    }
111
112    /**
113     * Adds a removeAll event to the queue if the maxSize is greater than 0;
114     * <p>
115     * @param cacheName - region name
116     * @param key - item key
117     * @param listenerId - identifies the caller.
118     */
119    @Override
120    public void remove( final String cacheName, final K key, final long listenerId )
121    {
122        if ( maxQueueSize > 0 )
123        {
124            final RemoveEvent<K> event = new RemoveEvent<>( cacheName, key, listenerId );
125            addQueue( event );
126        }
127        // Zombies have no inner life
128    }
129
130    /**
131     * Adds a removeAll event to the queue if the maxSize is greater than 0;
132     * <p>
133     * @param cacheName - name of the region
134     * @param listenerId - identifies the caller.
135     */
136    @Override
137    public void removeAll( final String cacheName, final long listenerId )
138    {
139        if ( maxQueueSize > 0 )
140        {
141            final RemoveAllEvent event = new RemoveAllEvent( cacheName, listenerId );
142            addQueue( event );
143        }
144        // Zombies have no inner life
145    }
146
147    /**
148     * Does nothing. Gets are synchronous and cannot be added to a queue.
149     * <p>
150     * @param cacheName - region name
151     * @param key - item key
152     * @param requesterId - identifies the caller.
153     * @return null
154     * @throws IOException
155     */
156    @Override
157    public ICacheElement<K, V> get( final String cacheName, final K key, final long requesterId )
158        throws IOException
159    {
160        // Zombies have no inner life
161        return null;
162    }
163
164    /**
165     * Does nothing.
166     * <p>
167     * @param cacheName
168     * @param pattern
169     * @param requesterId
170     * @return empty map
171     * @throws IOException
172     */
173    @Override
174    public Map<K, ICacheElement<K, V>> getMatching( final String cacheName, final String pattern, final long requesterId )
175        throws IOException
176    {
177        return Collections.emptyMap();
178    }
179
180    /**
181     * @param cacheName - region name
182     * @param keys - item key
183     * @param requesterId - identity of the caller
184     * @return an empty map. zombies have no internal data
185     */
186    @Override
187    public Map<K, ICacheElement<K, V>> getMultiple( final String cacheName, final Set<K> keys, final long requesterId )
188    {
189        return new HashMap<>();
190    }
191
192    /**
193     * Does nothing.
194     * <p>
195     * @param cacheName - region name
196     * @return empty set
197     */
198    @Override
199    public Set<K> getKeySet( final String cacheName )
200    {
201        return Collections.emptySet();
202    }
203
204    /**
205     * Walk the queue, calling the service for each queue operation.
206     * <p>
207     * @param service
208     * @throws Exception
209     */
210    public synchronized void propagateEvents( final ICacheServiceNonLocal<K, V> service )
211        throws Exception
212    {
213        int cnt = 0;
214        log.info( "Propagating events to the new ICacheServiceNonLocal." );
215        final ElapsedTimer timer = new ElapsedTimer();
216        while ( !queue.isEmpty() )
217        {
218            cnt++;
219
220            // for each item, call the appropriate service method
221            final ZombieEvent event = queue.poll();
222
223            if ( event instanceof PutEvent )
224            {
225                @SuppressWarnings("unchecked") // Type checked by instanceof
226                final
227                PutEvent<K, V> putEvent = (PutEvent<K, V>) event;
228                service.update( putEvent.element, event.requesterId );
229            }
230            else if ( event instanceof RemoveEvent )
231            {
232                @SuppressWarnings("unchecked") // Type checked by instanceof
233                final
234                RemoveEvent<K> removeEvent = (RemoveEvent<K>) event;
235                service.remove( event.cacheName, removeEvent.key, event.requesterId );
236            }
237            else if ( event instanceof RemoveAllEvent )
238            {
239                service.removeAll( event.cacheName, event.requesterId );
240            }
241        }
242        log.info( "Propagated {0} events to the new ICacheServiceNonLocal in {1}",
243                cnt, timer.getElapsedTimeString() );
244    }
245
246    /**
247     * Base of the other events.
248     */
249    protected static abstract class ZombieEvent
250    {
251        /** The name of the region. */
252        String cacheName;
253
254        /** The id of the requester */
255        long requesterId;
256    }
257
258    /**
259     * A basic put event.
260     */
261    private static class PutEvent<K, V>
262        extends ZombieEvent
263    {
264        /** The element to put */
265        final ICacheElement<K, V> element;
266
267        /**
268         * Set the element
269         * @param element
270         * @param requesterId
271         */
272        public PutEvent( final ICacheElement<K, V> element, final long requesterId )
273        {
274            this.requesterId = requesterId;
275            this.element = element;
276        }
277    }
278
279    /**
280     * A basic Remove event.
281     */
282    private static class RemoveEvent<K>
283        extends ZombieEvent
284    {
285        /** The key to remove */
286        final K key;
287
288        /**
289         * Set the element
290         * @param cacheName
291         * @param key
292         * @param requesterId
293         */
294        public RemoveEvent( final String cacheName, final K key, final long requesterId )
295        {
296            this.cacheName = cacheName;
297            this.requesterId = requesterId;
298            this.key = key;
299        }
300    }
301
302    /**
303     * A basic RemoveAll event.
304     */
305    private static class RemoveAllEvent
306        extends ZombieEvent
307    {
308        /**
309         * @param cacheName
310         * @param requesterId
311         */
312        public RemoveAllEvent( final String cacheName, final long requesterId )
313        {
314            this.cacheName = cacheName;
315            this.requesterId = requesterId;
316        }
317    }
318}