View Javadoc
1   package org.apache.commons.jcs3.engine;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.concurrent.BlockingQueue;
24  import java.util.concurrent.ExecutorService;
25  import java.util.concurrent.ThreadPoolExecutor;
26  
27  import org.apache.commons.jcs3.engine.behavior.ICacheListener;
28  import org.apache.commons.jcs3.engine.stats.StatElement;
29  import org.apache.commons.jcs3.engine.stats.Stats;
30  import org.apache.commons.jcs3.engine.stats.behavior.IStatElement;
31  import org.apache.commons.jcs3.engine.stats.behavior.IStats;
32  import org.apache.commons.jcs3.log.Log;
33  import org.apache.commons.jcs3.log.LogManager;
34  import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager;
35  
36  /**
37   * An event queue is used to propagate ordered cache events to one and only one target listener.
38   * <p>
39   * This is a modified version of the experimental version. It uses a PooledExecutor and a
40   * BoundedBuffer to queue up events and execute them as threads become available.
41   * <p>
42   * The PooledExecutor is static, because presumably these processes will be IO bound, so throwing
43   * more than a few threads at them will serve no purpose other than to saturate the IO interface. In
44   * light of this, having one thread per region seems unnecessary. This may prove to be false.
45   */
46  public class PooledCacheEventQueue<K, V>
47      extends AbstractCacheEventQueue<K, V>
48  {
49      /** The logger. */
50      private static final Log log = LogManager.getLog( PooledCacheEventQueue.class );
51  
52      /** The Thread Pool to execute events with. */
53      protected ExecutorService pool;
54  
55      /** The Thread Pool queue */
56      protected BlockingQueue<Runnable> queue;
57  
58      /**
59       * Constructor for the CacheEventQueue object
60       * <p>
61       * @param listener
62       * @param listenerId
63       * @param cacheName
64       * @param maxFailure
65       * @param waitBeforeRetry
66       * @param threadPoolName
67       */
68      public PooledCacheEventQueue( final ICacheListener<K, V> listener, final long listenerId, final String cacheName, final int maxFailure,
69                                    final int waitBeforeRetry, final String threadPoolName )
70      {
71          initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry, threadPoolName );
72      }
73  
74      /**
75       * Initializes the queue.
76       * <p>
77       * @param listener
78       * @param listenerId
79       * @param cacheName
80       * @param maxFailure
81       * @param waitBeforeRetry
82       * @param threadPoolName
83       */
84      protected void initialize( final ICacheListener<K, V> listener, final long listenerId, final String cacheName, final int maxFailure,
85                              final int waitBeforeRetry, final String threadPoolName )
86      {
87          super.initialize(listener, listenerId, cacheName, maxFailure, waitBeforeRetry);
88  
89          pool = createPool(threadPoolName);
90  
91          if (pool instanceof ThreadPoolExecutor)
92          {
93          	queue = ((ThreadPoolExecutor) pool).getQueue();
94          }
95      }
96  
97      /**
98       * Create the thread pool.
99       * <p>
100      * @param threadPoolName
101      * @since 3.1
102      */
103     protected ExecutorService createPool(final String threadPoolName)
104     {
105         // this will share the same pool with other event queues by default.
106         return ThreadPoolManager.getInstance().getExecutorService(
107                 (threadPoolName == null) ? "cache_event_queue" : threadPoolName );
108     }
109 
110     /**
111      * @return the queue type
112      */
113     @Override
114     public QueueType getQueueType()
115     {
116         /** The type of queue -- there are pooled and single */
117         return QueueType.POOLED;
118     }
119 
120     /**
121      * Destroy the queue. Interrupt all threads.
122      */
123     @Override
124     public synchronized void destroy()
125     {
126         if ( isWorking() )
127         {
128             setWorking(false);
129             log.info( "Cache event queue destroyed: {0}", this );
130         }
131     }
132 
133     /**
134      * Adds an event to the queue.
135      * <p>
136      * @param event
137      */
138     @Override
139     protected void put( final AbstractCacheEvent event )
140     {
141         pool.execute( event );
142     }
143 
144     /**
145      * @return IStats
146      */
147     @Override
148     public IStats getStatistics()
149     {
150         final IStats stats = new Stats();
151         stats.setTypeName( "Pooled Cache Event Queue" );
152 
153         final ArrayList<IStatElement<?>> elems = new ArrayList<>();
154 
155         elems.add(new StatElement<>( "Working", Boolean.valueOf(isWorking()) ) );
156         elems.add(new StatElement<>( "Empty", Boolean.valueOf(this.isEmpty()) ) );
157 
158         if ( queue != null )
159         {
160             elems.add(new StatElement<>( "Queue Size", Integer.valueOf(queue.size()) ) );
161             elems.add(new StatElement<>( "Queue Capacity", Integer.valueOf(queue.remainingCapacity()) ) );
162         }
163 
164         stats.setStatElements( elems );
165 
166         return stats;
167     }
168 
169     /**
170      * If the Queue is using a bounded channel we can determine the size. If it is zero or we can't
171      * determine the size, we return true.
172      * <p>
173      * @return whether or not there are items in the queue
174      */
175     @Override
176     public boolean isEmpty()
177     {
178         return size() == 0;
179     }
180 
181     /**
182      * Returns the number of elements in the queue. If the queue cannot determine the size
183      * accurately it will return 0.
184      * <p>
185      * @return number of items in the queue.
186      */
187     @Override
188     public int size()
189     {
190         if ( queue == null )
191         {
192             return 0;
193         }
194         return queue.size();
195     }
196 }