1 package org.apache.commons.jcs3.engine;
2
3 /*
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
19 * under the License.
20 */
21
22 import java.util.ArrayList;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.ThreadPoolExecutor;
26
27 import org.apache.commons.jcs3.engine.behavior.ICacheListener;
28 import org.apache.commons.jcs3.engine.stats.StatElement;
29 import org.apache.commons.jcs3.engine.stats.Stats;
30 import org.apache.commons.jcs3.engine.stats.behavior.IStatElement;
31 import org.apache.commons.jcs3.engine.stats.behavior.IStats;
32 import org.apache.commons.jcs3.log.Log;
33 import org.apache.commons.jcs3.log.LogManager;
34 import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager;
35
36 /**
37 * An event queue is used to propagate ordered cache events to one and only one target listener.
38 * <p>
39 * This is a modified version of the experimental version. It uses a PooledExecutor and a
40 * BoundedBuffer to queue up events and execute them as threads become available.
41 * <p>
42 * The PooledExecutor is static, because presumably these processes will be IO bound, so throwing
43 * more than a few threads at them will serve no purpose other than to saturate the IO interface. In
44 * light of this, having one thread per region seems unnecessary. This may prove to be false.
45 */
46 public class PooledCacheEventQueue<K, V>
47 extends AbstractCacheEventQueue<K, V>
48 {
49 /** The logger. */
50 private static final Log log = LogManager.getLog( PooledCacheEventQueue.class );
51
52 /** The Thread Pool to execute events with. */
53 protected ExecutorService pool;
54
55 /** The Thread Pool queue */
56 protected BlockingQueue<Runnable> queue;
57
58 /**
59 * Constructor for the CacheEventQueue object
60 * <p>
61 * @param listener
62 * @param listenerId
63 * @param cacheName
64 * @param maxFailure
65 * @param waitBeforeRetry
66 * @param threadPoolName
67 */
68 public PooledCacheEventQueue( final ICacheListener<K, V> listener, final long listenerId, final String cacheName, final int maxFailure,
69 final int waitBeforeRetry, final String threadPoolName )
70 {
71 initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry, threadPoolName );
72 }
73
74 /**
75 * Initializes the queue.
76 * <p>
77 * @param listener
78 * @param listenerId
79 * @param cacheName
80 * @param maxFailure
81 * @param waitBeforeRetry
82 * @param threadPoolName
83 */
84 protected void initialize( final ICacheListener<K, V> listener, final long listenerId, final String cacheName, final int maxFailure,
85 final int waitBeforeRetry, final String threadPoolName )
86 {
87 super.initialize(listener, listenerId, cacheName, maxFailure, waitBeforeRetry);
88
89 pool = createPool(threadPoolName);
90
91 if (pool instanceof ThreadPoolExecutor)
92 {
93 queue = ((ThreadPoolExecutor) pool).getQueue();
94 }
95 }
96
97 /**
98 * Create the thread pool.
99 * <p>
100 * @param threadPoolName
101 * @since 3.1
102 */
103 protected ExecutorService createPool(final String threadPoolName)
104 {
105 // this will share the same pool with other event queues by default.
106 return ThreadPoolManager.getInstance().getExecutorService(
107 (threadPoolName == null) ? "cache_event_queue" : threadPoolName );
108 }
109
110 /**
111 * @return the queue type
112 */
113 @Override
114 public QueueType getQueueType()
115 {
116 /** The type of queue -- there are pooled and single */
117 return QueueType.POOLED;
118 }
119
120 /**
121 * Destroy the queue. Interrupt all threads.
122 */
123 @Override
124 public synchronized void destroy()
125 {
126 if ( isWorking() )
127 {
128 setWorking(false);
129 log.info( "Cache event queue destroyed: {0}", this );
130 }
131 }
132
133 /**
134 * Adds an event to the queue.
135 * <p>
136 * @param event
137 */
138 @Override
139 protected void put( final AbstractCacheEvent event )
140 {
141 pool.execute( event );
142 }
143
144 /**
145 * @return IStats
146 */
147 @Override
148 public IStats getStatistics()
149 {
150 final IStats stats = new Stats();
151 stats.setTypeName( "Pooled Cache Event Queue" );
152
153 final ArrayList<IStatElement<?>> elems = new ArrayList<>();
154
155 elems.add(new StatElement<>( "Working", Boolean.valueOf(isWorking()) ) );
156 elems.add(new StatElement<>( "Empty", Boolean.valueOf(this.isEmpty()) ) );
157
158 if ( queue != null )
159 {
160 elems.add(new StatElement<>( "Queue Size", Integer.valueOf(queue.size()) ) );
161 elems.add(new StatElement<>( "Queue Capacity", Integer.valueOf(queue.remainingCapacity()) ) );
162 }
163
164 stats.setStatElements( elems );
165
166 return stats;
167 }
168
169 /**
170 * If the Queue is using a bounded channel we can determine the size. If it is zero or we can't
171 * determine the size, we return true.
172 * <p>
173 * @return whether or not there are items in the queue
174 */
175 @Override
176 public boolean isEmpty()
177 {
178 return size() == 0;
179 }
180
181 /**
182 * Returns the number of elements in the queue. If the queue cannot determine the size
183 * accurately it will return 0.
184 * <p>
185 * @return number of items in the queue.
186 */
187 @Override
188 public int size()
189 {
190 if ( queue == null )
191 {
192 return 0;
193 }
194 return queue.size();
195 }
196 }