View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  
20  package org.apache.commons.pipeline.driver;
21  
22  import java.lang.Thread.UncaughtExceptionHandler;
23  import java.util.concurrent.BlockingQueue;
24  import java.util.concurrent.CountDownLatch;
25  import java.util.concurrent.TimeUnit;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.commons.pipeline.Feeder;
29  import org.apache.commons.pipeline.StageDriver;
30  import org.apache.commons.pipeline.Stage;
31  import org.apache.commons.pipeline.StageContext;
32  import org.apache.commons.pipeline.StageException;
33  import org.apache.commons.pipeline.driver.AbstractStageDriver;
34  import org.apache.commons.pipeline.driver.FaultTolerance;
35  
36  import static org.apache.commons.pipeline.StageDriver.State.*;
37  import static org.apache.commons.pipeline.driver.FaultTolerance.*;
38  
39  /**
40   * This {@link StageDriver} implementation uses a pool of threads
41   * to process objects from an input queue.
42   */
43  public class ThreadPoolStageDriver extends AbstractStageDriver {
44      // logger for the class
45      private final Log log = LogFactory.getLog(ThreadPoolStageDriver.class);
46      
47      // wait timeout to ensure deadlock cannot occur on thread termination
48      private long timeout;
49      
50      // signal telling threads to start polling queue
51      private final CountDownLatch startSignal;
52      
53      // signal threads use to tell driver they have finished
54      private final CountDownLatch doneSignal;
55      
56      // number of threads polling queue
57      private final int numThreads;
58      
59      // queue to hold data to be processed
60      private final BlockingQueue queue;
61      
62      //feeder used to feed data to this stage's queue
63      private final Feeder feeder = new Feeder() {
64          public void feed(Object obj) {
65              if (log.isDebugEnabled()) log.debug(obj + " is being fed to stage " + stage
66                      + " (" + ThreadPoolStageDriver.this.queue.remainingCapacity() + " available slots in queue)");
67              
68              try {
69                  ThreadPoolStageDriver.this.queue.put(obj);
70              } catch (InterruptedException e) {
71                  throw new IllegalStateException("Unexpected interrupt while waiting for space to become available for object "
72                          + obj + " in queue for stage " + stage, e);
73              }
74              
75              synchronized(ThreadPoolStageDriver.this) {
76                  ThreadPoolStageDriver.this.notifyAll();
77              }
78          }
79      };
80      
81      /**
82       * Creates a new ThreadPoolStageDriver.
83       *
84       * @param stage The stage that the driver will run
85       * @param context the context in which to run the stage
86       * @param queue The object queue to use for storing objects prior to processing. The
87       * default is {@link LinkedBlockingQueue}
88       * @param timeout The amount of time, in milliseconds, that the worker thread
89       * will wait before checking the processing state if no objects are available
90       * in the thread's queue.
91       * @param faultTolerance Flag determining the behavior of the driver when
92       * an error is encountered in execution of {@link Stage#process(Object)}.
93       * If this is set to false, any exception thrown during {@link Stage#process(Object)}
94       * will cause the worker thread to halt without executing {@link Stage#postprocess()}
95       * ({@link Stage#release()} will be called.)
96       * @param numThreads Number of threads that will be simultaneously reading from queue
97       */
98      public ThreadPoolStageDriver(Stage stage, StageContext context,
99              BlockingQueue queue,
100             long timeout,
101             FaultTolerance faultTolerance,
102             int numThreads) {
103         super(stage, context, faultTolerance);
104         this.numThreads = numThreads;
105         
106         this.startSignal = new CountDownLatch(1);
107         this.doneSignal = new CountDownLatch(this.numThreads);
108         
109         this.queue = queue;
110         this.timeout = timeout;
111     }
112     
113     /**
114      * Return the Feeder used to feed data to the queue of objects to be processed.
115      * @return The feeder for objects processed by this driver's stage.
116      */
117     public Feeder getFeeder() {
118         return this.feeder;
119     }
120     
121     /**
122      * Start the processing of the stage. Creates threads to poll items
123      * from queue.
124      * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state during startup
125      */
126     public synchronized void start() throws StageException {
127         if (this.currentState == STOPPED) {
128             setState(STARTED);
129             
130             if (log.isDebugEnabled()) log.debug("Preprocessing stage " + stage + "...");
131             stage.preprocess();
132             if (log.isDebugEnabled()) log.debug("Preprocessing for stage " + stage + " complete.");
133             
134             log.debug("Starting worker threads for stage " + stage + ".");
135             
136             for (int i=0;i<this.numThreads;i++) {
137                 new LatchWorkerThread(i).start();
138             }
139             
140             // let threads know they can start
141             testAndSetState(STARTED, RUNNING);
142             startSignal.countDown();
143             
144             log.debug("Worker threads for stage " + stage + " started.");
145         } else {
146             throw new IllegalStateException("Attempt to start driver in state " + this.currentState);
147         }
148     }
149     
150     /**
151      * Causes processing to shut down gracefully. Waits until all worker threads
152      * have completed. It is important that this method be called only after
153      * the completion of execution of finish() in the driver for the prior
154      * stage; parallel finish calls can cause the stage to shut down before
155      * all prior stages have finished processing.
156      *
157      * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state for shutdown.
158      */
159     public synchronized void finish() throws StageException {
160         if (currentState == STOPPED) {
161             throw new IllegalStateException("The driver is not currently running.");
162         }
163         
164         try {
165             //it may be the case that finish() is called when the driver is still in the process
166             //of starting up, so it is necessary to wait to enter the running state before
167             //a stop can be requested
168             while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait();
169             
170             //ask the worker threads to shut down
171             testAndSetState(RUNNING, STOP_REQUESTED);
172             
173             if (log.isDebugEnabled()) log.debug("Waiting for worker threads to stop for stage " + stage + ".");
174             doneSignal.await();
175             if (log.isDebugEnabled()) log.debug("Worker threads for stage " + stage + " halted");
176             
177             //transition into finished state (not used internally?)
178             testAndSetState(STOP_REQUESTED, FINISHED);
179             
180             //do not run postprocessing if the driver is in an error state
181             if (this.currentState != ERROR) {
182                 if (log.isDebugEnabled()) log.debug("Postprocessing stage " + stage + "...");
183                 this.stage.postprocess();
184                 if (log.isDebugEnabled()) log.debug("Postprocessing for stage " + stage + " complete.");
185             }
186             
187             //the following lines appear to be artifacts of copy-and-paste from
188             //DedicatedThreadStageDriver.
189 //            //do not transition into finished state if an error has occurred
190 //            testAndSetState(STOP_REQUESTED, FINISHED);
191 //
192 //            while ( !(this.currentState == FINISHED || this.currentState == ERROR) ) this.wait();
193             
194         } catch (StageException e) {
195             log.error("An error occurred during postprocessing of stage " + stage , e);
196             recordFatalError(e);
197             setState(ERROR);
198         } catch (InterruptedException e) {
199             throw new StageException(this.getStage(), "StageDriver unexpectedly interrupted while waiting for shutdown of worker threads.", e);
200         } finally {
201             if (log.isDebugEnabled()) log.debug("Releasing resources for stage " + stage + "...");
202             stage.release();
203             if (log.isDebugEnabled()) log.debug("Stage " + stage + " released.");
204         }
205         
206         testAndSetState(FINISHED, STOPPED);
207     }
208     
209     /**
210      * Get the size of the queue used by this StageDriver.
211      * @return the queue capacity
212      */
213     public int getQueueSize() {
214         return this.queue.size() + this.queue.remainingCapacity();
215     }
216     
217     /**
218      * Get the timeout value (in milliseconds) used by this StageDriver on
219      * thread termination.
220      * @return the timeout setting in milliseconds
221      */
222     public long getTimeout() {
223         return this.timeout;
224     }
225     
226     /**
227      * Returns the number of threads allocated to the thread pool.
228      */
229     public int getNumThreads() {
230         return numThreads;
231     }
232     
233     /*********************************
234      * WORKER THREAD IMPLEMENTATIONS *
235      *********************************/
236     private UncaughtExceptionHandler workerThreadExceptionHandler = new UncaughtExceptionHandler() {
237         public void uncaughtException(Thread t, Throwable e) {
238             setState(ERROR);
239             recordFatalError(e);
240             log.error("Uncaught exception in stage " + stage, e);
241         }
242     };
243     
244     /**
245      * This worker thread removes and processes data objects from the incoming
246      * synchronize queue. It calls the Stage's process() method to process data
247      * from the queue. This loop runs until State has changed to
248      * STOP_REQUESTED. To break the loop the calling code must run the writer's
249      * finish() method to set the running property to false.
250      *
251      * @throws StageException if an error is encountered during data processing
252      * and faultTolerant is set to false.
253      */
254     private class LatchWorkerThread extends Thread {
255         final int threadID;
256         
257         LatchWorkerThread(int threadID) {
258             this.setUncaughtExceptionHandler(workerThreadExceptionHandler);
259             this.threadID = threadID;
260         }
261         
262         public final void run() {
263             try {
264                 ThreadPoolStageDriver.this.startSignal.await();
265                 //do not transition into running state if an error has occurred or a stop requested
266                 running: while (currentState != ERROR) {
267                     try {
268                         Object obj = queue.poll(timeout, TimeUnit.MILLISECONDS);
269                         if (obj == null) {
270                             if (currentState == STOP_REQUESTED) break running;
271                         } else {
272                             try {
273                                 stage.process(obj);
274                             } catch (StageException e) {
275                                 recordProcessingException(obj, e);
276                                 if (faultTolerance == NONE) throw e;
277                             } catch (RuntimeException e) {
278                                 recordProcessingException(obj, e);
279                                 if (faultTolerance == CHECKED || faultTolerance == NONE) throw e;
280                             }
281                         }
282                     } catch (InterruptedException e) {
283                         throw new RuntimeException("Worker thread " + this.threadID + " unexpectedly interrupted while waiting on data for stage " + stage, e);
284                     }
285                 }
286                 if (log.isDebugEnabled()) log.debug("Stage " + stage + " (threadID: " + this.threadID + ") exited running state.");
287                 
288             } catch (StageException e) {
289                 log.error("An error occurred in the stage " + stage + " (threadID: " + this.threadID + ")", e);
290                 recordFatalError(e);
291                 setState(ERROR);
292             } catch (InterruptedException e) {
293                 log.error("Stage " + stage + " (threadID: " + threadID + ") interrupted while waiting for barrier", e);
294                 recordFatalError(e);
295                 setState(ERROR);
296             } finally {
297                 doneSignal.countDown();
298                 synchronized (ThreadPoolStageDriver.this) {
299                     ThreadPoolStageDriver.this.notifyAll();
300                 }
301             }
302         }
303     }    
304 }