001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019 020 package org.apache.commons.pipeline.driver; 021 022 import java.lang.Thread.UncaughtExceptionHandler; 023 import java.util.concurrent.BlockingQueue; 024 import java.util.concurrent.CountDownLatch; 025 import java.util.concurrent.TimeUnit; 026 import org.apache.commons.logging.Log; 027 import org.apache.commons.logging.LogFactory; 028 import org.apache.commons.pipeline.Feeder; 029 import org.apache.commons.pipeline.StageDriver; 030 import org.apache.commons.pipeline.Stage; 031 import org.apache.commons.pipeline.StageContext; 032 import org.apache.commons.pipeline.StageException; 033 import org.apache.commons.pipeline.driver.AbstractStageDriver; 034 import org.apache.commons.pipeline.driver.FaultTolerance; 035 036 import static org.apache.commons.pipeline.StageDriver.State.*; 037 import static org.apache.commons.pipeline.driver.FaultTolerance.*; 038 039 /** 040 * This {@link StageDriver} implementation uses a pool of threads 041 * to process objects from an input queue. 042 */ 043 public class ThreadPoolStageDriver extends AbstractStageDriver { 044 // logger for the class 045 private final Log log = LogFactory.getLog(ThreadPoolStageDriver.class); 046 047 // wait timeout to ensure deadlock cannot occur on thread termination 048 private long timeout; 049 050 // signal telling threads to start polling queue 051 private final CountDownLatch startSignal; 052 053 // signal threads use to tell driver they have finished 054 private final CountDownLatch doneSignal; 055 056 // number of threads polling queue 057 private final int numThreads; 058 059 // queue to hold data to be processed 060 private final BlockingQueue queue; 061 062 //feeder used to feed data to this stage's queue 063 private final Feeder feeder = new Feeder() { 064 public void feed(Object obj) { 065 if (log.isDebugEnabled()) log.debug(obj + " is being fed to stage " + stage 066 + " (" + ThreadPoolStageDriver.this.queue.remainingCapacity() + " available slots in queue)"); 067 068 try { 069 ThreadPoolStageDriver.this.queue.put(obj); 070 } catch (InterruptedException e) { 071 throw new IllegalStateException("Unexpected interrupt while waiting for space to become available for object " 072 + obj + " in queue for stage " + stage, e); 073 } 074 075 synchronized(ThreadPoolStageDriver.this) { 076 ThreadPoolStageDriver.this.notifyAll(); 077 } 078 } 079 }; 080 081 /** 082 * Creates a new ThreadPoolStageDriver. 083 * 084 * @param stage The stage that the driver will run 085 * @param context the context in which to run the stage 086 * @param queue The object queue to use for storing objects prior to processing. The 087 * default is {@link LinkedBlockingQueue} 088 * @param timeout The amount of time, in milliseconds, that the worker thread 089 * will wait before checking the processing state if no objects are available 090 * in the thread's queue. 091 * @param faultTolerance Flag determining the behavior of the driver when 092 * an error is encountered in execution of {@link Stage#process(Object)}. 093 * If this is set to false, any exception thrown during {@link Stage#process(Object)} 094 * will cause the worker thread to halt without executing {@link Stage#postprocess()} 095 * ({@link Stage#release()} will be called.) 096 * @param numThreads Number of threads that will be simultaneously reading from queue 097 */ 098 public ThreadPoolStageDriver(Stage stage, StageContext context, 099 BlockingQueue queue, 100 long timeout, 101 FaultTolerance faultTolerance, 102 int numThreads) { 103 super(stage, context, faultTolerance); 104 this.numThreads = numThreads; 105 106 this.startSignal = new CountDownLatch(1); 107 this.doneSignal = new CountDownLatch(this.numThreads); 108 109 this.queue = queue; 110 this.timeout = timeout; 111 } 112 113 /** 114 * Return the Feeder used to feed data to the queue of objects to be processed. 115 * @return The feeder for objects processed by this driver's stage. 116 */ 117 public Feeder getFeeder() { 118 return this.feeder; 119 } 120 121 /** 122 * Start the processing of the stage. Creates threads to poll items 123 * from queue. 124 * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state during startup 125 */ 126 public synchronized void start() throws StageException { 127 if (this.currentState == STOPPED) { 128 setState(STARTED); 129 130 if (log.isDebugEnabled()) log.debug("Preprocessing stage " + stage + "..."); 131 stage.preprocess(); 132 if (log.isDebugEnabled()) log.debug("Preprocessing for stage " + stage + " complete."); 133 134 log.debug("Starting worker threads for stage " + stage + "."); 135 136 for (int i=0;i<this.numThreads;i++) { 137 new LatchWorkerThread(i).start(); 138 } 139 140 // let threads know they can start 141 testAndSetState(STARTED, RUNNING); 142 startSignal.countDown(); 143 144 log.debug("Worker threads for stage " + stage + " started."); 145 } else { 146 throw new IllegalStateException("Attempt to start driver in state " + this.currentState); 147 } 148 } 149 150 /** 151 * Causes processing to shut down gracefully. Waits until all worker threads 152 * have completed. It is important that this method be called only after 153 * the completion of execution of finish() in the driver for the prior 154 * stage; parallel finish calls can cause the stage to shut down before 155 * all prior stages have finished processing. 156 * 157 * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state for shutdown. 158 */ 159 public synchronized void finish() throws StageException { 160 if (currentState == STOPPED) { 161 throw new IllegalStateException("The driver is not currently running."); 162 } 163 164 try { 165 //it may be the case that finish() is called when the driver is still in the process 166 //of starting up, so it is necessary to wait to enter the running state before 167 //a stop can be requested 168 while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait(); 169 170 //ask the worker threads to shut down 171 testAndSetState(RUNNING, STOP_REQUESTED); 172 173 if (log.isDebugEnabled()) log.debug("Waiting for worker threads to stop for stage " + stage + "."); 174 doneSignal.await(); 175 if (log.isDebugEnabled()) log.debug("Worker threads for stage " + stage + " halted"); 176 177 //transition into finished state (not used internally?) 178 testAndSetState(STOP_REQUESTED, FINISHED); 179 180 //do not run postprocessing if the driver is in an error state 181 if (this.currentState != ERROR) { 182 if (log.isDebugEnabled()) log.debug("Postprocessing stage " + stage + "..."); 183 this.stage.postprocess(); 184 if (log.isDebugEnabled()) log.debug("Postprocessing for stage " + stage + " complete."); 185 } 186 187 //the following lines appear to be artifacts of copy-and-paste from 188 //DedicatedThreadStageDriver. 189 // //do not transition into finished state if an error has occurred 190 // testAndSetState(STOP_REQUESTED, FINISHED); 191 // 192 // while ( !(this.currentState == FINISHED || this.currentState == ERROR) ) this.wait(); 193 194 } catch (StageException e) { 195 log.error("An error occurred during postprocessing of stage " + stage , e); 196 recordFatalError(e); 197 setState(ERROR); 198 } catch (InterruptedException e) { 199 throw new StageException(this.getStage(), "StageDriver unexpectedly interrupted while waiting for shutdown of worker threads.", e); 200 } finally { 201 if (log.isDebugEnabled()) log.debug("Releasing resources for stage " + stage + "..."); 202 stage.release(); 203 if (log.isDebugEnabled()) log.debug("Stage " + stage + " released."); 204 } 205 206 testAndSetState(FINISHED, STOPPED); 207 } 208 209 /** 210 * Get the size of the queue used by this StageDriver. 211 * @return the queue capacity 212 */ 213 public int getQueueSize() { 214 return this.queue.size() + this.queue.remainingCapacity(); 215 } 216 217 /** 218 * Get the timeout value (in milliseconds) used by this StageDriver on 219 * thread termination. 220 * @return the timeout setting in milliseconds 221 */ 222 public long getTimeout() { 223 return this.timeout; 224 } 225 226 /** 227 * Returns the number of threads allocated to the thread pool. 228 */ 229 public int getNumThreads() { 230 return numThreads; 231 } 232 233 /********************************* 234 * WORKER THREAD IMPLEMENTATIONS * 235 *********************************/ 236 private UncaughtExceptionHandler workerThreadExceptionHandler = new UncaughtExceptionHandler() { 237 public void uncaughtException(Thread t, Throwable e) { 238 setState(ERROR); 239 recordFatalError(e); 240 log.error("Uncaught exception in stage " + stage, e); 241 } 242 }; 243 244 /** 245 * This worker thread removes and processes data objects from the incoming 246 * synchronize queue. It calls the Stage's process() method to process data 247 * from the queue. This loop runs until State has changed to 248 * STOP_REQUESTED. To break the loop the calling code must run the writer's 249 * finish() method to set the running property to false. 250 * 251 * @throws StageException if an error is encountered during data processing 252 * and faultTolerant is set to false. 253 */ 254 private class LatchWorkerThread extends Thread { 255 final int threadID; 256 257 LatchWorkerThread(int threadID) { 258 this.setUncaughtExceptionHandler(workerThreadExceptionHandler); 259 this.threadID = threadID; 260 } 261 262 public final void run() { 263 try { 264 ThreadPoolStageDriver.this.startSignal.await(); 265 //do not transition into running state if an error has occurred or a stop requested 266 running: while (currentState != ERROR) { 267 try { 268 Object obj = queue.poll(timeout, TimeUnit.MILLISECONDS); 269 if (obj == null) { 270 if (currentState == STOP_REQUESTED) break running; 271 } else { 272 try { 273 stage.process(obj); 274 } catch (StageException e) { 275 recordProcessingException(obj, e); 276 if (faultTolerance == NONE) throw e; 277 } catch (RuntimeException e) { 278 recordProcessingException(obj, e); 279 if (faultTolerance == CHECKED || faultTolerance == NONE) throw e; 280 } 281 } 282 } catch (InterruptedException e) { 283 throw new RuntimeException("Worker thread " + this.threadID + " unexpectedly interrupted while waiting on data for stage " + stage, e); 284 } 285 } 286 if (log.isDebugEnabled()) log.debug("Stage " + stage + " (threadID: " + this.threadID + ") exited running state."); 287 288 } catch (StageException e) { 289 log.error("An error occurred in the stage " + stage + " (threadID: " + this.threadID + ")", e); 290 recordFatalError(e); 291 setState(ERROR); 292 } catch (InterruptedException e) { 293 log.error("Stage " + stage + " (threadID: " + threadID + ") interrupted while waiting for barrier", e); 294 recordFatalError(e); 295 setState(ERROR); 296 } finally { 297 doneSignal.countDown(); 298 synchronized (ThreadPoolStageDriver.this) { 299 ThreadPoolStageDriver.this.notifyAll(); 300 } 301 } 302 } 303 } 304 }