View Javadoc
1   package org.apache.commons.jcs.engine;
2   
3   import java.util.ArrayList;
4   import java.util.concurrent.BlockingQueue;
5   import java.util.concurrent.ThreadPoolExecutor;
6   
7   /*
8    * Licensed to the Apache Software Foundation (ASF) under one
9    * or more contributor license agreements.  See the NOTICE file
10   * distributed with this work for additional information
11   * regarding copyright ownership.  The ASF licenses this file
12   * to you under the Apache License, Version 2.0 (the
13   * "License"); you may not use this file except in compliance
14   * with the License.  You may obtain a copy of the License at
15   *
16   *   http://www.apache.org/licenses/LICENSE-2.0
17   *
18   * Unless required by applicable law or agreed to in writing,
19   * software distributed under the License is distributed on an
20   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
21   * KIND, either express or implied.  See the License for the
22   * specific language governing permissions and limitations
23   * under the License.
24   */
25  
26  import org.apache.commons.jcs.engine.behavior.ICacheListener;
27  import org.apache.commons.jcs.engine.stats.StatElement;
28  import org.apache.commons.jcs.engine.stats.Stats;
29  import org.apache.commons.jcs.engine.stats.behavior.IStatElement;
30  import org.apache.commons.jcs.engine.stats.behavior.IStats;
31  import org.apache.commons.jcs.utils.threadpool.ThreadPoolManager;
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  
35  /**
36   * An event queue is used to propagate ordered cache events to one and only one target listener.
37   * <p>
38   * This is a modified version of the experimental version. It uses a PooledExecutor and a
39   * BoundedBuffer to queue up events and execute them as threads become available.
40   * <p>
41   * The PooledExecutor is static, because presumably these processes will be IO bound, so throwing
42   * more than a few threads at them will serve no purpose other than to saturate the IO interface. In
43   * light of this, having one thread per region seems unnecessary. This may prove to be false.
44   */
45  public class PooledCacheEventQueue<K, V>
46      extends AbstractCacheEventQueue<K, V>
47  {
48      /** The logger. */
49      private static final Log log = LogFactory.getLog( PooledCacheEventQueue.class );
50  
51      /** The type of event queue */
52      private static final QueueType queueType = QueueType.POOLED;
53  
54      /** The Thread Pool to execute events with. */
55      private ThreadPoolExecutor pool = null;
56  
57      /**
58       * Constructor for the CacheEventQueue object
59       * <p>
60       * @param listener
61       * @param listenerId
62       * @param cacheName
63       * @param maxFailure
64       * @param waitBeforeRetry
65       * @param threadPoolName
66       */
67      public PooledCacheEventQueue( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
68                                    int waitBeforeRetry, String threadPoolName )
69      {
70          initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry, threadPoolName );
71      }
72  
73      /**
74       * Initializes the queue.
75       * <p>
76       * @param listener
77       * @param listenerId
78       * @param cacheName
79       * @param maxFailure
80       * @param waitBeforeRetry
81       * @param threadPoolName
82       */
83      protected void initialize( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
84                              int waitBeforeRetry, String threadPoolName )
85      {
86          super.initialize(listener, listenerId, cacheName, maxFailure, waitBeforeRetry);
87  
88          // this will share the same pool with other event queues by default.
89          pool = ThreadPoolManager.getInstance().getPool(
90                  (threadPoolName == null) ? "cache_event_queue" : threadPoolName );
91      }
92  
93      /**
94       * @return the queue type
95       */
96      @Override
97      public QueueType getQueueType()
98      {
99          return queueType;
100     }
101 
102     /**
103      * Destroy the queue. Interrupt all threads.
104      */
105     @Override
106     public synchronized void destroy()
107     {
108         if ( isAlive() )
109         {
110             setAlive(false);
111             pool.shutdownNow();
112             if ( log.isInfoEnabled() )
113             {
114                 log.info( "Cache event queue destroyed: " + this );
115             }
116         }
117     }
118 
119     /**
120      * Adds an event to the queue.
121      * <p>
122      * @param event
123      */
124     @Override
125     protected void put( AbstractCacheEvent event )
126     {
127         pool.execute( event );
128     }
129 
130     /**
131      * @return IStats
132      */
133     @Override
134     public IStats getStatistics()
135     {
136         IStats stats = new Stats();
137         stats.setTypeName( "Pooled Cache Event Queue" );
138 
139         ArrayList<IStatElement<?>> elems = new ArrayList<IStatElement<?>>();
140 
141         elems.add(new StatElement<Boolean>( "Working", Boolean.valueOf(super.isWorking()) ) );
142         elems.add(new StatElement<Boolean>( "Alive", Boolean.valueOf(this.isAlive()) ) );
143         elems.add(new StatElement<Boolean>( "Empty", Boolean.valueOf(this.isEmpty()) ) );
144 
145         if ( pool.getQueue() != null )
146         {
147             BlockingQueue<Runnable> bb = pool.getQueue();
148             elems.add(new StatElement<Integer>( "Queue Size", Integer.valueOf(bb.size()) ) );
149             elems.add(new StatElement<Integer>( "Queue Capacity", Integer.valueOf(bb.remainingCapacity()) ) );
150         }
151 
152         elems.add(new StatElement<Integer>( "Pool Size", Integer.valueOf(pool.getPoolSize()) ) );
153         elems.add(new StatElement<Integer>( "Maximum Pool Size", Integer.valueOf(pool.getMaximumPoolSize()) ) );
154 
155         stats.setStatElements( elems );
156 
157         return stats;
158     }
159 
160     /**
161      * If the Queue is using a bounded channel we can determine the size. If it is zero or we can't
162      * determine the size, we return true.
163      * <p>
164      * @return whether or not there are items in the queue
165      */
166     @Override
167     public boolean isEmpty()
168     {
169         if ( pool.getQueue() == null )
170         {
171             return true;
172         }
173         else
174         {
175             return pool.getQueue().size() == 0;
176         }
177     }
178 
179     /**
180      * Returns the number of elements in the queue. If the queue cannot determine the size
181      * accurately it will return 1.
182      * <p>
183      * @return number of items in the queue.
184      */
185     @Override
186     public int size()
187     {
188         if ( pool.getQueue() == null )
189         {
190             return 0;
191         }
192         else
193         {
194             return pool.getQueue().size();
195         }
196     }
197 }