001package org.apache.commons.jcs3.engine;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.util.ArrayList;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.ExecutorService;
025import java.util.concurrent.ThreadPoolExecutor;
026
027import org.apache.commons.jcs3.engine.behavior.ICacheListener;
028import org.apache.commons.jcs3.engine.stats.StatElement;
029import org.apache.commons.jcs3.engine.stats.Stats;
030import org.apache.commons.jcs3.engine.stats.behavior.IStatElement;
031import org.apache.commons.jcs3.engine.stats.behavior.IStats;
032import org.apache.commons.jcs3.log.Log;
033import org.apache.commons.jcs3.log.LogManager;
034import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager;
035
036/**
037 * An event queue is used to propagate ordered cache events to one and only one target listener.
038 * <p>
039 * This is a modified version of the experimental version. It uses a PooledExecutor and a
040 * BoundedBuffer to queue up events and execute them as threads become available.
041 * <p>
042 * The PooledExecutor is static, because presumably these processes will be IO bound, so throwing
043 * more than a few threads at them will serve no purpose other than to saturate the IO interface. In
044 * light of this, having one thread per region seems unnecessary. This may prove to be false.
045 */
046public class PooledCacheEventQueue<K, V>
047    extends AbstractCacheEventQueue<K, V>
048{
049    /** The logger. */
050    private static final Log log = LogManager.getLog( PooledCacheEventQueue.class );
051
052    /** The Thread Pool to execute events with. */
053    protected ExecutorService pool;
054
055    /** The Thread Pool queue */
056    protected BlockingQueue<Runnable> queue;
057
058    /**
059     * Constructor for the CacheEventQueue object
060     * <p>
061     * @param listener
062     * @param listenerId
063     * @param cacheName
064     * @param maxFailure
065     * @param waitBeforeRetry
066     * @param threadPoolName
067     */
068    public PooledCacheEventQueue( final ICacheListener<K, V> listener, final long listenerId, final String cacheName, final int maxFailure,
069                                  final int waitBeforeRetry, final String threadPoolName )
070    {
071        initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry, threadPoolName );
072    }
073
074    /**
075     * Initializes the queue.
076     * <p>
077     * @param listener
078     * @param listenerId
079     * @param cacheName
080     * @param maxFailure
081     * @param waitBeforeRetry
082     * @param threadPoolName
083     */
084    protected void initialize( final ICacheListener<K, V> listener, final long listenerId, final String cacheName, final int maxFailure,
085                            final int waitBeforeRetry, final String threadPoolName )
086    {
087        super.initialize(listener, listenerId, cacheName, maxFailure, waitBeforeRetry);
088
089        pool = createPool(threadPoolName);
090
091        if (pool instanceof ThreadPoolExecutor)
092        {
093                queue = ((ThreadPoolExecutor) pool).getQueue();
094        }
095    }
096
097    /**
098     * Create the thread pool.
099     * <p>
100     * @param threadPoolName
101     * @since 3.1
102     */
103    protected ExecutorService createPool(final String threadPoolName)
104    {
105        // this will share the same pool with other event queues by default.
106        return ThreadPoolManager.getInstance().getExecutorService(
107                (threadPoolName == null) ? "cache_event_queue" : threadPoolName );
108    }
109
110    /**
111     * @return the queue type
112     */
113    @Override
114    public QueueType getQueueType()
115    {
116        /** The type of queue -- there are pooled and single */
117        return QueueType.POOLED;
118    }
119
120    /**
121     * Destroy the queue. Interrupt all threads.
122     */
123    @Override
124    public synchronized void destroy()
125    {
126        if ( isWorking() )
127        {
128            setWorking(false);
129            log.info( "Cache event queue destroyed: {0}", this );
130        }
131    }
132
133    /**
134     * Adds an event to the queue.
135     * <p>
136     * @param event
137     */
138    @Override
139    protected void put( final AbstractCacheEvent event )
140    {
141        pool.execute( event );
142    }
143
144    /**
145     * @return IStats
146     */
147    @Override
148    public IStats getStatistics()
149    {
150        final IStats stats = new Stats();
151        stats.setTypeName( "Pooled Cache Event Queue" );
152
153        final ArrayList<IStatElement<?>> elems = new ArrayList<>();
154
155        elems.add(new StatElement<>( "Working", Boolean.valueOf(isWorking()) ) );
156        elems.add(new StatElement<>( "Empty", Boolean.valueOf(this.isEmpty()) ) );
157
158        if ( queue != null )
159        {
160            elems.add(new StatElement<>( "Queue Size", Integer.valueOf(queue.size()) ) );
161            elems.add(new StatElement<>( "Queue Capacity", Integer.valueOf(queue.remainingCapacity()) ) );
162        }
163
164        stats.setStatElements( elems );
165
166        return stats;
167    }
168
169    /**
170     * If the Queue is using a bounded channel we can determine the size. If it is zero or we can't
171     * determine the size, we return true.
172     * <p>
173     * @return whether or not there are items in the queue
174     */
175    @Override
176    public boolean isEmpty()
177    {
178        return size() == 0;
179    }
180
181    /**
182     * Returns the number of elements in the queue. If the queue cannot determine the size
183     * accurately it will return 0.
184     * <p>
185     * @return number of items in the queue.
186     */
187    @Override
188    public int size()
189    {
190        if ( queue == null )
191        {
192            return 0;
193        }
194        return queue.size();
195    }
196}