1 package org.apache.commons.jcs3.engine;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.ThreadPoolExecutor;
26
27 import org.apache.commons.jcs3.engine.behavior.ICacheListener;
28 import org.apache.commons.jcs3.engine.stats.StatElement;
29 import org.apache.commons.jcs3.engine.stats.Stats;
30 import org.apache.commons.jcs3.engine.stats.behavior.IStatElement;
31 import org.apache.commons.jcs3.engine.stats.behavior.IStats;
32 import org.apache.commons.jcs3.log.Log;
33 import org.apache.commons.jcs3.log.LogManager;
34 import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager;
35
36
37
38
39
40
41
42
43
44
45
46 public class PooledCacheEventQueue<K, V>
47 extends AbstractCacheEventQueue<K, V>
48 {
49
50 private static final Log log = LogManager.getLog( PooledCacheEventQueue.class );
51
52
53 protected ExecutorService pool;
54
55
56 protected BlockingQueue<Runnable> queue;
57
58
59
60
61
62
63
64
65
66
67
68 public PooledCacheEventQueue( final ICacheListener<K, V> listener, final long listenerId, final String cacheName, final int maxFailure,
69 final int waitBeforeRetry, final String threadPoolName )
70 {
71 initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry, threadPoolName );
72 }
73
74
75
76
77
78
79
80
81
82
83
84 protected void initialize( final ICacheListener<K, V> listener, final long listenerId, final String cacheName, final int maxFailure,
85 final int waitBeforeRetry, final String threadPoolName )
86 {
87 super.initialize(listener, listenerId, cacheName, maxFailure, waitBeforeRetry);
88
89 pool = createPool(threadPoolName);
90
91 if (pool instanceof ThreadPoolExecutor)
92 {
93 queue = ((ThreadPoolExecutor) pool).getQueue();
94 }
95 }
96
97
98
99
100
101
102
103 protected ExecutorService createPool(final String threadPoolName)
104 {
105
106 return ThreadPoolManager.getInstance().getExecutorService(
107 (threadPoolName == null) ? "cache_event_queue" : threadPoolName );
108 }
109
110
111
112
113 @Override
114 public QueueType getQueueType()
115 {
116
117 return QueueType.POOLED;
118 }
119
120
121
122
123 @Override
124 public synchronized void destroy()
125 {
126 if ( isWorking() )
127 {
128 setWorking(false);
129 log.info( "Cache event queue destroyed: {0}", this );
130 }
131 }
132
133
134
135
136
137
138 @Override
139 protected void put( final AbstractCacheEvent event )
140 {
141 pool.execute( event );
142 }
143
144
145
146
147 @Override
148 public IStats getStatistics()
149 {
150 final IStats stats = new Stats();
151 stats.setTypeName( "Pooled Cache Event Queue" );
152
153 final ArrayList<IStatElement<?>> elems = new ArrayList<>();
154
155 elems.add(new StatElement<>( "Working", Boolean.valueOf(isWorking()) ) );
156 elems.add(new StatElement<>( "Empty", Boolean.valueOf(this.isEmpty()) ) );
157
158 if ( queue != null )
159 {
160 elems.add(new StatElement<>( "Queue Size", Integer.valueOf(queue.size()) ) );
161 elems.add(new StatElement<>( "Queue Capacity", Integer.valueOf(queue.remainingCapacity()) ) );
162 }
163
164 stats.setStatElements( elems );
165
166 return stats;
167 }
168
169
170
171
172
173
174
175 @Override
176 public boolean isEmpty()
177 {
178 return size() == 0;
179 }
180
181
182
183
184
185
186
187 @Override
188 public int size()
189 {
190 if ( queue == null )
191 {
192 return 0;
193 }
194 return queue.size();
195 }
196 }