1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.commons.pipeline.driver;
21
22 import java.lang.Thread.UncaughtExceptionHandler;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.TimeUnit;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.commons.pipeline.Feeder;
29 import org.apache.commons.pipeline.StageDriver;
30 import org.apache.commons.pipeline.Stage;
31 import org.apache.commons.pipeline.StageContext;
32 import org.apache.commons.pipeline.StageException;
33 import org.apache.commons.pipeline.driver.AbstractStageDriver;
34 import org.apache.commons.pipeline.driver.FaultTolerance;
35
36 import static org.apache.commons.pipeline.StageDriver.State.*;
37 import static org.apache.commons.pipeline.driver.FaultTolerance.*;
38
39
40
41
42
43 public class ThreadPoolStageDriver extends AbstractStageDriver {
44
45 private final Log log = LogFactory.getLog(ThreadPoolStageDriver.class);
46
47
48 private long timeout;
49
50
51 private final CountDownLatch startSignal;
52
53
54 private final CountDownLatch doneSignal;
55
56
57 private final int numThreads;
58
59
60 private final BlockingQueue queue;
61
62
63 private final Feeder feeder = new Feeder() {
64 public void feed(Object obj) {
65 if (log.isDebugEnabled()) log.debug(obj + " is being fed to stage " + stage
66 + " (" + ThreadPoolStageDriver.this.queue.remainingCapacity() + " available slots in queue)");
67
68 try {
69 ThreadPoolStageDriver.this.queue.put(obj);
70 } catch (InterruptedException e) {
71 throw new IllegalStateException("Unexpected interrupt while waiting for space to become available for object "
72 + obj + " in queue for stage " + stage, e);
73 }
74
75 synchronized(ThreadPoolStageDriver.this) {
76 ThreadPoolStageDriver.this.notifyAll();
77 }
78 }
79 };
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98 public ThreadPoolStageDriver(Stage stage, StageContext context,
99 BlockingQueue queue,
100 long timeout,
101 FaultTolerance faultTolerance,
102 int numThreads) {
103 super(stage, context, faultTolerance);
104 this.numThreads = numThreads;
105
106 this.startSignal = new CountDownLatch(1);
107 this.doneSignal = new CountDownLatch(this.numThreads);
108
109 this.queue = queue;
110 this.timeout = timeout;
111 }
112
113
114
115
116
117 public Feeder getFeeder() {
118 return this.feeder;
119 }
120
121
122
123
124
125
126 public synchronized void start() throws StageException {
127 if (this.currentState == STOPPED) {
128 setState(STARTED);
129
130 if (log.isDebugEnabled()) log.debug("Preprocessing stage " + stage + "...");
131 stage.preprocess();
132 if (log.isDebugEnabled()) log.debug("Preprocessing for stage " + stage + " complete.");
133
134 log.debug("Starting worker threads for stage " + stage + ".");
135
136 for (int i=0;i<this.numThreads;i++) {
137 new LatchWorkerThread(i).start();
138 }
139
140
141 testAndSetState(STARTED, RUNNING);
142 startSignal.countDown();
143
144 log.debug("Worker threads for stage " + stage + " started.");
145 } else {
146 throw new IllegalStateException("Attempt to start driver in state " + this.currentState);
147 }
148 }
149
150
151
152
153
154
155
156
157
158
159 public synchronized void finish() throws StageException {
160 if (currentState == STOPPED) {
161 throw new IllegalStateException("The driver is not currently running.");
162 }
163
164 try {
165
166
167
168 while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait();
169
170
171 testAndSetState(RUNNING, STOP_REQUESTED);
172
173 if (log.isDebugEnabled()) log.debug("Waiting for worker threads to stop for stage " + stage + ".");
174 doneSignal.await();
175 if (log.isDebugEnabled()) log.debug("Worker threads for stage " + stage + " halted");
176
177
178 testAndSetState(STOP_REQUESTED, FINISHED);
179
180
181 if (this.currentState != ERROR) {
182 if (log.isDebugEnabled()) log.debug("Postprocessing stage " + stage + "...");
183 this.stage.postprocess();
184 if (log.isDebugEnabled()) log.debug("Postprocessing for stage " + stage + " complete.");
185 }
186
187
188
189
190
191
192
193
194 } catch (StageException e) {
195 log.error("An error occurred during postprocessing of stage " + stage , e);
196 recordFatalError(e);
197 setState(ERROR);
198 } catch (InterruptedException e) {
199 throw new StageException(this.getStage(), "StageDriver unexpectedly interrupted while waiting for shutdown of worker threads.", e);
200 } finally {
201 if (log.isDebugEnabled()) log.debug("Releasing resources for stage " + stage + "...");
202 stage.release();
203 if (log.isDebugEnabled()) log.debug("Stage " + stage + " released.");
204 }
205
206 testAndSetState(FINISHED, STOPPED);
207 }
208
209
210
211
212
213 public int getQueueSize() {
214 return this.queue.size() + this.queue.remainingCapacity();
215 }
216
217
218
219
220
221
222 public long getTimeout() {
223 return this.timeout;
224 }
225
226
227
228
229 public int getNumThreads() {
230 return numThreads;
231 }
232
233
234
235
236 private UncaughtExceptionHandler workerThreadExceptionHandler = new UncaughtExceptionHandler() {
237 public void uncaughtException(Thread t, Throwable e) {
238 setState(ERROR);
239 recordFatalError(e);
240 log.error("Uncaught exception in stage " + stage, e);
241 }
242 };
243
244
245
246
247
248
249
250
251
252
253
254 private class LatchWorkerThread extends Thread {
255 final int threadID;
256
257 LatchWorkerThread(int threadID) {
258 this.setUncaughtExceptionHandler(workerThreadExceptionHandler);
259 this.threadID = threadID;
260 }
261
262 public final void run() {
263 try {
264 ThreadPoolStageDriver.this.startSignal.await();
265
266 running: while (currentState != ERROR) {
267 try {
268 Object obj = queue.poll(timeout, TimeUnit.MILLISECONDS);
269 if (obj == null) {
270 if (currentState == STOP_REQUESTED) break running;
271 } else {
272 try {
273 stage.process(obj);
274 } catch (StageException e) {
275 recordProcessingException(obj, e);
276 if (faultTolerance == NONE) throw e;
277 } catch (RuntimeException e) {
278 recordProcessingException(obj, e);
279 if (faultTolerance == CHECKED || faultTolerance == NONE) throw e;
280 }
281 }
282 } catch (InterruptedException e) {
283 throw new RuntimeException("Worker thread " + this.threadID + " unexpectedly interrupted while waiting on data for stage " + stage, e);
284 }
285 }
286 if (log.isDebugEnabled()) log.debug("Stage " + stage + " (threadID: " + this.threadID + ") exited running state.");
287
288 } catch (StageException e) {
289 log.error("An error occurred in the stage " + stage + " (threadID: " + this.threadID + ")", e);
290 recordFatalError(e);
291 setState(ERROR);
292 } catch (InterruptedException e) {
293 log.error("Stage " + stage + " (threadID: " + threadID + ") interrupted while waiting for barrier", e);
294 recordFatalError(e);
295 setState(ERROR);
296 } finally {
297 doneSignal.countDown();
298 synchronized (ThreadPoolStageDriver.this) {
299 ThreadPoolStageDriver.this.notifyAll();
300 }
301 }
302 }
303 }
304 }