001package org.apache.commons.jcs3.engine;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.IOException;
023import java.util.concurrent.atomic.AtomicBoolean;
024
025import org.apache.commons.jcs3.engine.behavior.ICacheElement;
026import org.apache.commons.jcs3.engine.behavior.ICacheEventQueue;
027import org.apache.commons.jcs3.engine.behavior.ICacheListener;
028import org.apache.commons.jcs3.log.Log;
029import org.apache.commons.jcs3.log.LogManager;
030
031/**
032 * An abstract base class to the different implementations
033 */
034public abstract class AbstractCacheEventQueue<K, V>
035    implements ICacheEventQueue<K, V>
036{
037    /** The logger. */
038    private static final Log log = LogManager.getLog( AbstractCacheEventQueue.class );
039
040    /** default */
041    protected static final int DEFAULT_WAIT_TO_DIE_MILLIS = 10000;
042
043    /**
044     * time to wait for an event before snuffing the background thread if the queue is empty. make
045     * configurable later
046     */
047    private int waitToDieMillis = DEFAULT_WAIT_TO_DIE_MILLIS;
048
049    /**
050     * When the events are pulled off the queue, then tell the listener to handle the specific event
051     * type. The work is done by the listener.
052     */
053    private ICacheListener<K, V> listener;
054
055    /** Id of the listener registered with this queue */
056    private long listenerId;
057
058    /** The cache region name, if applicable. */
059    private String cacheName;
060
061    /** Maximum number of failures before we buy the farm. */
062    private int maxFailure;
063
064    /** in milliseconds */
065    private int waitBeforeRetry;
066
067    /**
068     * This means that the queue is functional. If we reached the max number of failures, the queue
069     * is marked as non functional and will never work again.
070     */
071    private final AtomicBoolean working = new AtomicBoolean(true);
072
073    /**
074     * Returns the time to wait for events before killing the background thread.
075     * <p>
076     * @return int
077     */
078    public int getWaitToDieMillis()
079    {
080        return waitToDieMillis;
081    }
082
083    /**
084     * Sets the time to wait for events before killing the background thread.
085     * <p>
086     * @param wtdm the ms for the q to sit idle.
087     */
088    public void setWaitToDieMillis( final int wtdm )
089    {
090        waitToDieMillis = wtdm;
091    }
092
093    /**
094     * Creates a brief string identifying the listener and the region.
095     * <p>
096     * @return String debugging info.
097     */
098    @Override
099    public String toString()
100    {
101        return "CacheEventQueue [listenerId=" + listenerId + ", cacheName=" + cacheName + "]";
102    }
103
104    /**
105     * @return The listenerId value
106     */
107    @Override
108    public long getListenerId()
109    {
110        return listenerId;
111    }
112
113    /**
114     * @return the cacheName
115     */
116    protected String getCacheName()
117    {
118        return cacheName;
119    }
120
121    /**
122     * Initializes the queue.
123     * <p>
124     * @param listener
125     * @param listenerId
126     * @param cacheName
127     * @param maxFailure
128     * @param waitBeforeRetry
129     */
130    protected void initialize( final ICacheListener<K, V> listener, final long listenerId, final String cacheName, final int maxFailure,
131                            final int waitBeforeRetry)
132    {
133        if ( listener == null )
134        {
135            throw new IllegalArgumentException( "listener must not be null" );
136        }
137
138        this.listener = listener;
139        this.listenerId = listenerId;
140        this.cacheName = cacheName;
141        this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
142        this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
143
144        log.debug( "Constructed: {0}", this );
145    }
146
147    /**
148     * This adds a put event to the queue. When it is processed, the element will be put to the
149     * listener.
150     * <p>
151     * @param ce The feature to be added to the PutEvent attribute
152     * @throws IOException
153     */
154    @Override
155    public void addPutEvent( final ICacheElement<K, V> ce )
156    {
157        put( new PutEvent( ce ) );
158    }
159
160    /**
161     * This adds a remove event to the queue. When processed the listener's remove method will be
162     * called for the key.
163     * <p>
164     * @param key The feature to be added to the RemoveEvent attribute
165     * @throws IOException
166     */
167    @Override
168    public void addRemoveEvent( final K key )
169    {
170        put( new RemoveEvent( key ) );
171    }
172
173    /**
174     * This adds a remove all event to the queue. When it is processed, all elements will be removed
175     * from the cache.
176     */
177    @Override
178    public void addRemoveAllEvent()
179    {
180        put( new RemoveAllEvent() );
181    }
182
183    /**
184     * This adds a dispose event to the queue. When it is processed, the cache is shut down
185     */
186    @Override
187    public void addDisposeEvent()
188    {
189        put( new DisposeEvent() );
190    }
191
192    /**
193     * Adds an event to the queue.
194     * <p>
195     * @param event
196     */
197    protected abstract void put( AbstractCacheEvent event );
198
199
200    // /////////////////////////// Inner classes /////////////////////////////
201    /**
202     * Retries before declaring failure.
203     */
204    protected abstract class AbstractCacheEvent implements Runnable
205    {
206        /**
207         * Main processing method for the AbstractCacheEvent object
208         */
209        @Override
210        public void run()
211        {
212            for (int failures = 0; failures < maxFailure; failures++)
213            {
214                try
215                {
216                    doRun();
217                    return;
218                }
219                catch (final IOException e)
220                {
221                    log.warn("Error while running event from Queue: {0}. "
222                            + "Retrying...", this, e);
223                }
224
225                try
226                {
227                    Thread.sleep( waitBeforeRetry );
228                }
229                catch ( final InterruptedException ie )
230                {
231                    log.warn("Interrupted while sleeping for retry on event "
232                            + "{0}.", this, ie);
233                    break;
234                }
235            }
236
237            log.warn( "Dropping Event and marking Event Queue {0} as "
238                    + "non-functional.", this );
239            destroy();
240        }
241
242        /**
243         * @throws IOException
244         */
245        protected abstract void doRun()
246            throws IOException;
247    }
248
249    /**
250     * An element should be put in the cache.
251     */
252    protected class PutEvent
253        extends AbstractCacheEvent
254    {
255        /** The element to put to the listener */
256        private final ICacheElement<K, V> ice;
257
258        /**
259         * Constructor for the PutEvent object.
260         * <p>
261         * @param ice
262         */
263        PutEvent( final ICacheElement<K, V> ice )
264        {
265            this.ice = ice;
266        }
267
268        /**
269         * Call put on the listener.
270         * <p>
271         * @throws IOException
272         */
273        @Override
274        protected void doRun()
275            throws IOException
276        {
277            listener.handlePut( ice );
278        }
279
280        /**
281         * For debugging.
282         * <p>
283         * @return Info on the key and value.
284         */
285        @Override
286        public String toString()
287        {
288            return new StringBuilder( "PutEvent for key: " )
289                    .append( ice.getKey() )
290                    .append( " value: " )
291                    .append( ice.getVal() )
292                    .toString();
293        }
294
295    }
296
297    /**
298     * An element should be removed from the cache.
299     */
300    protected class RemoveEvent
301        extends AbstractCacheEvent
302    {
303        /** The key to remove from the listener */
304        private final K key;
305
306        /**
307         * Constructor for the RemoveEvent object
308         * <p>
309         * @param key
310         */
311        RemoveEvent( final K key )
312        {
313            this.key = key;
314        }
315
316        /**
317         * Call remove on the listener.
318         * <p>
319         * @throws IOException
320         */
321        @Override
322        protected void doRun()
323            throws IOException
324        {
325            listener.handleRemove( cacheName, key );
326        }
327
328        /**
329         * For debugging.
330         * <p>
331         * @return Info on the key to remove.
332         */
333        @Override
334        public String toString()
335        {
336            return new StringBuilder( "RemoveEvent for " )
337                    .append( key )
338                    .toString();
339        }
340
341    }
342
343    /**
344     * All elements should be removed from the cache when this event is processed.
345     */
346    protected class RemoveAllEvent
347        extends AbstractCacheEvent
348    {
349        /**
350         * Call removeAll on the listener.
351         * <p>
352         * @throws IOException
353         */
354        @Override
355        protected void doRun()
356            throws IOException
357        {
358            listener.handleRemoveAll( cacheName );
359        }
360
361        /**
362         * For debugging.
363         * <p>
364         * @return The name of the event.
365         */
366        @Override
367        public String toString()
368        {
369            return "RemoveAllEvent";
370        }
371    }
372
373    /**
374     * The cache should be disposed when this event is processed.
375     */
376    protected class DisposeEvent
377        extends AbstractCacheEvent
378    {
379        /**
380         * Called when gets to the end of the queue
381         *
382         * @throws IOException
383         */
384        @Override
385        protected void doRun()
386            throws IOException
387        {
388            listener.handleDispose( cacheName );
389        }
390
391        /**
392         * For debugging.
393         *
394         * @return The name of the event.
395         */
396        @Override
397        public String toString()
398        {
399            return "DisposeEvent";
400        }
401    }
402
403    /**
404     * @return whether the queue is functional.
405     */
406    @Override
407    public boolean isWorking()
408    {
409        return working.get();
410    }
411
412    /**
413     * This means that the queue is functional. If we reached the max number of failures, the queue
414     * is marked as non functional and will never work again.
415     * <p>
416     * @param b
417     */
418    public void setWorking( final boolean b )
419    {
420        working.set(b);
421    }
422}