001package org.apache.commons.jcs.engine;
002
003import java.util.ArrayList;
004import java.util.concurrent.BlockingQueue;
005import java.util.concurrent.ThreadPoolExecutor;
006
007/*
008 * Licensed to the Apache Software Foundation (ASF) under one
009 * or more contributor license agreements.  See the NOTICE file
010 * distributed with this work for additional information
011 * regarding copyright ownership.  The ASF licenses this file
012 * to you under the Apache License, Version 2.0 (the
013 * "License"); you may not use this file except in compliance
014 * with the License.  You may obtain a copy of the License at
015 *
016 *   http://www.apache.org/licenses/LICENSE-2.0
017 *
018 * Unless required by applicable law or agreed to in writing,
019 * software distributed under the License is distributed on an
020 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
021 * KIND, either express or implied.  See the License for the
022 * specific language governing permissions and limitations
023 * under the License.
024 */
025
026import org.apache.commons.jcs.engine.behavior.ICacheListener;
027import org.apache.commons.jcs.engine.stats.StatElement;
028import org.apache.commons.jcs.engine.stats.Stats;
029import org.apache.commons.jcs.engine.stats.behavior.IStatElement;
030import org.apache.commons.jcs.engine.stats.behavior.IStats;
031import org.apache.commons.jcs.utils.threadpool.ThreadPoolManager;
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034
/**
 * An event queue is used to propagate ordered cache events to one and only one target listener.
 * <p>
 * This is a modified version of the experimental version. It uses a
 * {@link java.util.concurrent.ThreadPoolExecutor} and its {@link java.util.concurrent.BlockingQueue}
 * work queue to queue up events and execute them as threads become available.
 * <p>
 * The executor is looked up by name from the thread pool manager and is, by default, shared with
 * other event queues, because presumably these processes will be IO bound, so throwing more than
 * a few threads at them will serve no purpose other than to saturate the IO interface. In light
 * of this, having one thread per region seems unnecessary. This may prove to be false.
 */
045public class PooledCacheEventQueue<K, V>
046    extends AbstractCacheEventQueue<K, V>
047{
048    /** The logger. */
049    private static final Log log = LogFactory.getLog( PooledCacheEventQueue.class );
050
051    /** The type of event queue */
052    private static final QueueType queueType = QueueType.POOLED;
053
054    /** The Thread Pool to execute events with. */
055    private ThreadPoolExecutor pool = null;
056
057    /**
058     * Constructor for the CacheEventQueue object
059     * <p>
060     * @param listener
061     * @param listenerId
062     * @param cacheName
063     * @param maxFailure
064     * @param waitBeforeRetry
065     * @param threadPoolName
066     */
067    public PooledCacheEventQueue( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
068                                  int waitBeforeRetry, String threadPoolName )
069    {
070        initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry, threadPoolName );
071    }
072
073    /**
074     * Initializes the queue.
075     * <p>
076     * @param listener
077     * @param listenerId
078     * @param cacheName
079     * @param maxFailure
080     * @param waitBeforeRetry
081     * @param threadPoolName
082     */
083    protected void initialize( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
084                            int waitBeforeRetry, String threadPoolName )
085    {
086        super.initialize(listener, listenerId, cacheName, maxFailure, waitBeforeRetry);
087
088        // this will share the same pool with other event queues by default.
089        pool = ThreadPoolManager.getInstance().getPool(
090                (threadPoolName == null) ? "cache_event_queue" : threadPoolName );
091    }
092
093    /**
094     * @return the queue type
095     */
096    @Override
097    public QueueType getQueueType()
098    {
099        return queueType;
100    }
101
102    /**
103     * Destroy the queue. Interrupt all threads.
104     */
105    @Override
106    public synchronized void destroy()
107    {
108        if ( isAlive() )
109        {
110            setAlive(false);
111            pool.shutdownNow();
112            if ( log.isInfoEnabled() )
113            {
114                log.info( "Cache event queue destroyed: " + this );
115            }
116        }
117    }
118
119    /**
120     * Adds an event to the queue.
121     * <p>
122     * @param event
123     */
124    @Override
125    protected void put( AbstractCacheEvent event )
126    {
127        pool.execute( event );
128    }
129
130    /**
131     * @return IStats
132     */
133    @Override
134    public IStats getStatistics()
135    {
136        IStats stats = new Stats();
137        stats.setTypeName( "Pooled Cache Event Queue" );
138
139        ArrayList<IStatElement<?>> elems = new ArrayList<IStatElement<?>>();
140
141        elems.add(new StatElement<Boolean>( "Working", Boolean.valueOf(super.isWorking()) ) );
142        elems.add(new StatElement<Boolean>( "Alive", Boolean.valueOf(this.isAlive()) ) );
143        elems.add(new StatElement<Boolean>( "Empty", Boolean.valueOf(this.isEmpty()) ) );
144
145        if ( pool.getQueue() != null )
146        {
147            BlockingQueue<Runnable> bb = pool.getQueue();
148            elems.add(new StatElement<Integer>( "Queue Size", Integer.valueOf(bb.size()) ) );
149            elems.add(new StatElement<Integer>( "Queue Capacity", Integer.valueOf(bb.remainingCapacity()) ) );
150        }
151
152        elems.add(new StatElement<Integer>( "Pool Size", Integer.valueOf(pool.getPoolSize()) ) );
153        elems.add(new StatElement<Integer>( "Maximum Pool Size", Integer.valueOf(pool.getMaximumPoolSize()) ) );
154
155        stats.setStatElements( elems );
156
157        return stats;
158    }
159
160    /**
161     * If the Queue is using a bounded channel we can determine the size. If it is zero or we can't
162     * determine the size, we return true.
163     * <p>
164     * @return whether or not there are items in the queue
165     */
166    @Override
167    public boolean isEmpty()
168    {
169        if ( pool.getQueue() == null )
170        {
171            return true;
172        }
173        else
174        {
175            return pool.getQueue().size() == 0;
176        }
177    }
178
179    /**
180     * Returns the number of elements in the queue. If the queue cannot determine the size
181     * accurately it will return 1.
182     * <p>
183     * @return number of items in the queue.
184     */
185    @Override
186    public int size()
187    {
188        if ( pool.getQueue() == null )
189        {
190            return 0;
191        }
192        else
193        {
194            return pool.getQueue().size();
195        }
196    }
197}