View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    * 
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.    
18   */ 
19  
20  
21  package org.apache.commons.pipeline.driver;
22  
23  import java.lang.Thread.UncaughtExceptionHandler;
24  import java.util.concurrent.BlockingQueue;
25  import java.util.concurrent.TimeUnit;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.commons.pipeline.driver.AbstractStageDriver;
29  import org.apache.commons.pipeline.Feeder;
30  import org.apache.commons.pipeline.StageDriver;
31  import org.apache.commons.pipeline.Stage;
32  import org.apache.commons.pipeline.StageContext;
33  import org.apache.commons.pipeline.StageException;
34  import static org.apache.commons.pipeline.StageDriver.State.*;
35  import org.apache.commons.pipeline.StageDriver.State;
36  import static org.apache.commons.pipeline.driver.FaultTolerance.*;
37  
38  /**
39   * This is a very simple implementation of a AbstractStageDriver which spawns
40   * a single thread to process a stage.
41   */
42  public class DedicatedThreadStageDriver extends AbstractStageDriver {
43      private final Log log = LogFactory.getLog(DedicatedThreadStageDriver.class);
44      
45      //poll timeout to ensure deadlock cannot occur on thread termination
46      private long timeout;
47      
48      //thread responsible for stage processing
49      private Thread workerThread;
50      
51      //queue to hold data to be processed
52      private BlockingQueue queue;    
53      
54      //feeder used to feed data to this stage's queue
55      private final Feeder feeder = new Feeder() {
56          public void feed(Object obj) {
57              if (log.isDebugEnabled()) log.debug(obj + " is being fed to stage " + stage
58                      + " (" + DedicatedThreadStageDriver.this.queue.remainingCapacity() + " available slots in queue)");
59              try {
60                  DedicatedThreadStageDriver.this.queue.put(obj);
61              } catch (InterruptedException e) {
62                  throw new IllegalStateException("Unexpected interrupt while waiting for space to become available for object "
63                          + obj + " in queue for stage " + stage, e);
64              }
65              
66              synchronized(DedicatedThreadStageDriver.this) {
67                  DedicatedThreadStageDriver.this.notifyAll();
68              }
69          }
70      };
71      
72      /**
73       * Creates a new DedicatedThreadStageDriver with the specified thread wait
74       * timeout and fault tolerance values.
75       * @param stage The stage that the driver will run
76       * @param context the context in which to run the stage
77       * @param queue The object queue to use for storing objects prior to processing. The
78       * default is {@link LinkedBlockingQueue}
79       * @param timeout The amount of time, in milliseconds, that the worker thread
80       * will wait before checking the processing state if no objects are available
81       * in the thread's queue.
82       * @param faultTolerance Flag determining the behavior of the driver when
83       * an error is encountered in execution of {@link Stage#process(Object)}.
84       * If this is set to false, any exception thrown during {@link Stage#process(Object)}
85       * will cause the worker thread to halt without executing {@link Stage#postprocess()}
86       * ({@link Stage#release()} will be called.)
87       */
88      public DedicatedThreadStageDriver(Stage stage, StageContext context, BlockingQueue queue, long timeout, FaultTolerance faultTolerance) {
89          super(stage, context, faultTolerance);
90          this.queue = queue;
91          this.timeout = timeout;
92      }
93      
94      /**
95       * Return the Feeder used to feed data to the queue of objects to be processed.
96       * @return The feeder for objects processed by this driver's stage.
97       */
98      public Feeder getFeeder() {
99          return this.feeder;
100     }
101     
102     /**
103      * Start the processing of the stage.
104      * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state during startup
105      */
106     public synchronized void start() throws StageException {
107         if (this.currentState == STOPPED) {
108             log.debug("Starting worker thread for stage " + stage + ".");
109             this.workerThread = new WorkerThread(stage);
110             this.workerThread.start();
111             log.debug("Worker thread for stage " + stage + " started.");
112             
113             //wait to ensure that the stage starts up correctly
114             try {
115                 while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait();
116             } catch (InterruptedException e) {
117                 throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for thread startup.", e);
118             }
119         } else {
120             throw new IllegalStateException("Attempt to start driver in state " + this.currentState);
121         }
122     }
123     
124     /**
125      * Causes processing to shut down gracefully.
126      * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state for shutdown.
127      */
128     public synchronized void finish() throws StageException {
129         if (currentState == STOPPED) {
130             throw new IllegalStateException("The driver is not currently running.");
131         }
132         
133         try {
134             while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait();
135             
136             //ask the worker thread to shut down
137             testAndSetState(RUNNING, STOP_REQUESTED);
138             
139             while ( !(this.currentState == FINISHED || this.currentState == ERROR) ) this.wait();
140             
141             log.debug("Waiting for worker thread stop for stage " + stage + ".");
142             this.workerThread.join();
143             log.debug("Worker thread for stage " + stage + " halted");
144             
145         } catch (InterruptedException e) {
146             throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for graceful shutdown.", e);
147         }
148         
149         setState(STOPPED);
150     }
151     
152     /*********************************
153      * WORKER THREAD IMPLEMENTATIONS *
154      *********************************/
155     private UncaughtExceptionHandler workerThreadExceptionHandler = new UncaughtExceptionHandler() {
156         public void uncaughtException(Thread t, Throwable e) {
157             setState(ERROR);
158             recordFatalError(e);
159             log.error("Uncaught exception in stage " + stage, e);
160         }
161     };
162     
163     /**
164      * This worker thread removes and processes data objects from the incoming                synchronize
165      *
166      * queue. It first calls preprocess(), then begins a loop that calls the process()
167      * method to process data from the queue. This loop runs as long as the
168      * {@link getRunning() running} property is true or the queue is not empty. To break the loop the
169      * calling code must run the writer's finish() method to set the running property to false.
170      * At this point the loop will continue to run until the queue is empty, then the loop will
171      * exit and the postprocess() method is called.<P>
172      *
173      * @throws StageException if an error is encountered during data processing
174      * and faultTolerant is set to false.
175      */
176     private class WorkerThread extends Thread {
177         /** The Stage this thread will work on */
178         private Stage stage;
179         
180         public WorkerThread(Stage stage) {
181             this.setUncaughtExceptionHandler(workerThreadExceptionHandler);
182             this.stage = stage;
183         }
184         
185         public final void run() {
186             setState(STARTED);
187             
188             try {
189                 if (log.isDebugEnabled()) log.debug("Preprocessing stage " + stage + "...");
190                 stage.preprocess();
191                 if (log.isDebugEnabled()) log.debug("Preprocessing for stage " + stage + " complete.");
192                 
193                 //do not transition into running state if an error has occurred or a stop requested
194                 testAndSetState(STARTED, RUNNING);
195                 running: while (currentState != ERROR) {
196                     try {
197                         Object obj = queue.poll(timeout, TimeUnit.MILLISECONDS);
198                         if (obj == null) {
199                             if (currentState == STOP_REQUESTED) break running;
200                             //else continue running;
201                         } else {
202                             try {
203                                 stage.process(obj);
204                             } catch (StageException e) {
205                                 recordProcessingException(obj, e);
206                                 if (faultTolerance == NONE) throw e;
207                             } catch (RuntimeException e) {
208                                 recordProcessingException(obj, e);
209                                 if (faultTolerance == CHECKED || faultTolerance == NONE) throw e;
210                             }
211                         }
212                     } catch (InterruptedException e) {
213                         throw new RuntimeException("Worker thread unexpectedly interrupted while waiting on data for stage " + stage, e);
214                     }
215                 }
216                 if (log.isDebugEnabled()) log.debug("Stage " + stage + " exited running state.");
217                 
218                 if (log.isDebugEnabled()) log.debug("Postprocessing stage " + stage + "...");
219                 stage.postprocess();
220                 if (log.isDebugEnabled()) log.debug("Postprocessing for stage " + stage + " complete.");
221                 
222             } catch (StageException e) {
223                 log.error("An error occurred in the stage " + stage, e);
224                 recordFatalError(e);
225                 setState(ERROR);
226             } finally {
227                 if (log.isDebugEnabled()) log.debug("Releasing resources for stage " + stage + "...");
228                 stage.release();
229                 if (log.isDebugEnabled()) log.debug("Stage " + stage + " released.");
230             }
231             
232             //do not transition into finished state if an error has occurred
233             testAndSetState(STOP_REQUESTED, FINISHED);
234         }
235     }
236     
237     /**
238      * Get the size of the queue used by this StageDriver.
239      * @return the queue capacity
240      */
241     public int getQueueSize() {
242         return this.queue.size() + this.queue.remainingCapacity();
243     }
244     
245     /**
246      * Get the timeout value (in milliseconds) used by this StageDriver on
247      * thread termination.
248      * @return the timeout setting in milliseconds
249      */
250     public long getTimeout() {
251         return this.timeout;
252     }
253 }