001    package org.apache.jcs.engine;
002    
003    /*
004     * Licensed to the Apache Software Foundation (ASF) under one
005     * or more contributor license agreements.  See the NOTICE file
006     * distributed with this work for additional information
007     * regarding copyright ownership.  The ASF licenses this file
008     * to you under the Apache License, Version 2.0 (the
009     * "License"); you may not use this file except in compliance
010     * with the License.  You may obtain a copy of the License at
011     *
012     *   http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing,
015     * software distributed under the License is distributed on an
016     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017     * KIND, either express or implied.  See the License for the
018     * specific language governing permissions and limitations
019     * under the License.
020     */
021    
022    import java.io.IOException;
023    import java.io.Serializable;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.apache.jcs.engine.behavior.ICacheElement;
028    import org.apache.jcs.engine.behavior.ICacheEventQueue;
029    import org.apache.jcs.engine.behavior.ICacheListener;
030    
031    /**
032     * An abstract base class to the different implementations
033     */
034    public abstract class AbstractCacheEventQueue<K extends Serializable, V extends Serializable>
035        implements ICacheEventQueue<K, V>
036    {
037        /** The logger. */
038        protected static final Log log = LogFactory.getLog( AbstractCacheEventQueue.class );
039    
040        /** default */
041        protected static final int DEFAULT_WAIT_TO_DIE_MILLIS = 10000;
042    
043        /**
044         * time to wait for an event before snuffing the background thread if the queue is empty. make
045         * configurable later
046         */
047        protected int waitToDieMillis = DEFAULT_WAIT_TO_DIE_MILLIS;
048    
049        /**
050         * When the events are pulled off the queue, the tell the listener to handle the specific event
051         * type. The work is done by the listener.
052         */
053        protected ICacheListener<K, V> listener;
054    
055        /** Id of the listener registered with this queue */
056        protected long listenerId;
057    
058        /** The cache region name, if applicable. */
059        protected String cacheName;
060    
061        /** Maximum number of failures before we buy the farm. */
062        protected int maxFailure;
063    
064        /** in milliseconds */
065        protected int waitBeforeRetry;
066    
067        /** this is true if there is no worker thread. */
068        protected boolean destroyed = true;
069    
070        /**
071         * This means that the queue is functional. If we reached the max number of failures, the queue
072         * is marked as non functional and will never work again.
073         */
074        protected boolean working = true;
075    
076        /**
077         * Returns the time to wait for events before killing the background thread.
078         * <p>
079         * @return int
080         */
081        public int getWaitToDieMillis()
082        {
083            return waitToDieMillis;
084        }
085    
086        /**
087         * Sets the time to wait for events before killing the background thread.
088         * <p>
089         * @param wtdm the ms for the q to sit idle.
090         */
091        public void setWaitToDieMillis( int wtdm )
092        {
093            waitToDieMillis = wtdm;
094        }
095    
096        /**
097         * Creates a brief string identifying the listener and the region.
098         * <p>
099         * @return String debugging info.
100         */
101        @Override
102        public String toString()
103        {
104            return "CacheEventQueue [listenerId=" + listenerId + ", cacheName=" + cacheName + "]";
105        }
106    
107        /**
108         * If they queue has an active thread it is considered alive.
109         * <p>
110         * @return The alive value
111         */
112        public synchronized boolean isAlive()
113        {
114            return ( !destroyed );
115        }
116    
117        /**
118         * Sets whether the queue is actively processing -- if there are working threads.
119         * <p>
120         * @param aState
121         */
122        public synchronized void setAlive( boolean aState )
123        {
124            destroyed = !aState;
125        }
126    
127        /**
128         * @return The listenerId value
129         */
130        public long getListenerId()
131        {
132            return listenerId;
133        }
134    
135        /**
136         * This adds a put event to the queue. When it is processed, the element will be put to the
137         * listener.
138         * <p>
139         * @param ce The feature to be added to the PutEvent attribute
140         * @exception IOException
141         */
142        public synchronized void addPutEvent( ICacheElement<K, V> ce )
143            throws IOException
144        {
145            if ( isWorking() )
146            {
147                put( new PutEvent( ce ) );
148            }
149            else
150            {
151                if ( log.isWarnEnabled() )
152                {
153                    log.warn( "Not enqueuing Put Event for [" + this + "] because it's non-functional." );
154                }
155            }
156        }
157    
158        /**
159         * This adds a remove event to the queue. When processed the listener's remove method will be
160         * called for the key.
161         * <p>
162         * @param key The feature to be added to the RemoveEvent attribute
163         * @exception IOException
164         */
165        public synchronized void addRemoveEvent( K key )
166            throws IOException
167        {
168            if ( isWorking() )
169            {
170                put( new RemoveEvent( key ) );
171            }
172            else
173            {
174                if ( log.isWarnEnabled() )
175                {
176                    log.warn( "Not enqueuing Remove Event for [" + this + "] because it's non-functional." );
177                }
178            }
179        }
180    
181        /**
182         * This adds a remove all event to the queue. When it is processed, all elements will be removed
183         * from the cache.
184         * <p>
185         * @exception IOException
186         */
187        public synchronized void addRemoveAllEvent()
188            throws IOException
189        {
190            if ( isWorking() )
191            {
192                put( new RemoveAllEvent() );
193            }
194            else
195            {
196                if ( log.isWarnEnabled() )
197                {
198                    log.warn( "Not enqueuing RemoveAll Event for [" + this + "] because it's non-functional." );
199                }
200            }
201        }
202    
203        /**
204         * @exception IOException
205         */
206        public synchronized void addDisposeEvent()
207            throws IOException
208        {
209            if ( isWorking() )
210            {
211                put( new DisposeEvent() );
212            }
213            else
214            {
215                if ( log.isWarnEnabled() )
216                {
217                    log.warn( "Not enqueuing Dispose Event for [" + this + "] because it's non-functional." );
218                }
219            }
220        }
221    
222        /**
223         * Adds an event to the queue.
224         * <p>
225         * @param event
226         */
227        protected abstract void put( AbstractCacheEvent event );
228    
229    
230        // /////////////////////////// Inner classes /////////////////////////////
231    
232        /** The queue is composed of nodes. */
233        protected static class Node
234        {
235            /** Next node in the singly linked list. */
236            Node next = null;
237    
238            /** The payload. */
239            AbstractCacheEventQueue<?, ?>.AbstractCacheEvent event = null;
240        }
241    
242        /**
243         * Retries before declaring failure.
244         * <p>
245         * @author asmuts
246         * @created January 15, 2002
247         */
248        protected abstract class AbstractCacheEvent
249            implements Runnable
250        {
251            /** Number of failures encountered processing this event. */
252            int failures = 0;
253    
254            /**
255             * Main processing method for the AbstractCacheEvent object
256             */
257            public void run()
258            {
259                try
260                {
261                    doRun();
262                }
263                catch ( IOException e )
264                {
265                    if ( log.isWarnEnabled() )
266                    {
267                        log.warn( e );
268                    }
269                    if ( ++failures >= maxFailure )
270                    {
271                        if ( log.isWarnEnabled() )
272                        {
273                            log.warn( "Error while running event from Queue: " + this
274                                + ". Dropping Event and marking Event Queue as non-functional." );
275                        }
276                        setWorking( false );
277                        setAlive( false );
278                        return;
279                    }
280                    if ( log.isInfoEnabled() )
281                    {
282                        log.info( "Error while running event from Queue: " + this + ". Retrying..." );
283                    }
284                    try
285                    {
286                        Thread.sleep( waitBeforeRetry );
287                        run();
288                    }
289                    catch ( InterruptedException ie )
290                    {
291                        if ( log.isErrorEnabled() )
292                        {
293                            log.warn( "Interrupted while sleeping for retry on event " + this + "." );
294                        }
295                        // TODO consider if this is best. maybe we should just
296                        // destroy
297                        setWorking( false );
298                        setAlive( false );
299                    }
300                }
301            }
302    
303            /**
304             * @exception IOException
305             */
306            protected abstract void doRun()
307                throws IOException;
308        }
309    
310        /**
311         * An element should be put in the cache.
312         * <p>
313         * @author asmuts
314         * @created January 15, 2002
315         */
316        protected class PutEvent
317            extends AbstractCacheEvent
318        {
319            /** The element to put to the listener */
320            private final ICacheElement<K, V> ice;
321    
322            /**
323             * Constructor for the PutEvent object.
324             * <p>
325             * @param ice
326             * @exception IOException
327             */
328            PutEvent( ICacheElement<K, V> ice )
329                throws IOException
330            {
331                this.ice = ice;
332            }
333    
334            /**
335             * Call put on the listener.
336             * <p>
337             * @exception IOException
338             */
339            @Override
340            protected void doRun()
341                throws IOException
342            {
343                listener.handlePut( ice );
344            }
345    
346            /**
347             * For debugging.
348             * <p>
349             * @return Info on the key and value.
350             */
351            @Override
352            public String toString()
353            {
354                return new StringBuffer( "PutEvent for key: " ).append( ice.getKey() ).append( " value: " )
355                    .append( ice.getVal() ).toString();
356            }
357    
358        }
359    
360        /**
361         * An element should be removed from the cache.
362         * <p>
363         * @author asmuts
364         * @created January 15, 2002
365         */
366        protected class RemoveEvent
367            extends AbstractCacheEvent
368        {
369            /** The key to remove from the listener */
370            private final K key;
371    
372            /**
373             * Constructor for the RemoveEvent object
374             * <p>
375             * @param key
376             * @exception IOException
377             */
378            RemoveEvent( K key )
379                throws IOException
380            {
381                this.key = key;
382            }
383    
384            /**
385             * Call remove on the listener.
386             * <p>
387             * @exception IOException
388             */
389            @Override
390            protected void doRun()
391                throws IOException
392            {
393                listener.handleRemove( cacheName, key );
394            }
395    
396            /**
397             * For debugging.
398             * <p>
399             * @return Info on the key to remove.
400             */
401            @Override
402            public String toString()
403            {
404                return new StringBuffer( "RemoveEvent for " ).append( key ).toString();
405            }
406    
407        }
408    
409        /**
410         * All elements should be removed from the cache when this event is processed.
411         * <p>
412         * @author asmuts
413         * @created January 15, 2002
414         */
415        protected class RemoveAllEvent
416            extends AbstractCacheEvent
417        {
418            /**
419             * Call removeAll on the listener.
420             * <p>
421             * @exception IOException
422             */
423            @Override
424            protected void doRun()
425                throws IOException
426            {
427                listener.handleRemoveAll( cacheName );
428            }
429    
430            /**
431             * For debugging.
432             * <p>
433             * @return The name of the event.
434             */
435            @Override
436            public String toString()
437            {
438                return "RemoveAllEvent";
439            }
440    
441        }
442    
443        /**
444         * The cache should be disposed when this event is processed.
445         * <p>
446         * @author asmuts
447         * @created January 15, 2002
448         */
449        protected class DisposeEvent
450            extends AbstractCacheEvent
451        {
452            /**
453             * Called when gets to the end of the queue
454             * <p>
455             * @exception IOException
456             */
457            @Override
458            protected void doRun()
459                throws IOException
460            {
461                listener.handleDispose( cacheName );
462            }
463    
464            /**
465             * For debugging.
466             * <p>
467             * @return The name of the event.
468             */
469            @Override
470            public String toString()
471            {
472                return "DisposeEvent";
473            }
474        }
475    
476        /**
477         * @return whether the queue is functional.
478         */
479        public boolean isWorking()
480        {
481            return working;
482        }
483    
484        /**
485         * This means that the queue is functional. If we reached the max number of failures, the queue
486         * is marked as non functional and will never work again.
487         * <p>
488         * @param b
489         */
490        public void setWorking( boolean b )
491        {
492            working = b;
493        }
494    }