001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied. See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019
020
021 package org.apache.commons.pipeline.driver;
022
023 import java.lang.Thread.UncaughtExceptionHandler;
024 import java.util.concurrent.BlockingQueue;
025 import java.util.concurrent.TimeUnit;
026 import org.apache.commons.logging.Log;
027 import org.apache.commons.logging.LogFactory;
028 import org.apache.commons.pipeline.driver.AbstractStageDriver;
029 import org.apache.commons.pipeline.Feeder;
030 import org.apache.commons.pipeline.StageDriver;
031 import org.apache.commons.pipeline.Stage;
032 import org.apache.commons.pipeline.StageContext;
033 import org.apache.commons.pipeline.StageException;
034 import static org.apache.commons.pipeline.StageDriver.State.*;
035 import org.apache.commons.pipeline.StageDriver.State;
036 import static org.apache.commons.pipeline.driver.FaultTolerance.*;
037
038 /**
039 * This is a very simple implementation of a AbstractStageDriver which spawns
040 * a single thread to process a stage.
041 */
042 public class DedicatedThreadStageDriver extends AbstractStageDriver {
043 private final Log log = LogFactory.getLog(DedicatedThreadStageDriver.class);
044
045 //poll timeout to ensure deadlock cannot occur on thread termination
046 private long timeout;
047
048 //thread responsible for stage processing
049 private Thread workerThread;
050
051 //queue to hold data to be processed
052 private BlockingQueue queue;
053
054 //feeder used to feed data to this stage's queue
055 private final Feeder feeder = new Feeder() {
056 public void feed(Object obj) {
057 if (log.isDebugEnabled()) log.debug(obj + " is being fed to stage " + stage
058 + " (" + DedicatedThreadStageDriver.this.queue.remainingCapacity() + " available slots in queue)");
059 try {
060 DedicatedThreadStageDriver.this.queue.put(obj);
061 } catch (InterruptedException e) {
062 throw new IllegalStateException("Unexpected interrupt while waiting for space to become available for object "
063 + obj + " in queue for stage " + stage, e);
064 }
065
066 synchronized(DedicatedThreadStageDriver.this) {
067 DedicatedThreadStageDriver.this.notifyAll();
068 }
069 }
070 };
071
072 /**
073 * Creates a new DedicatedThreadStageDriver with the specified thread wait
074 * timeout and fault tolerance values.
075 * @param stage The stage that the driver will run
076 * @param context the context in which to run the stage
077 * @param queue The object queue to use for storing objects prior to processing. The
078 * default is {@link LinkedBlockingQueue}
079 * @param timeout The amount of time, in milliseconds, that the worker thread
080 * will wait before checking the processing state if no objects are available
081 * in the thread's queue.
082 * @param faultTolerance Flag determining the behavior of the driver when
083 * an error is encountered in execution of {@link Stage#process(Object)}.
084 * If this is set to false, any exception thrown during {@link Stage#process(Object)}
085 * will cause the worker thread to halt without executing {@link Stage#postprocess()}
086 * ({@link Stage#release()} will be called.)
087 */
088 public DedicatedThreadStageDriver(Stage stage, StageContext context, BlockingQueue queue, long timeout, FaultTolerance faultTolerance) {
089 super(stage, context, faultTolerance);
090 this.queue = queue;
091 this.timeout = timeout;
092 }
093
094 /**
095 * Return the Feeder used to feed data to the queue of objects to be processed.
096 * @return The feeder for objects processed by this driver's stage.
097 */
098 public Feeder getFeeder() {
099 return this.feeder;
100 }
101
102 /**
103 * Start the processing of the stage.
104 * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state during startup
105 */
106 public synchronized void start() throws StageException {
107 if (this.currentState == STOPPED) {
108 log.debug("Starting worker thread for stage " + stage + ".");
109 this.workerThread = new WorkerThread(stage);
110 this.workerThread.start();
111 log.debug("Worker thread for stage " + stage + " started.");
112
113 //wait to ensure that the stage starts up correctly
114 try {
115 while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait();
116 } catch (InterruptedException e) {
117 throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for thread startup.", e);
118 }
119 } else {
120 throw new IllegalStateException("Attempt to start driver in state " + this.currentState);
121 }
122 }
123
124 /**
125 * Causes processing to shut down gracefully.
126 * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state for shutdown.
127 */
128 public synchronized void finish() throws StageException {
129 if (currentState == STOPPED) {
130 throw new IllegalStateException("The driver is not currently running.");
131 }
132
133 try {
134 while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait();
135
136 //ask the worker thread to shut down
137 testAndSetState(RUNNING, STOP_REQUESTED);
138
139 while ( !(this.currentState == FINISHED || this.currentState == ERROR) ) this.wait();
140
141 log.debug("Waiting for worker thread stop for stage " + stage + ".");
142 this.workerThread.join();
143 log.debug("Worker thread for stage " + stage + " halted");
144
145 } catch (InterruptedException e) {
146 throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for graceful shutdown.", e);
147 }
148
149 setState(STOPPED);
150 }
151
152 /*********************************
153 * WORKER THREAD IMPLEMENTATIONS *
154 *********************************/
155 private UncaughtExceptionHandler workerThreadExceptionHandler = new UncaughtExceptionHandler() {
156 public void uncaughtException(Thread t, Throwable e) {
157 setState(ERROR);
158 recordFatalError(e);
159 log.error("Uncaught exception in stage " + stage, e);
160 }
161 };
162
163 /**
164 * This worker thread removes and processes data objects from the incoming synchronize
165 *
166 * queue. It first calls preprocess(), then begins a loop that calls the process()
167 * method to process data from the queue. This loop runs as long as the
168 * {@link getRunning() running} property is true or the queue is not empty. To break the loop the
169 * calling code must run the writer's finish() method to set the running property to false.
170 * At this point the loop will continue to run until the queue is empty, then the loop will
171 * exit and the postprocess() method is called.<P>
172 *
173 * @throws StageException if an error is encountered during data processing
174 * and faultTolerant is set to false.
175 */
176 private class WorkerThread extends Thread {
177 /** The Stage this thread will work on */
178 private Stage stage;
179
180 public WorkerThread(Stage stage) {
181 this.setUncaughtExceptionHandler(workerThreadExceptionHandler);
182 this.stage = stage;
183 }
184
185 public final void run() {
186 setState(STARTED);
187
188 try {
189 if (log.isDebugEnabled()) log.debug("Preprocessing stage " + stage + "...");
190 stage.preprocess();
191 if (log.isDebugEnabled()) log.debug("Preprocessing for stage " + stage + " complete.");
192
193 //do not transition into running state if an error has occurred or a stop requested
194 testAndSetState(STARTED, RUNNING);
195 running: while (currentState != ERROR) {
196 try {
197 Object obj = queue.poll(timeout, TimeUnit.MILLISECONDS);
198 if (obj == null) {
199 if (currentState == STOP_REQUESTED) break running;
200 //else continue running;
201 } else {
202 try {
203 stage.process(obj);
204 } catch (StageException e) {
205 recordProcessingException(obj, e);
206 if (faultTolerance == NONE) throw e;
207 } catch (RuntimeException e) {
208 recordProcessingException(obj, e);
209 if (faultTolerance == CHECKED || faultTolerance == NONE) throw e;
210 }
211 }
212 } catch (InterruptedException e) {
213 throw new RuntimeException("Worker thread unexpectedly interrupted while waiting on data for stage " + stage, e);
214 }
215 }
216 if (log.isDebugEnabled()) log.debug("Stage " + stage + " exited running state.");
217
218 if (log.isDebugEnabled()) log.debug("Postprocessing stage " + stage + "...");
219 stage.postprocess();
220 if (log.isDebugEnabled()) log.debug("Postprocessing for stage " + stage + " complete.");
221
222 } catch (StageException e) {
223 log.error("An error occurred in the stage " + stage, e);
224 recordFatalError(e);
225 setState(ERROR);
226 } finally {
227 if (log.isDebugEnabled()) log.debug("Releasing resources for stage " + stage + "...");
228 stage.release();
229 if (log.isDebugEnabled()) log.debug("Stage " + stage + " released.");
230 }
231
232 //do not transition into finished state if an error has occurred
233 testAndSetState(STOP_REQUESTED, FINISHED);
234 }
235 }
236
237 /**
238 * Get the size of the queue used by this StageDriver.
239 * @return the queue capacity
240 */
241 public int getQueueSize() {
242 return this.queue.size() + this.queue.remainingCapacity();
243 }
244
245 /**
246 * Get the timeout value (in milliseconds) used by this StageDriver on
247 * thread termination.
248 * @return the timeout setting in milliseconds
249 */
250 public long getTimeout() {
251 return this.timeout;
252 }
253 }