001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019 020 021 package org.apache.commons.pipeline.driver; 022 023 import java.lang.Thread.UncaughtExceptionHandler; 024 import java.util.concurrent.BlockingQueue; 025 import java.util.concurrent.TimeUnit; 026 import org.apache.commons.logging.Log; 027 import org.apache.commons.logging.LogFactory; 028 import org.apache.commons.pipeline.driver.AbstractStageDriver; 029 import org.apache.commons.pipeline.Feeder; 030 import org.apache.commons.pipeline.StageDriver; 031 import org.apache.commons.pipeline.Stage; 032 import org.apache.commons.pipeline.StageContext; 033 import org.apache.commons.pipeline.StageException; 034 import static org.apache.commons.pipeline.StageDriver.State.*; 035 import org.apache.commons.pipeline.StageDriver.State; 036 import static org.apache.commons.pipeline.driver.FaultTolerance.*; 037 038 /** 039 * This is a very simple implementation of a AbstractStageDriver which spawns 040 * a single thread to process a stage. 041 */ 042 public class DedicatedThreadStageDriver extends AbstractStageDriver { 043 private final Log log = LogFactory.getLog(DedicatedThreadStageDriver.class); 044 045 //poll timeout to ensure deadlock cannot occur on thread termination 046 private long timeout; 047 048 //thread responsible for stage processing 049 private Thread workerThread; 050 051 //queue to hold data to be processed 052 private BlockingQueue queue; 053 054 //feeder used to feed data to this stage's queue 055 private final Feeder feeder = new Feeder() { 056 public void feed(Object obj) { 057 if (log.isDebugEnabled()) log.debug(obj + " is being fed to stage " + stage 058 + " (" + DedicatedThreadStageDriver.this.queue.remainingCapacity() + " available slots in queue)"); 059 try { 060 DedicatedThreadStageDriver.this.queue.put(obj); 061 } catch (InterruptedException e) { 062 throw new IllegalStateException("Unexpected interrupt while waiting for space to become available for object " 063 + obj + " in queue for stage " + stage, e); 064 } 065 066 synchronized(DedicatedThreadStageDriver.this) { 067 DedicatedThreadStageDriver.this.notifyAll(); 068 } 069 } 070 }; 071 072 /** 073 * Creates a new DedicatedThreadStageDriver with the specified thread wait 074 * timeout and fault tolerance values. 075 * @param stage The stage that the driver will run 076 * @param context the context in which to run the stage 077 * @param queue The object queue to use for storing objects prior to processing. The 078 * default is {@link LinkedBlockingQueue} 079 * @param timeout The amount of time, in milliseconds, that the worker thread 080 * will wait before checking the processing state if no objects are available 081 * in the thread's queue. 082 * @param faultTolerance Flag determining the behavior of the driver when 083 * an error is encountered in execution of {@link Stage#process(Object)}. 084 * If this is set to false, any exception thrown during {@link Stage#process(Object)} 085 * will cause the worker thread to halt without executing {@link Stage#postprocess()} 086 * ({@link Stage#release()} will be called.) 087 */ 088 public DedicatedThreadStageDriver(Stage stage, StageContext context, BlockingQueue queue, long timeout, FaultTolerance faultTolerance) { 089 super(stage, context, faultTolerance); 090 this.queue = queue; 091 this.timeout = timeout; 092 } 093 094 /** 095 * Return the Feeder used to feed data to the queue of objects to be processed. 096 * @return The feeder for objects processed by this driver's stage. 097 */ 098 public Feeder getFeeder() { 099 return this.feeder; 100 } 101 102 /** 103 * Start the processing of the stage. 104 * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state during startup 105 */ 106 public synchronized void start() throws StageException { 107 if (this.currentState == STOPPED) { 108 log.debug("Starting worker thread for stage " + stage + "."); 109 this.workerThread = new WorkerThread(stage); 110 this.workerThread.start(); 111 log.debug("Worker thread for stage " + stage + " started."); 112 113 //wait to ensure that the stage starts up correctly 114 try { 115 while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait(); 116 } catch (InterruptedException e) { 117 throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for thread startup.", e); 118 } 119 } else { 120 throw new IllegalStateException("Attempt to start driver in state " + this.currentState); 121 } 122 } 123 124 /** 125 * Causes processing to shut down gracefully. 126 * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state for shutdown. 127 */ 128 public synchronized void finish() throws StageException { 129 if (currentState == STOPPED) { 130 throw new IllegalStateException("The driver is not currently running."); 131 } 132 133 try { 134 while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait(); 135 136 //ask the worker thread to shut down 137 testAndSetState(RUNNING, STOP_REQUESTED); 138 139 while ( !(this.currentState == FINISHED || this.currentState == ERROR) ) this.wait(); 140 141 log.debug("Waiting for worker thread stop for stage " + stage + "."); 142 this.workerThread.join(); 143 log.debug("Worker thread for stage " + stage + " halted"); 144 145 } catch (InterruptedException e) { 146 throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for graceful shutdown.", e); 147 } 148 149 setState(STOPPED); 150 } 151 152 /********************************* 153 * WORKER THREAD IMPLEMENTATIONS * 154 *********************************/ 155 private UncaughtExceptionHandler workerThreadExceptionHandler = new UncaughtExceptionHandler() { 156 public void uncaughtException(Thread t, Throwable e) { 157 setState(ERROR); 158 recordFatalError(e); 159 log.error("Uncaught exception in stage " + stage, e); 160 } 161 }; 162 163 /** 164 * This worker thread removes and processes data objects from the incoming synchronize 165 * 166 * queue. It first calls preprocess(), then begins a loop that calls the process() 167 * method to process data from the queue. This loop runs as long as the 168 * {@link getRunning() running} property is true or the queue is not empty. To break the loop the 169 * calling code must run the writer's finish() method to set the running property to false. 170 * At this point the loop will continue to run until the queue is empty, then the loop will 171 * exit and the postprocess() method is called.<P> 172 * 173 * @throws StageException if an error is encountered during data processing 174 * and faultTolerant is set to false. 175 */ 176 private class WorkerThread extends Thread { 177 /** The Stage this thread will work on */ 178 private Stage stage; 179 180 public WorkerThread(Stage stage) { 181 this.setUncaughtExceptionHandler(workerThreadExceptionHandler); 182 this.stage = stage; 183 } 184 185 public final void run() { 186 setState(STARTED); 187 188 try { 189 if (log.isDebugEnabled()) log.debug("Preprocessing stage " + stage + "..."); 190 stage.preprocess(); 191 if (log.isDebugEnabled()) log.debug("Preprocessing for stage " + stage + " complete."); 192 193 //do not transition into running state if an error has occurred or a stop requested 194 testAndSetState(STARTED, RUNNING); 195 running: while (currentState != ERROR) { 196 try { 197 Object obj = queue.poll(timeout, TimeUnit.MILLISECONDS); 198 if (obj == null) { 199 if (currentState == STOP_REQUESTED) break running; 200 //else continue running; 201 } else { 202 try { 203 stage.process(obj); 204 } catch (StageException e) { 205 recordProcessingException(obj, e); 206 if (faultTolerance == NONE) throw e; 207 } catch (RuntimeException e) { 208 recordProcessingException(obj, e); 209 if (faultTolerance == CHECKED || faultTolerance == NONE) throw e; 210 } 211 } 212 } catch (InterruptedException e) { 213 throw new RuntimeException("Worker thread unexpectedly interrupted while waiting on data for stage " + stage, e); 214 } 215 } 216 if (log.isDebugEnabled()) log.debug("Stage " + stage + " exited running state."); 217 218 if (log.isDebugEnabled()) log.debug("Postprocessing stage " + stage + "..."); 219 stage.postprocess(); 220 if (log.isDebugEnabled()) log.debug("Postprocessing for stage " + stage + " complete."); 221 222 } catch (StageException e) { 223 log.error("An error occurred in the stage " + stage, e); 224 recordFatalError(e); 225 setState(ERROR); 226 } finally { 227 if (log.isDebugEnabled()) log.debug("Releasing resources for stage " + stage + "..."); 228 stage.release(); 229 if (log.isDebugEnabled()) log.debug("Stage " + stage + " released."); 230 } 231 232 //do not transition into finished state if an error has occurred 233 testAndSetState(STOP_REQUESTED, FINISHED); 234 } 235 } 236 237 /** 238 * Get the size of the queue used by this StageDriver. 239 * @return the queue capacity 240 */ 241 public int getQueueSize() { 242 return this.queue.size() + this.queue.remainingCapacity(); 243 } 244 245 /** 246 * Get the timeout value (in milliseconds) used by this StageDriver on 247 * thread termination. 248 * @return the timeout setting in milliseconds 249 */ 250 public long getTimeout() { 251 return this.timeout; 252 } 253 }