001package org.apache.commons.jcs.engine;
002
003import java.io.IOException;
004import java.util.concurrent.atomic.AtomicBoolean;
005
006/*
007 * Licensed to the Apache Software Foundation (ASF) under one
008 * or more contributor license agreements.  See the NOTICE file
009 * distributed with this work for additional information
010 * regarding copyright ownership.  The ASF licenses this file
011 * to you under the Apache License, Version 2.0 (the
012 * "License"); you may not use this file except in compliance
013 * with the License.  You may obtain a copy of the License at
014 *
015 *   http://www.apache.org/licenses/LICENSE-2.0
016 *
017 * Unless required by applicable law or agreed to in writing,
018 * software distributed under the License is distributed on an
019 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
020 * KIND, either express or implied.  See the License for the
021 * specific language governing permissions and limitations
022 * under the License.
023 */
024
025import org.apache.commons.jcs.engine.behavior.ICacheElement;
026import org.apache.commons.jcs.engine.behavior.ICacheEventQueue;
027import org.apache.commons.jcs.engine.behavior.ICacheListener;
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030
031/**
032 * An abstract base class to the different implementations
033 */
034public abstract class AbstractCacheEventQueue<K, V>
035    implements ICacheEventQueue<K, V>
036{
037    /** The logger. */
038    private static final Log log = LogFactory.getLog( AbstractCacheEventQueue.class );
039
040    /** default */
041    protected static final int DEFAULT_WAIT_TO_DIE_MILLIS = 10000;
042
043    /**
044     * time to wait for an event before snuffing the background thread if the queue is empty. make
045     * configurable later
046     */
047    private int waitToDieMillis = DEFAULT_WAIT_TO_DIE_MILLIS;
048
049    /**
050     * When the events are pulled off the queue, then tell the listener to handle the specific event
051     * type. The work is done by the listener.
052     */
053    private ICacheListener<K, V> listener;
054
055    /** Id of the listener registered with this queue */
056    private long listenerId;
057
058    /** The cache region name, if applicable. */
059    private String cacheName;
060
061    /** Maximum number of failures before we buy the farm. */
062    private int maxFailure;
063
064    /** in milliseconds */
065    private int waitBeforeRetry;
066
067    /** this is true if there is any worker thread. */
068    private final AtomicBoolean alive = new AtomicBoolean(false);
069
070    /**
071     * This means that the queue is functional. If we reached the max number of failures, the queue
072     * is marked as non functional and will never work again.
073     */
074    private final AtomicBoolean working = new AtomicBoolean(true);
075
076    /**
077     * Returns the time to wait for events before killing the background thread.
078     * <p>
079     * @return int
080     */
081    public int getWaitToDieMillis()
082    {
083        return waitToDieMillis;
084    }
085
086    /**
087     * Sets the time to wait for events before killing the background thread.
088     * <p>
089     * @param wtdm the ms for the q to sit idle.
090     */
091    public void setWaitToDieMillis( int wtdm )
092    {
093        waitToDieMillis = wtdm;
094    }
095
096    /**
097     * Creates a brief string identifying the listener and the region.
098     * <p>
099     * @return String debugging info.
100     */
101    @Override
102    public String toString()
103    {
104        return "CacheEventQueue [listenerId=" + listenerId + ", cacheName=" + cacheName + "]";
105    }
106
107    /**
108     * If they queue has an active thread it is considered alive.
109     * <p>
110     * @return The alive value
111     */
112    @Override
113    public boolean isAlive()
114    {
115        return alive.get();
116    }
117
118    /**
119     * Sets whether the queue is actively processing -- if there are working threads.
120     * <p>
121     * @param aState
122     */
123    public void setAlive( boolean aState )
124    {
125        alive.set(aState);
126    }
127
128    /**
129     * @return The listenerId value
130     */
131    @Override
132    public long getListenerId()
133    {
134        return listenerId;
135    }
136
137    /**
138     * @return the cacheName
139     */
140    protected String getCacheName()
141    {
142        return cacheName;
143    }
144
145    /**
146     * Initializes the queue.
147     * <p>
148     * @param listener
149     * @param listenerId
150     * @param cacheName
151     * @param maxFailure
152     * @param waitBeforeRetry
153     */
154    protected void initialize( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
155                            int waitBeforeRetry)
156    {
157        if ( listener == null )
158        {
159            throw new IllegalArgumentException( "listener must not be null" );
160        }
161
162        this.listener = listener;
163        this.listenerId = listenerId;
164        this.cacheName = cacheName;
165        this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
166        this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
167
168        if ( log.isDebugEnabled() )
169        {
170            log.debug( "Constructed: " + this );
171        }
172    }
173
174    /**
175     * This adds a put event to the queue. When it is processed, the element will be put to the
176     * listener.
177     * <p>
178     * @param ce The feature to be added to the PutEvent attribute
179     * @throws IOException
180     */
181    @Override
182    public synchronized void addPutEvent( ICacheElement<K, V> ce )
183        throws IOException
184    {
185        if ( isWorking() )
186        {
187            put( new PutEvent( ce ) );
188        }
189        else if ( log.isWarnEnabled() )
190        {
191            log.warn( "Not enqueuing Put Event for [" + this + "] because it's non-functional." );
192        }
193    }
194
195    /**
196     * This adds a remove event to the queue. When processed the listener's remove method will be
197     * called for the key.
198     * <p>
199     * @param key The feature to be added to the RemoveEvent attribute
200     * @throws IOException
201     */
202    @Override
203    public synchronized void addRemoveEvent( K key )
204        throws IOException
205    {
206        if ( isWorking() )
207        {
208            put( new RemoveEvent( key ) );
209        }
210        else if ( log.isWarnEnabled() )
211        {
212            log.warn( "Not enqueuing Remove Event for [" + this + "] because it's non-functional." );
213        }
214    }
215
216    /**
217     * This adds a remove all event to the queue. When it is processed, all elements will be removed
218     * from the cache.
219     * <p>
220     * @throws IOException
221     */
222    @Override
223    public synchronized void addRemoveAllEvent()
224        throws IOException
225    {
226        if ( isWorking() )
227        {
228            put( new RemoveAllEvent() );
229        }
230        else if ( log.isWarnEnabled() )
231        {
232            log.warn( "Not enqueuing RemoveAll Event for [" + this + "] because it's non-functional." );
233        }
234    }
235
236    /**
237     * @throws IOException
238     */
239    @Override
240    public synchronized void addDisposeEvent()
241        throws IOException
242    {
243        if ( isWorking() )
244        {
245            put( new DisposeEvent() );
246        }
247        else if ( log.isWarnEnabled() )
248        {
249            log.warn( "Not enqueuing Dispose Event for [" + this + "] because it's non-functional." );
250        }
251    }
252
253    /**
254     * Adds an event to the queue.
255     * <p>
256     * @param event
257     */
258    protected abstract void put( AbstractCacheEvent event );
259
260
261    // /////////////////////////// Inner classes /////////////////////////////
262    /**
263     * Retries before declaring failure.
264     * <p>
265     * @author asmuts
266     */
267    protected abstract class AbstractCacheEvent implements Runnable
268    {
269        /** Number of failures encountered processing this event. */
270        int failures = 0;
271
272        /**
273         * Main processing method for the AbstractCacheEvent object
274         */
275        @Override
276        @SuppressWarnings("synthetic-access")
277        public void run()
278        {
279            try
280            {
281                doRun();
282            }
283            catch ( IOException e )
284            {
285                if ( log.isWarnEnabled() )
286                {
287                    log.warn( e );
288                }
289                if ( ++failures >= maxFailure )
290                {
291                    if ( log.isWarnEnabled() )
292                    {
293                        log.warn( "Error while running event from Queue: " + this
294                            + ". Dropping Event and marking Event Queue as non-functional." );
295                    }
296                    setWorking( false );
297                    setAlive( false );
298                    return;
299                }
300                if ( log.isInfoEnabled() )
301                {
302                    log.info( "Error while running event from Queue: " + this + ". Retrying..." );
303                }
304                try
305                {
306                    Thread.sleep( waitBeforeRetry );
307                    run();
308                }
309                catch ( InterruptedException ie )
310                {
311                    if ( log.isErrorEnabled() )
312                    {
313                        log.warn( "Interrupted while sleeping for retry on event " + this + "." );
314                    }
315                    // TODO consider if this is best. maybe we should just
316                    // destroy
317                    setWorking( false );
318                    setAlive( false );
319                }
320            }
321        }
322
323        /**
324         * @throws IOException
325         */
326        protected abstract void doRun()
327            throws IOException;
328    }
329
330    /**
331     * An element should be put in the cache.
332     * <p>
333     * @author asmuts
334     */
335    protected class PutEvent
336        extends AbstractCacheEvent
337    {
338        /** The element to put to the listener */
339        private final ICacheElement<K, V> ice;
340
341        /**
342         * Constructor for the PutEvent object.
343         * <p>
344         * @param ice
345         * @throws IOException
346         */
347        PutEvent( ICacheElement<K, V> ice )
348            throws IOException
349        {
350            this.ice = ice;
351        }
352
353        /**
354         * Call put on the listener.
355         * <p>
356         * @throws IOException
357         */
358        @Override
359        protected void doRun()
360            throws IOException
361        {
362            listener.handlePut( ice );
363        }
364
365        /**
366         * For debugging.
367         * <p>
368         * @return Info on the key and value.
369         */
370        @Override
371        public String toString()
372        {
373            return new StringBuilder( "PutEvent for key: " ).append( ice.getKey() ).append( " value: " )
374                .append( ice.getVal() ).toString();
375        }
376
377    }
378
379    /**
380     * An element should be removed from the cache.
381     * <p>
382     * @author asmuts
383     */
384    protected class RemoveEvent
385        extends AbstractCacheEvent
386    {
387        /** The key to remove from the listener */
388        private final K key;
389
390        /**
391         * Constructor for the RemoveEvent object
392         * <p>
393         * @param key
394         * @throws IOException
395         */
396        RemoveEvent( K key )
397            throws IOException
398        {
399            this.key = key;
400        }
401
402        /**
403         * Call remove on the listener.
404         * <p>
405         * @throws IOException
406         */
407        @Override
408        protected void doRun()
409            throws IOException
410        {
411            listener.handleRemove( cacheName, key );
412        }
413
414        /**
415         * For debugging.
416         * <p>
417         * @return Info on the key to remove.
418         */
419        @Override
420        public String toString()
421        {
422            return new StringBuilder( "RemoveEvent for " ).append( key ).toString();
423        }
424
425    }
426
427    /**
428     * All elements should be removed from the cache when this event is processed.
429     * <p>
430     * @author asmuts
431     */
432    protected class RemoveAllEvent
433        extends AbstractCacheEvent
434    {
435        /**
436         * Call removeAll on the listener.
437         * <p>
438         * @throws IOException
439         */
440        @Override
441        protected void doRun()
442            throws IOException
443        {
444            listener.handleRemoveAll( cacheName );
445        }
446
447        /**
448         * For debugging.
449         * <p>
450         * @return The name of the event.
451         */
452        @Override
453        public String toString()
454        {
455            return "RemoveAllEvent";
456        }
457    }
458
459    /**
460     * The cache should be disposed when this event is processed.
461     * <p>
462     * @author asmuts
463     */
464    protected class DisposeEvent
465        extends AbstractCacheEvent
466    {
467        /**
468         * Called when gets to the end of the queue
469         * <p>
470         * @throws IOException
471         */
472        @Override
473        protected void doRun()
474            throws IOException
475        {
476            listener.handleDispose( cacheName );
477        }
478
479        /**
480         * For debugging.
481         * <p>
482         * @return The name of the event.
483         */
484        @Override
485        public String toString()
486        {
487            return "DisposeEvent";
488        }
489    }
490
491    /**
492     * @return whether the queue is functional.
493     */
494    @Override
495    public boolean isWorking()
496    {
497        return working.get();
498    }
499
500    /**
501     * This means that the queue is functional. If we reached the max number of failures, the queue
502     * is marked as non functional and will never work again.
503     * <p>
504     * @param b
505     */
506    public void setWorking( boolean b )
507    {
508        working.set(b);
509    }
510}