1 package org.apache.commons.jcs.engine;
2
3 import java.util.ArrayList;
4 import java.util.concurrent.BlockingQueue;
5 import java.util.concurrent.ThreadPoolExecutor;
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 import org.apache.commons.jcs.engine.behavior.ICacheListener;
27 import org.apache.commons.jcs.engine.stats.StatElement;
28 import org.apache.commons.jcs.engine.stats.Stats;
29 import org.apache.commons.jcs.engine.stats.behavior.IStatElement;
30 import org.apache.commons.jcs.engine.stats.behavior.IStats;
31 import org.apache.commons.jcs.utils.threadpool.ThreadPoolManager;
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34
35
36
37
38
39
40
41
42
43
44
45 public class PooledCacheEventQueue<K, V>
46 extends AbstractCacheEventQueue<K, V>
47 {
48
49 private static final Log log = LogFactory.getLog( PooledCacheEventQueue.class );
50
51
52 private static final QueueType queueType = QueueType.POOLED;
53
54
55 private ThreadPoolExecutor pool = null;
56
57
58
59
60
61
62
63
64
65
66
67 public PooledCacheEventQueue( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
68 int waitBeforeRetry, String threadPoolName )
69 {
70 initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry, threadPoolName );
71 }
72
73
74
75
76
77
78
79
80
81
82
83 protected void initialize( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
84 int waitBeforeRetry, String threadPoolName )
85 {
86 super.initialize(listener, listenerId, cacheName, maxFailure, waitBeforeRetry);
87
88
89 pool = ThreadPoolManager.getInstance().getPool(
90 (threadPoolName == null) ? "cache_event_queue" : threadPoolName );
91 }
92
93
94
95
96 @Override
97 public QueueType getQueueType()
98 {
99 return queueType;
100 }
101
102
103
104
105 @Override
106 public synchronized void destroy()
107 {
108 if ( isAlive() )
109 {
110 setAlive(false);
111 pool.shutdownNow();
112 if ( log.isInfoEnabled() )
113 {
114 log.info( "Cache event queue destroyed: " + this );
115 }
116 }
117 }
118
119
120
121
122
123
124 @Override
125 protected void put( AbstractCacheEvent event )
126 {
127 pool.execute( event );
128 }
129
130
131
132
133 @Override
134 public IStats getStatistics()
135 {
136 IStats stats = new Stats();
137 stats.setTypeName( "Pooled Cache Event Queue" );
138
139 ArrayList<IStatElement<?>> elems = new ArrayList<IStatElement<?>>();
140
141 elems.add(new StatElement<Boolean>( "Working", Boolean.valueOf(super.isWorking()) ) );
142 elems.add(new StatElement<Boolean>( "Alive", Boolean.valueOf(this.isAlive()) ) );
143 elems.add(new StatElement<Boolean>( "Empty", Boolean.valueOf(this.isEmpty()) ) );
144
145 if ( pool.getQueue() != null )
146 {
147 BlockingQueue<Runnable> bb = pool.getQueue();
148 elems.add(new StatElement<Integer>( "Queue Size", Integer.valueOf(bb.size()) ) );
149 elems.add(new StatElement<Integer>( "Queue Capacity", Integer.valueOf(bb.remainingCapacity()) ) );
150 }
151
152 elems.add(new StatElement<Integer>( "Pool Size", Integer.valueOf(pool.getPoolSize()) ) );
153 elems.add(new StatElement<Integer>( "Maximum Pool Size", Integer.valueOf(pool.getMaximumPoolSize()) ) );
154
155 stats.setStatElements( elems );
156
157 return stats;
158 }
159
160
161
162
163
164
165
166 @Override
167 public boolean isEmpty()
168 {
169 if ( pool.getQueue() == null )
170 {
171 return true;
172 }
173 else
174 {
175 return pool.getQueue().size() == 0;
176 }
177 }
178
179
180
181
182
183
184
185 @Override
186 public int size()
187 {
188 if ( pool.getQueue() == null )
189 {
190 return 0;
191 }
192 else
193 {
194 return pool.getQueue().size();
195 }
196 }
197 }