1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 package org.apache.commons.pipeline.driver;
21
22 import java.lang.Thread.UncaughtExceptionHandler;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.TimeUnit;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.commons.pipeline.Feeder;
29 import org.apache.commons.pipeline.StageDriver;
30 import org.apache.commons.pipeline.Stage;
31 import org.apache.commons.pipeline.StageContext;
32 import org.apache.commons.pipeline.StageException;
33 import org.apache.commons.pipeline.driver.AbstractStageDriver;
34 import org.apache.commons.pipeline.driver.FaultTolerance;
35
36 import static org.apache.commons.pipeline.StageDriver.State.*;
37 import static org.apache.commons.pipeline.driver.FaultTolerance.*;
38
39 /**
40 * This {@link StageDriver} implementation uses a pool of threads
41 * to process objects from an input queue.
42 */
43 public class ThreadPoolStageDriver extends AbstractStageDriver {
44 // logger for the class
45 private final Log log = LogFactory.getLog(ThreadPoolStageDriver.class);
46
47 // wait timeout to ensure deadlock cannot occur on thread termination
48 private long timeout;
49
50 // signal telling threads to start polling queue
51 private final CountDownLatch startSignal;
52
53 // signal threads use to tell driver they have finished
54 private final CountDownLatch doneSignal;
55
56 // number of threads polling queue
57 private final int numThreads;
58
59 // queue to hold data to be processed
60 private final BlockingQueue queue;
61
62 //feeder used to feed data to this stage's queue
63 private final Feeder feeder = new Feeder() {
64 public void feed(Object obj) {
65 if (log.isDebugEnabled()) log.debug(obj + " is being fed to stage " + stage
66 + " (" + ThreadPoolStageDriver.this.queue.remainingCapacity() + " available slots in queue)");
67
68 try {
69 ThreadPoolStageDriver.this.queue.put(obj);
70 } catch (InterruptedException e) {
71 throw new IllegalStateException("Unexpected interrupt while waiting for space to become available for object "
72 + obj + " in queue for stage " + stage, e);
73 }
74
75 synchronized(ThreadPoolStageDriver.this) {
76 ThreadPoolStageDriver.this.notifyAll();
77 }
78 }
79 };
80
81 /**
82 * Creates a new ThreadPoolStageDriver.
83 *
84 * @param stage The stage that the driver will run
85 * @param context the context in which to run the stage
86 * @param queue The object queue to use for storing objects prior to processing. The
87 * default is {@link LinkedBlockingQueue}
88 * @param timeout The amount of time, in milliseconds, that the worker thread
89 * will wait before checking the processing state if no objects are available
90 * in the thread's queue.
91 * @param faultTolerance Flag determining the behavior of the driver when
92 * an error is encountered in execution of {@link Stage#process(Object)}.
93 * If this is set to false, any exception thrown during {@link Stage#process(Object)}
94 * will cause the worker thread to halt without executing {@link Stage#postprocess()}
95 * ({@link Stage#release()} will be called.)
96 * @param numThreads Number of threads that will be simultaneously reading from queue
97 */
98 public ThreadPoolStageDriver(Stage stage, StageContext context,
99 BlockingQueue queue,
100 long timeout,
101 FaultTolerance faultTolerance,
102 int numThreads) {
103 super(stage, context, faultTolerance);
104 this.numThreads = numThreads;
105
106 this.startSignal = new CountDownLatch(1);
107 this.doneSignal = new CountDownLatch(this.numThreads);
108
109 this.queue = queue;
110 this.timeout = timeout;
111 }
112
113 /**
114 * Return the Feeder used to feed data to the queue of objects to be processed.
115 * @return The feeder for objects processed by this driver's stage.
116 */
117 public Feeder getFeeder() {
118 return this.feeder;
119 }
120
121 /**
122 * Start the processing of the stage. Creates threads to poll items
123 * from queue.
124 * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state during startup
125 */
126 public synchronized void start() throws StageException {
127 if (this.currentState == STOPPED) {
128 setState(STARTED);
129
130 if (log.isDebugEnabled()) log.debug("Preprocessing stage " + stage + "...");
131 stage.preprocess();
132 if (log.isDebugEnabled()) log.debug("Preprocessing for stage " + stage + " complete.");
133
134 log.debug("Starting worker threads for stage " + stage + ".");
135
136 for (int i=0;i<this.numThreads;i++) {
137 new LatchWorkerThread(i).start();
138 }
139
140 // let threads know they can start
141 testAndSetState(STARTED, RUNNING);
142 startSignal.countDown();
143
144 log.debug("Worker threads for stage " + stage + " started.");
145 } else {
146 throw new IllegalStateException("Attempt to start driver in state " + this.currentState);
147 }
148 }
149
150 /**
151 * Causes processing to shut down gracefully. Waits until all worker threads
152 * have completed. It is important that this method be called only after
153 * the completion of execution of finish() in the driver for the prior
154 * stage; parallel finish calls can cause the stage to shut down before
155 * all prior stages have finished processing.
156 *
157 * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state for shutdown.
158 */
159 public synchronized void finish() throws StageException {
160 if (currentState == STOPPED) {
161 throw new IllegalStateException("The driver is not currently running.");
162 }
163
164 try {
165 //it may be the case that finish() is called when the driver is still in the process
166 //of starting up, so it is necessary to wait to enter the running state before
167 //a stop can be requested
168 while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait();
169
170 //ask the worker threads to shut down
171 testAndSetState(RUNNING, STOP_REQUESTED);
172
173 if (log.isDebugEnabled()) log.debug("Waiting for worker threads to stop for stage " + stage + ".");
174 doneSignal.await();
175 if (log.isDebugEnabled()) log.debug("Worker threads for stage " + stage + " halted");
176
177 //transition into finished state (not used internally?)
178 testAndSetState(STOP_REQUESTED, FINISHED);
179
180 //do not run postprocessing if the driver is in an error state
181 if (this.currentState != ERROR) {
182 if (log.isDebugEnabled()) log.debug("Postprocessing stage " + stage + "...");
183 this.stage.postprocess();
184 if (log.isDebugEnabled()) log.debug("Postprocessing for stage " + stage + " complete.");
185 }
186
187 //the following lines appear to be artifacts of copy-and-paste from
188 //DedicatedThreadStageDriver.
189 // //do not transition into finished state if an error has occurred
190 // testAndSetState(STOP_REQUESTED, FINISHED);
191 //
192 // while ( !(this.currentState == FINISHED || this.currentState == ERROR) ) this.wait();
193
194 } catch (StageException e) {
195 log.error("An error occurred during postprocessing of stage " + stage , e);
196 recordFatalError(e);
197 setState(ERROR);
198 } catch (InterruptedException e) {
199 throw new StageException(this.getStage(), "StageDriver unexpectedly interrupted while waiting for shutdown of worker threads.", e);
200 } finally {
201 if (log.isDebugEnabled()) log.debug("Releasing resources for stage " + stage + "...");
202 stage.release();
203 if (log.isDebugEnabled()) log.debug("Stage " + stage + " released.");
204 }
205
206 testAndSetState(FINISHED, STOPPED);
207 }
208
209 /**
210 * Get the size of the queue used by this StageDriver.
211 * @return the queue capacity
212 */
213 public int getQueueSize() {
214 return this.queue.size() + this.queue.remainingCapacity();
215 }
216
217 /**
218 * Get the timeout value (in milliseconds) used by this StageDriver on
219 * thread termination.
220 * @return the timeout setting in milliseconds
221 */
222 public long getTimeout() {
223 return this.timeout;
224 }
225
226 /**
227 * Returns the number of threads allocated to the thread pool.
228 */
229 public int getNumThreads() {
230 return numThreads;
231 }
232
233 /*********************************
234 * WORKER THREAD IMPLEMENTATIONS *
235 *********************************/
236 private UncaughtExceptionHandler workerThreadExceptionHandler = new UncaughtExceptionHandler() {
237 public void uncaughtException(Thread t, Throwable e) {
238 setState(ERROR);
239 recordFatalError(e);
240 log.error("Uncaught exception in stage " + stage, e);
241 }
242 };
243
244 /**
245 * This worker thread removes and processes data objects from the incoming
246 * synchronize queue. It calls the Stage's process() method to process data
247 * from the queue. This loop runs until State has changed to
248 * STOP_REQUESTED. To break the loop the calling code must run the writer's
249 * finish() method to set the running property to false.
250 *
251 * @throws StageException if an error is encountered during data processing
252 * and faultTolerant is set to false.
253 */
254 private class LatchWorkerThread extends Thread {
255 final int threadID;
256
257 LatchWorkerThread(int threadID) {
258 this.setUncaughtExceptionHandler(workerThreadExceptionHandler);
259 this.threadID = threadID;
260 }
261
262 public final void run() {
263 try {
264 ThreadPoolStageDriver.this.startSignal.await();
265 //do not transition into running state if an error has occurred or a stop requested
266 running: while (currentState != ERROR) {
267 try {
268 Object obj = queue.poll(timeout, TimeUnit.MILLISECONDS);
269 if (obj == null) {
270 if (currentState == STOP_REQUESTED) break running;
271 } else {
272 try {
273 stage.process(obj);
274 } catch (StageException e) {
275 recordProcessingException(obj, e);
276 if (faultTolerance == NONE) throw e;
277 } catch (RuntimeException e) {
278 recordProcessingException(obj, e);
279 if (faultTolerance == CHECKED || faultTolerance == NONE) throw e;
280 }
281 }
282 } catch (InterruptedException e) {
283 throw new RuntimeException("Worker thread " + this.threadID + " unexpectedly interrupted while waiting on data for stage " + stage, e);
284 }
285 }
286 if (log.isDebugEnabled()) log.debug("Stage " + stage + " (threadID: " + this.threadID + ") exited running state.");
287
288 } catch (StageException e) {
289 log.error("An error occurred in the stage " + stage + " (threadID: " + this.threadID + ")", e);
290 recordFatalError(e);
291 setState(ERROR);
292 } catch (InterruptedException e) {
293 log.error("Stage " + stage + " (threadID: " + threadID + ") interrupted while waiting for barrier", e);
294 recordFatalError(e);
295 setState(ERROR);
296 } finally {
297 doneSignal.countDown();
298 synchronized (ThreadPoolStageDriver.this) {
299 ThreadPoolStageDriver.this.notifyAll();
300 }
301 }
302 }
303 }
304 }