001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing,
013     * software distributed under the License is distributed on an
014     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015     * KIND, either express or implied.  See the License for the
016     * specific language governing permissions and limitations
017     * under the License.
018     */
019    
020    package org.apache.commons.pipeline.driver;
021    
022    import java.lang.Thread.UncaughtExceptionHandler;
023    import java.util.concurrent.BlockingQueue;
024    import java.util.concurrent.CountDownLatch;
025    import java.util.concurrent.TimeUnit;
026    import org.apache.commons.logging.Log;
027    import org.apache.commons.logging.LogFactory;
028    import org.apache.commons.pipeline.Feeder;
029    import org.apache.commons.pipeline.StageDriver;
030    import org.apache.commons.pipeline.Stage;
031    import org.apache.commons.pipeline.StageContext;
032    import org.apache.commons.pipeline.StageException;
033    import org.apache.commons.pipeline.driver.AbstractStageDriver;
034    import org.apache.commons.pipeline.driver.FaultTolerance;
035    
036    import static org.apache.commons.pipeline.StageDriver.State.*;
037    import static org.apache.commons.pipeline.driver.FaultTolerance.*;
038    
039    /**
040     * This {@link StageDriver} implementation uses a pool of threads
041     * to process objects from an input queue.
042     */
043    public class ThreadPoolStageDriver extends AbstractStageDriver {
044        // logger for the class
045        private final Log log = LogFactory.getLog(ThreadPoolStageDriver.class);
046        
047        // wait timeout to ensure deadlock cannot occur on thread termination
048        private long timeout;
049        
050        // signal telling threads to start polling queue
051        private final CountDownLatch startSignal;
052        
053        // signal threads use to tell driver they have finished
054        private final CountDownLatch doneSignal;
055        
056        // number of threads polling queue
057        private final int numThreads;
058        
059        // queue to hold data to be processed
060        private final BlockingQueue queue;
061        
062        //feeder used to feed data to this stage's queue
063        private final Feeder feeder = new Feeder() {
064            public void feed(Object obj) {
065                if (log.isDebugEnabled()) log.debug(obj + " is being fed to stage " + stage
066                        + " (" + ThreadPoolStageDriver.this.queue.remainingCapacity() + " available slots in queue)");
067                
068                try {
069                    ThreadPoolStageDriver.this.queue.put(obj);
070                } catch (InterruptedException e) {
071                    throw new IllegalStateException("Unexpected interrupt while waiting for space to become available for object "
072                            + obj + " in queue for stage " + stage, e);
073                }
074                
075                synchronized(ThreadPoolStageDriver.this) {
076                    ThreadPoolStageDriver.this.notifyAll();
077                }
078            }
079        };
080        
081        /**
082         * Creates a new ThreadPoolStageDriver.
083         *
084         * @param stage The stage that the driver will run
085         * @param context the context in which to run the stage
086         * @param queue The object queue to use for storing objects prior to processing. The
087         * default is {@link LinkedBlockingQueue}
088         * @param timeout The amount of time, in milliseconds, that the worker thread
089         * will wait before checking the processing state if no objects are available
090         * in the thread's queue.
091         * @param faultTolerance Flag determining the behavior of the driver when
092         * an error is encountered in execution of {@link Stage#process(Object)}.
093         * If this is set to false, any exception thrown during {@link Stage#process(Object)}
094         * will cause the worker thread to halt without executing {@link Stage#postprocess()}
095         * ({@link Stage#release()} will be called.)
096         * @param numThreads Number of threads that will be simultaneously reading from queue
097         */
098        public ThreadPoolStageDriver(Stage stage, StageContext context,
099                BlockingQueue queue,
100                long timeout,
101                FaultTolerance faultTolerance,
102                int numThreads) {
103            super(stage, context, faultTolerance);
104            this.numThreads = numThreads;
105            
106            this.startSignal = new CountDownLatch(1);
107            this.doneSignal = new CountDownLatch(this.numThreads);
108            
109            this.queue = queue;
110            this.timeout = timeout;
111        }
112        
113        /**
114         * Return the Feeder used to feed data to the queue of objects to be processed.
115         * @return The feeder for objects processed by this driver's stage.
116         */
117        public Feeder getFeeder() {
118            return this.feeder;
119        }
120        
121        /**
122         * Start the processing of the stage. Creates threads to poll items
123         * from queue.
124         * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state during startup
125         */
126        public synchronized void start() throws StageException {
127            if (this.currentState == STOPPED) {
128                setState(STARTED);
129                
130                if (log.isDebugEnabled()) log.debug("Preprocessing stage " + stage + "...");
131                stage.preprocess();
132                if (log.isDebugEnabled()) log.debug("Preprocessing for stage " + stage + " complete.");
133                
134                log.debug("Starting worker threads for stage " + stage + ".");
135                
136                for (int i=0;i<this.numThreads;i++) {
137                    new LatchWorkerThread(i).start();
138                }
139                
140                // let threads know they can start
141                testAndSetState(STARTED, RUNNING);
142                startSignal.countDown();
143                
144                log.debug("Worker threads for stage " + stage + " started.");
145            } else {
146                throw new IllegalStateException("Attempt to start driver in state " + this.currentState);
147            }
148        }
149        
150        /**
151         * Causes processing to shut down gracefully. Waits until all worker threads
152         * have completed. It is important that this method be called only after
153         * the completion of execution of finish() in the driver for the prior
154         * stage; parallel finish calls can cause the stage to shut down before
155         * all prior stages have finished processing.
156         *
157         * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state for shutdown.
158         */
159        public synchronized void finish() throws StageException {
160            if (currentState == STOPPED) {
161                throw new IllegalStateException("The driver is not currently running.");
162            }
163            
164            try {
165                //it may be the case that finish() is called when the driver is still in the process
166                //of starting up, so it is necessary to wait to enter the running state before
167                //a stop can be requested
168                while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait();
169                
170                //ask the worker threads to shut down
171                testAndSetState(RUNNING, STOP_REQUESTED);
172                
173                if (log.isDebugEnabled()) log.debug("Waiting for worker threads to stop for stage " + stage + ".");
174                doneSignal.await();
175                if (log.isDebugEnabled()) log.debug("Worker threads for stage " + stage + " halted");
176                
177                //transition into finished state (not used internally?)
178                testAndSetState(STOP_REQUESTED, FINISHED);
179                
180                //do not run postprocessing if the driver is in an error state
181                if (this.currentState != ERROR) {
182                    if (log.isDebugEnabled()) log.debug("Postprocessing stage " + stage + "...");
183                    this.stage.postprocess();
184                    if (log.isDebugEnabled()) log.debug("Postprocessing for stage " + stage + " complete.");
185                }
186                
187                //the following lines appear to be artifacts of copy-and-paste from
188                //DedicatedThreadStageDriver.
189    //            //do not transition into finished state if an error has occurred
190    //            testAndSetState(STOP_REQUESTED, FINISHED);
191    //
192    //            while ( !(this.currentState == FINISHED || this.currentState == ERROR) ) this.wait();
193                
194            } catch (StageException e) {
195                log.error("An error occurred during postprocessing of stage " + stage , e);
196                recordFatalError(e);
197                setState(ERROR);
198            } catch (InterruptedException e) {
199                throw new StageException(this.getStage(), "StageDriver unexpectedly interrupted while waiting for shutdown of worker threads.", e);
200            } finally {
201                if (log.isDebugEnabled()) log.debug("Releasing resources for stage " + stage + "...");
202                stage.release();
203                if (log.isDebugEnabled()) log.debug("Stage " + stage + " released.");
204            }
205            
206            testAndSetState(FINISHED, STOPPED);
207        }
208        
209        /**
210         * Get the size of the queue used by this StageDriver.
211         * @return the queue capacity
212         */
213        public int getQueueSize() {
214            return this.queue.size() + this.queue.remainingCapacity();
215        }
216        
217        /**
218         * Get the timeout value (in milliseconds) used by this StageDriver on
219         * thread termination.
220         * @return the timeout setting in milliseconds
221         */
222        public long getTimeout() {
223            return this.timeout;
224        }
225        
226        /**
227         * Returns the number of threads allocated to the thread pool.
228         */
229        public int getNumThreads() {
230            return numThreads;
231        }
232        
233        /*********************************
234         * WORKER THREAD IMPLEMENTATIONS *
235         *********************************/
236        private UncaughtExceptionHandler workerThreadExceptionHandler = new UncaughtExceptionHandler() {
237            public void uncaughtException(Thread t, Throwable e) {
238                setState(ERROR);
239                recordFatalError(e);
240                log.error("Uncaught exception in stage " + stage, e);
241            }
242        };
243        
244        /**
245         * This worker thread removes and processes data objects from the incoming
246         * synchronize queue. It calls the Stage's process() method to process data
247         * from the queue. This loop runs until State has changed to
248         * STOP_REQUESTED. To break the loop the calling code must run the writer's
249         * finish() method to set the running property to false.
250         *
251         * @throws StageException if an error is encountered during data processing
252         * and faultTolerant is set to false.
253         */
254        private class LatchWorkerThread extends Thread {
255            final int threadID;
256            
257            LatchWorkerThread(int threadID) {
258                this.setUncaughtExceptionHandler(workerThreadExceptionHandler);
259                this.threadID = threadID;
260            }
261            
262            public final void run() {
263                try {
264                    ThreadPoolStageDriver.this.startSignal.await();
265                    //do not transition into running state if an error has occurred or a stop requested
266                    running: while (currentState != ERROR) {
267                        try {
268                            Object obj = queue.poll(timeout, TimeUnit.MILLISECONDS);
269                            if (obj == null) {
270                                if (currentState == STOP_REQUESTED) break running;
271                            } else {
272                                try {
273                                    stage.process(obj);
274                                } catch (StageException e) {
275                                    recordProcessingException(obj, e);
276                                    if (faultTolerance == NONE) throw e;
277                                } catch (RuntimeException e) {
278                                    recordProcessingException(obj, e);
279                                    if (faultTolerance == CHECKED || faultTolerance == NONE) throw e;
280                                }
281                            }
282                        } catch (InterruptedException e) {
283                            throw new RuntimeException("Worker thread " + this.threadID + " unexpectedly interrupted while waiting on data for stage " + stage, e);
284                        }
285                    }
286                    if (log.isDebugEnabled()) log.debug("Stage " + stage + " (threadID: " + this.threadID + ") exited running state.");
287                    
288                } catch (StageException e) {
289                    log.error("An error occurred in the stage " + stage + " (threadID: " + this.threadID + ")", e);
290                    recordFatalError(e);
291                    setState(ERROR);
292                } catch (InterruptedException e) {
293                    log.error("Stage " + stage + " (threadID: " + threadID + ") interrupted while waiting for barrier", e);
294                    recordFatalError(e);
295                    setState(ERROR);
296                } finally {
297                    doneSignal.countDown();
298                    synchronized (ThreadPoolStageDriver.this) {
299                        ThreadPoolStageDriver.this.notifyAll();
300                    }
301                }
302            }
303        }    
304    }