1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.commons.pipeline.driver;
22
23 import java.lang.Thread.UncaughtExceptionHandler;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.TimeUnit;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.commons.pipeline.driver.AbstractStageDriver;
29 import org.apache.commons.pipeline.Feeder;
30 import org.apache.commons.pipeline.StageDriver;
31 import org.apache.commons.pipeline.Stage;
32 import org.apache.commons.pipeline.StageContext;
33 import org.apache.commons.pipeline.StageException;
34 import static org.apache.commons.pipeline.StageDriver.State.*;
35 import org.apache.commons.pipeline.StageDriver.State;
36 import static org.apache.commons.pipeline.driver.FaultTolerance.*;
37
38
39
40
41
42 public class DedicatedThreadStageDriver extends AbstractStageDriver {
43 private final Log log = LogFactory.getLog(DedicatedThreadStageDriver.class);
44
45
46 private long timeout;
47
48
49 private Thread workerThread;
50
51
52 private BlockingQueue queue;
53
54
55 private final Feeder feeder = new Feeder() {
56 public void feed(Object obj) {
57 if (log.isDebugEnabled()) log.debug(obj + " is being fed to stage " + stage
58 + " (" + DedicatedThreadStageDriver.this.queue.remainingCapacity() + " available slots in queue)");
59 try {
60 DedicatedThreadStageDriver.this.queue.put(obj);
61 } catch (InterruptedException e) {
62 throw new IllegalStateException("Unexpected interrupt while waiting for space to become available for object "
63 + obj + " in queue for stage " + stage, e);
64 }
65
66 synchronized(DedicatedThreadStageDriver.this) {
67 DedicatedThreadStageDriver.this.notifyAll();
68 }
69 }
70 };
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88 public DedicatedThreadStageDriver(Stage stage, StageContext context, BlockingQueue queue, long timeout, FaultTolerance faultTolerance) {
89 super(stage, context, faultTolerance);
90 this.queue = queue;
91 this.timeout = timeout;
92 }
93
94
95
96
97
98 public Feeder getFeeder() {
99 return this.feeder;
100 }
101
102
103
104
105
106 public synchronized void start() throws StageException {
107 if (this.currentState == STOPPED) {
108 log.debug("Starting worker thread for stage " + stage + ".");
109 this.workerThread = new WorkerThread(stage);
110 this.workerThread.start();
111 log.debug("Worker thread for stage " + stage + " started.");
112
113
114 try {
115 while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait();
116 } catch (InterruptedException e) {
117 throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for thread startup.", e);
118 }
119 } else {
120 throw new IllegalStateException("Attempt to start driver in state " + this.currentState);
121 }
122 }
123
124
125
126
127
128 public synchronized void finish() throws StageException {
129 if (currentState == STOPPED) {
130 throw new IllegalStateException("The driver is not currently running.");
131 }
132
133 try {
134 while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait();
135
136
137 testAndSetState(RUNNING, STOP_REQUESTED);
138
139 while ( !(this.currentState == FINISHED || this.currentState == ERROR) ) this.wait();
140
141 log.debug("Waiting for worker thread stop for stage " + stage + ".");
142 this.workerThread.join();
143 log.debug("Worker thread for stage " + stage + " halted");
144
145 } catch (InterruptedException e) {
146 throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for graceful shutdown.", e);
147 }
148
149 setState(STOPPED);
150 }
151
152
153
154
155 private UncaughtExceptionHandler workerThreadExceptionHandler = new UncaughtExceptionHandler() {
156 public void uncaughtException(Thread t, Throwable e) {
157 setState(ERROR);
158 recordFatalError(e);
159 log.error("Uncaught exception in stage " + stage, e);
160 }
161 };
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176 private class WorkerThread extends Thread {
177
178 private Stage stage;
179
180 public WorkerThread(Stage stage) {
181 this.setUncaughtExceptionHandler(workerThreadExceptionHandler);
182 this.stage = stage;
183 }
184
185 public final void run() {
186 setState(STARTED);
187
188 try {
189 if (log.isDebugEnabled()) log.debug("Preprocessing stage " + stage + "...");
190 stage.preprocess();
191 if (log.isDebugEnabled()) log.debug("Preprocessing for stage " + stage + " complete.");
192
193
194 testAndSetState(STARTED, RUNNING);
195 running: while (currentState != ERROR) {
196 try {
197 Object obj = queue.poll(timeout, TimeUnit.MILLISECONDS);
198 if (obj == null) {
199 if (currentState == STOP_REQUESTED) break running;
200
201 } else {
202 try {
203 stage.process(obj);
204 } catch (StageException e) {
205 recordProcessingException(obj, e);
206 if (faultTolerance == NONE) throw e;
207 } catch (RuntimeException e) {
208 recordProcessingException(obj, e);
209 if (faultTolerance == CHECKED || faultTolerance == NONE) throw e;
210 }
211 }
212 } catch (InterruptedException e) {
213 throw new RuntimeException("Worker thread unexpectedly interrupted while waiting on data for stage " + stage, e);
214 }
215 }
216 if (log.isDebugEnabled()) log.debug("Stage " + stage + " exited running state.");
217
218 if (log.isDebugEnabled()) log.debug("Postprocessing stage " + stage + "...");
219 stage.postprocess();
220 if (log.isDebugEnabled()) log.debug("Postprocessing for stage " + stage + " complete.");
221
222 } catch (StageException e) {
223 log.error("An error occurred in the stage " + stage, e);
224 recordFatalError(e);
225 setState(ERROR);
226 } finally {
227 if (log.isDebugEnabled()) log.debug("Releasing resources for stage " + stage + "...");
228 stage.release();
229 if (log.isDebugEnabled()) log.debug("Stage " + stage + " released.");
230 }
231
232
233 testAndSetState(STOP_REQUESTED, FINISHED);
234 }
235 }
236
237
238
239
240
241 public int getQueueSize() {
242 return this.queue.size() + this.queue.remainingCapacity();
243 }
244
245
246
247
248
249
250 public long getTimeout() {
251 return this.timeout;
252 }
253 }