001package org.apache.commons.jcs.engine;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.util.ArrayList;
023import java.util.concurrent.LinkedBlockingQueue;
024import java.util.concurrent.TimeUnit;
025
026import org.apache.commons.jcs.engine.behavior.ICacheListener;
027import org.apache.commons.jcs.engine.stats.StatElement;
028import org.apache.commons.jcs.engine.stats.Stats;
029import org.apache.commons.jcs.engine.stats.behavior.IStatElement;
030import org.apache.commons.jcs.engine.stats.behavior.IStats;
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033
034/**
035 * An event queue is used to propagate ordered cache events to one and only one target listener.
036 * <p>
037 * This is a modified version of the experimental version. It should lazy initialize the processor
038 * thread, and kill the thread if the queue goes empty for a specified period, now set to 1 minute.
039 * If something comes in after that a new processor thread should be created.
040 */
041public class CacheEventQueue<K, V>
042    extends AbstractCacheEventQueue<K, V>
043{
044    /** The logger. */
045    private static final Log log = LogFactory.getLog( CacheEventQueue.class );
046
047    /** The type of queue -- there are pooled and single */
048    private static final QueueType queueType = QueueType.SINGLE;
049
050    /** the thread that works the queue. */
051    private Thread processorThread;
052
053    /** Queue implementation */
054    private LinkedBlockingQueue<AbstractCacheEvent> queue = new LinkedBlockingQueue<AbstractCacheEvent>();
055
056    /**
057     * Constructs with the specified listener and the cache name.
058     * <p>
059     * @param listener
060     * @param listenerId
061     * @param cacheName
062     */
063    public CacheEventQueue( ICacheListener<K, V> listener, long listenerId, String cacheName )
064    {
065        this( listener, listenerId, cacheName, 10, 500 );
066    }
067
068    /**
069     * Constructor for the CacheEventQueue object
070     * <p>
071     * @param listener
072     * @param listenerId
073     * @param cacheName
074     * @param maxFailure
075     * @param waitBeforeRetry
076     */
077    public CacheEventQueue( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
078                            int waitBeforeRetry )
079    {
080        initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry );
081    }
082
083    /**
084     * What type of queue is this.
085     * <p>
086     * @return queueType
087     */
088    @Override
089    public QueueType getQueueType()
090    {
091        return queueType;
092    }
093
094    /**
095     * Kill the processor thread and indicate that the queue is destroyed and no longer alive, but it
096     * can still be working.
097     */
098    protected void stopProcessing()
099    {
100        setAlive(false);
101        processorThread = null;
102    }
103
104    /**
105     * Event Q is empty.
106     * <p>
107     * Calling destroy interrupts the processor thread.
108     */
109    @Override
110    public void destroy()
111    {
112        if ( isAlive() )
113        {
114            setAlive(false);
115
116            if ( log.isInfoEnabled() )
117            {
118                log.info( "Destroying queue, stats =  " + getStatistics() );
119            }
120
121            if ( processorThread != null )
122            {
123                processorThread.interrupt();
124                processorThread = null;
125            }
126
127            if ( log.isInfoEnabled() )
128            {
129                log.info( "Cache event queue destroyed: " + this );
130            }
131        }
132        else
133        {
134            if ( log.isInfoEnabled() )
135            {
136                log.info( "Destroy was called after queue was destroyed. Doing nothing. Stats =  " + getStatistics() );
137            }
138        }
139    }
140
141    /**
142     * Adds an event to the queue.
143     * <p>
144     * @param event
145     */
146    @Override
147    protected void put( AbstractCacheEvent event )
148    {
149        if ( log.isDebugEnabled() )
150        {
151            log.debug( "Event entering Queue for " + getCacheName() + ": " + event );
152        }
153
154        queue.offer(event);
155
156        if ( isWorking() )
157        {
158            if ( !isAlive() )
159            {
160                setAlive(true);
161                processorThread = new QProcessor();
162                processorThread.start();
163                if ( log.isInfoEnabled() )
164                {
165                    log.info( "Cache event queue created: " + this );
166                }
167            }
168        }
169    }
170
171    // /////////////////////////// Inner classes /////////////////////////////
172
173    /**
174     * This is the thread that works the queue.
175     * <p>
176     * @author asmuts
177     */
178    protected class QProcessor
179        extends Thread
180    {
181        /**
182         * Constructor for the QProcessor object
183         * <p>
184         * @param aQueue the event queue to take items from.
185         */
186        QProcessor()
187        {
188            super( "CacheEventQueue.QProcessor-" + getCacheName() );
189            setDaemon( true );
190        }
191
192        /**
193         * Main processing method for the QProcessor object.
194         * <p>
195         * Waits for a specified time (waitToDieMillis) for something to come in and if no new
196         * events come in during that period the run method can exit and the thread is dereferenced.
197         */
198        @Override
199        public void run()
200        {
201
202            while ( CacheEventQueue.this.isAlive() )
203            {
204                AbstractCacheEvent event = null;
205
206                try
207                {
208                    event = queue.poll(getWaitToDieMillis(), TimeUnit.MILLISECONDS);
209                }
210                catch (InterruptedException e)
211                {
212                    // is ok
213                }
214
215                if ( log.isDebugEnabled() )
216                {
217                    log.debug( "Event from queue = " + event );
218                }
219
220                if ( event == null )
221                {
222                    stopProcessing();
223                }
224
225                if ( event != null && isWorking() && CacheEventQueue.this.isAlive() )
226                {
227                    event.run();
228                }
229            }
230            if ( log.isDebugEnabled() )
231            {
232                log.debug( "QProcessor exiting for " + getCacheName() );
233            }
234        }
235    }
236
237    /**
238     * This method returns semi-structured data on this queue.
239     * <p>
240     * @see org.apache.commons.jcs.engine.behavior.ICacheEventQueue#getStatistics()
241     * @return information on the status and history of the queue
242     */
243    @Override
244    public IStats getStatistics()
245    {
246        IStats stats = new Stats();
247        stats.setTypeName( "Cache Event Queue" );
248
249        ArrayList<IStatElement<?>> elems = new ArrayList<IStatElement<?>>();
250
251        elems.add(new StatElement<Boolean>( "Working", Boolean.valueOf(this.isWorking()) ) );
252        elems.add(new StatElement<Boolean>( "Alive", Boolean.valueOf(this.isAlive()) ) );
253        elems.add(new StatElement<Boolean>( "Empty", Boolean.valueOf(this.isEmpty()) ) );
254        elems.add(new StatElement<Integer>( "Size", Integer.valueOf(this.size()) ) );
255
256        stats.setStatElements( elems );
257
258        return stats;
259    }
260
261    /**
262     * @return whether there are any items in the queue.
263     */
264    @Override
265    public boolean isEmpty()
266    {
267        return queue.isEmpty();
268    }
269
270    /**
271     * Returns the number of elements in the queue.
272     * <p>
273     * @return number of items in the queue.
274     */
275    @Override
276    public int size()
277    {
278        return queue.size();
279    }
280}