1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20
21 package org.apache.commons.pipeline.driver;
22
23 import java.lang.Thread.UncaughtExceptionHandler;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.TimeUnit;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.commons.pipeline.driver.AbstractStageDriver;
29 import org.apache.commons.pipeline.Feeder;
30 import org.apache.commons.pipeline.StageDriver;
31 import org.apache.commons.pipeline.Stage;
32 import org.apache.commons.pipeline.StageContext;
33 import org.apache.commons.pipeline.StageException;
34 import static org.apache.commons.pipeline.StageDriver.State.*;
35 import org.apache.commons.pipeline.StageDriver.State;
36 import static org.apache.commons.pipeline.driver.FaultTolerance.*;
37
38 /**
39 * This is a very simple implementation of a AbstractStageDriver which spawns
40 * a single thread to process a stage.
41 */
42 public class DedicatedThreadStageDriver extends AbstractStageDriver {
43 private final Log log = LogFactory.getLog(DedicatedThreadStageDriver.class);
44
45 //poll timeout to ensure deadlock cannot occur on thread termination
46 private long timeout;
47
48 //thread responsible for stage processing
49 private Thread workerThread;
50
51 //queue to hold data to be processed
52 private BlockingQueue queue;
53
54 //feeder used to feed data to this stage's queue
55 private final Feeder feeder = new Feeder() {
56 public void feed(Object obj) {
57 if (log.isDebugEnabled()) log.debug(obj + " is being fed to stage " + stage
58 + " (" + DedicatedThreadStageDriver.this.queue.remainingCapacity() + " available slots in queue)");
59 try {
60 DedicatedThreadStageDriver.this.queue.put(obj);
61 } catch (InterruptedException e) {
62 throw new IllegalStateException("Unexpected interrupt while waiting for space to become available for object "
63 + obj + " in queue for stage " + stage, e);
64 }
65
66 synchronized(DedicatedThreadStageDriver.this) {
67 DedicatedThreadStageDriver.this.notifyAll();
68 }
69 }
70 };
71
72 /**
73 * Creates a new DedicatedThreadStageDriver with the specified thread wait
74 * timeout and fault tolerance values.
75 * @param stage The stage that the driver will run
76 * @param context the context in which to run the stage
77 * @param queue The object queue to use for storing objects prior to processing. The
78 * default is {@link LinkedBlockingQueue}
79 * @param timeout The amount of time, in milliseconds, that the worker thread
80 * will wait before checking the processing state if no objects are available
81 * in the thread's queue.
82 * @param faultTolerance Flag determining the behavior of the driver when
83 * an error is encountered in execution of {@link Stage#process(Object)}.
84 * If this is set to false, any exception thrown during {@link Stage#process(Object)}
85 * will cause the worker thread to halt without executing {@link Stage#postprocess()}
86 * ({@link Stage#release()} will be called.)
87 */
88 public DedicatedThreadStageDriver(Stage stage, StageContext context, BlockingQueue queue, long timeout, FaultTolerance faultTolerance) {
89 super(stage, context, faultTolerance);
90 this.queue = queue;
91 this.timeout = timeout;
92 }
93
94 /**
95 * Return the Feeder used to feed data to the queue of objects to be processed.
96 * @return The feeder for objects processed by this driver's stage.
97 */
98 public Feeder getFeeder() {
99 return this.feeder;
100 }
101
102 /**
103 * Start the processing of the stage.
104 * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state during startup
105 */
106 public synchronized void start() throws StageException {
107 if (this.currentState == STOPPED) {
108 log.debug("Starting worker thread for stage " + stage + ".");
109 this.workerThread = new WorkerThread(stage);
110 this.workerThread.start();
111 log.debug("Worker thread for stage " + stage + " started.");
112
113 //wait to ensure that the stage starts up correctly
114 try {
115 while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait();
116 } catch (InterruptedException e) {
117 throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for thread startup.", e);
118 }
119 } else {
120 throw new IllegalStateException("Attempt to start driver in state " + this.currentState);
121 }
122 }
123
124 /**
125 * Causes processing to shut down gracefully.
126 * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state for shutdown.
127 */
128 public synchronized void finish() throws StageException {
129 if (currentState == STOPPED) {
130 throw new IllegalStateException("The driver is not currently running.");
131 }
132
133 try {
134 while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait();
135
136 //ask the worker thread to shut down
137 testAndSetState(RUNNING, STOP_REQUESTED);
138
139 while ( !(this.currentState == FINISHED || this.currentState == ERROR) ) this.wait();
140
141 log.debug("Waiting for worker thread stop for stage " + stage + ".");
142 this.workerThread.join();
143 log.debug("Worker thread for stage " + stage + " halted");
144
145 } catch (InterruptedException e) {
146 throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for graceful shutdown.", e);
147 }
148
149 setState(STOPPED);
150 }
151
152 /*********************************
153 * WORKER THREAD IMPLEMENTATIONS *
154 *********************************/
155 private UncaughtExceptionHandler workerThreadExceptionHandler = new UncaughtExceptionHandler() {
156 public void uncaughtException(Thread t, Throwable e) {
157 setState(ERROR);
158 recordFatalError(e);
159 log.error("Uncaught exception in stage " + stage, e);
160 }
161 };
162
163 /**
164 * This worker thread removes and processes data objects from the incoming synchronize
165 *
166 * queue. It first calls preprocess(), then begins a loop that calls the process()
167 * method to process data from the queue. This loop runs as long as the
168 * {@link getRunning() running} property is true or the queue is not empty. To break the loop the
169 * calling code must run the writer's finish() method to set the running property to false.
170 * At this point the loop will continue to run until the queue is empty, then the loop will
171 * exit and the postprocess() method is called.<P>
172 *
173 * @throws StageException if an error is encountered during data processing
174 * and faultTolerant is set to false.
175 */
176 private class WorkerThread extends Thread {
177 /** The Stage this thread will work on */
178 private Stage stage;
179
180 public WorkerThread(Stage stage) {
181 this.setUncaughtExceptionHandler(workerThreadExceptionHandler);
182 this.stage = stage;
183 }
184
185 public final void run() {
186 setState(STARTED);
187
188 try {
189 if (log.isDebugEnabled()) log.debug("Preprocessing stage " + stage + "...");
190 stage.preprocess();
191 if (log.isDebugEnabled()) log.debug("Preprocessing for stage " + stage + " complete.");
192
193 //do not transition into running state if an error has occurred or a stop requested
194 testAndSetState(STARTED, RUNNING);
195 running: while (currentState != ERROR) {
196 try {
197 Object obj = queue.poll(timeout, TimeUnit.MILLISECONDS);
198 if (obj == null) {
199 if (currentState == STOP_REQUESTED) break running;
200 //else continue running;
201 } else {
202 try {
203 stage.process(obj);
204 } catch (StageException e) {
205 recordProcessingException(obj, e);
206 if (faultTolerance == NONE) throw e;
207 } catch (RuntimeException e) {
208 recordProcessingException(obj, e);
209 if (faultTolerance == CHECKED || faultTolerance == NONE) throw e;
210 }
211 }
212 } catch (InterruptedException e) {
213 throw new RuntimeException("Worker thread unexpectedly interrupted while waiting on data for stage " + stage, e);
214 }
215 }
216 if (log.isDebugEnabled()) log.debug("Stage " + stage + " exited running state.");
217
218 if (log.isDebugEnabled()) log.debug("Postprocessing stage " + stage + "...");
219 stage.postprocess();
220 if (log.isDebugEnabled()) log.debug("Postprocessing for stage " + stage + " complete.");
221
222 } catch (StageException e) {
223 log.error("An error occurred in the stage " + stage, e);
224 recordFatalError(e);
225 setState(ERROR);
226 } finally {
227 if (log.isDebugEnabled()) log.debug("Releasing resources for stage " + stage + "...");
228 stage.release();
229 if (log.isDebugEnabled()) log.debug("Stage " + stage + " released.");
230 }
231
232 //do not transition into finished state if an error has occurred
233 testAndSetState(STOP_REQUESTED, FINISHED);
234 }
235 }
236
237 /**
238 * Get the size of the queue used by this StageDriver.
239 * @return the queue capacity
240 */
241 public int getQueueSize() {
242 return this.queue.size() + this.queue.remainingCapacity();
243 }
244
245 /**
246 * Get the timeout value (in milliseconds) used by this StageDriver on
247 * thread termination.
248 * @return the timeout setting in milliseconds
249 */
250 public long getTimeout() {
251 return this.timeout;
252 }
253 }