001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing,
013     * software distributed under the License is distributed on an
014     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015     * KIND, either express or implied.  See the License for the
016     * specific language governing permissions and limitations
017     * under the License.    
018     */ 
019    
020    
021    package org.apache.commons.pipeline.driver;
022    
023    import java.lang.Thread.UncaughtExceptionHandler;
024    import java.util.concurrent.BlockingQueue;
025    import java.util.concurrent.TimeUnit;
026    import org.apache.commons.logging.Log;
027    import org.apache.commons.logging.LogFactory;
028    import org.apache.commons.pipeline.driver.AbstractStageDriver;
029    import org.apache.commons.pipeline.Feeder;
030    import org.apache.commons.pipeline.StageDriver;
031    import org.apache.commons.pipeline.Stage;
032    import org.apache.commons.pipeline.StageContext;
033    import org.apache.commons.pipeline.StageException;
034    import static org.apache.commons.pipeline.StageDriver.State.*;
035    import org.apache.commons.pipeline.StageDriver.State;
036    import static org.apache.commons.pipeline.driver.FaultTolerance.*;
037    
038    /**
039     * This is a very simple implementation of a AbstractStageDriver which spawns
040     * a single thread to process a stage.
041     */
042    public class DedicatedThreadStageDriver extends AbstractStageDriver {
043        private final Log log = LogFactory.getLog(DedicatedThreadStageDriver.class);
044        
045        //poll timeout to ensure deadlock cannot occur on thread termination
046        private long timeout;
047        
048        //thread responsible for stage processing
049        private Thread workerThread;
050        
051        //queue to hold data to be processed
052        private BlockingQueue queue;    
053        
054        //feeder used to feed data to this stage's queue
055        private final Feeder feeder = new Feeder() {
056            public void feed(Object obj) {
057                if (log.isDebugEnabled()) log.debug(obj + " is being fed to stage " + stage
058                        + " (" + DedicatedThreadStageDriver.this.queue.remainingCapacity() + " available slots in queue)");
059                try {
060                    DedicatedThreadStageDriver.this.queue.put(obj);
061                } catch (InterruptedException e) {
062                    throw new IllegalStateException("Unexpected interrupt while waiting for space to become available for object "
063                            + obj + " in queue for stage " + stage, e);
064                }
065                
066                synchronized(DedicatedThreadStageDriver.this) {
067                    DedicatedThreadStageDriver.this.notifyAll();
068                }
069            }
070        };
071        
072        /**
073         * Creates a new DedicatedThreadStageDriver with the specified thread wait
074         * timeout and fault tolerance values.
075         * @param stage The stage that the driver will run
076         * @param context the context in which to run the stage
077         * @param queue The object queue to use for storing objects prior to processing. The
078         * default is {@link LinkedBlockingQueue}
079         * @param timeout The amount of time, in milliseconds, that the worker thread
080         * will wait before checking the processing state if no objects are available
081         * in the thread's queue.
082         * @param faultTolerance Flag determining the behavior of the driver when
083         * an error is encountered in execution of {@link Stage#process(Object)}.
084         * If this is set to false, any exception thrown during {@link Stage#process(Object)}
085         * will cause the worker thread to halt without executing {@link Stage#postprocess()}
086         * ({@link Stage#release()} will be called.)
087         */
088        public DedicatedThreadStageDriver(Stage stage, StageContext context, BlockingQueue queue, long timeout, FaultTolerance faultTolerance) {
089            super(stage, context, faultTolerance);
090            this.queue = queue;
091            this.timeout = timeout;
092        }
093        
094        /**
095         * Return the Feeder used to feed data to the queue of objects to be processed.
096         * @return The feeder for objects processed by this driver's stage.
097         */
098        public Feeder getFeeder() {
099            return this.feeder;
100        }
101        
102        /**
103         * Start the processing of the stage.
104         * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state during startup
105         */
106        public synchronized void start() throws StageException {
107            if (this.currentState == STOPPED) {
108                log.debug("Starting worker thread for stage " + stage + ".");
109                this.workerThread = new WorkerThread(stage);
110                this.workerThread.start();
111                log.debug("Worker thread for stage " + stage + " started.");
112                
113                //wait to ensure that the stage starts up correctly
114                try {
115                    while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait();
116                } catch (InterruptedException e) {
117                    throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for thread startup.", e);
118                }
119            } else {
120                throw new IllegalStateException("Attempt to start driver in state " + this.currentState);
121            }
122        }
123        
124        /**
125         * Causes processing to shut down gracefully.
126         * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state for shutdown.
127         */
128        public synchronized void finish() throws StageException {
129            if (currentState == STOPPED) {
130                throw new IllegalStateException("The driver is not currently running.");
131            }
132            
133            try {
134                while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait();
135                
136                //ask the worker thread to shut down
137                testAndSetState(RUNNING, STOP_REQUESTED);
138                
139                while ( !(this.currentState == FINISHED || this.currentState == ERROR) ) this.wait();
140                
141                log.debug("Waiting for worker thread stop for stage " + stage + ".");
142                this.workerThread.join();
143                log.debug("Worker thread for stage " + stage + " halted");
144                
145            } catch (InterruptedException e) {
146                throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for graceful shutdown.", e);
147            }
148            
149            setState(STOPPED);
150        }
151        
152        /*********************************
153         * WORKER THREAD IMPLEMENTATIONS *
154         *********************************/
155        private UncaughtExceptionHandler workerThreadExceptionHandler = new UncaughtExceptionHandler() {
156            public void uncaughtException(Thread t, Throwable e) {
157                setState(ERROR);
158                recordFatalError(e);
159                log.error("Uncaught exception in stage " + stage, e);
160            }
161        };
162        
163        /**
164         * This worker thread removes and processes data objects from the incoming                synchronize
165         *
166         * queue. It first calls preprocess(), then begins a loop that calls the process()
167         * method to process data from the queue. This loop runs as long as the
168         * {@link getRunning() running} property is true or the queue is not empty. To break the loop the
169         * calling code must run the writer's finish() method to set the running property to false.
170         * At this point the loop will continue to run until the queue is empty, then the loop will
171         * exit and the postprocess() method is called.<P>
172         *
173         * @throws StageException if an error is encountered during data processing
174         * and faultTolerant is set to false.
175         */
176        private class WorkerThread extends Thread {
177            /** The Stage this thread will work on */
178            private Stage stage;
179            
180            public WorkerThread(Stage stage) {
181                this.setUncaughtExceptionHandler(workerThreadExceptionHandler);
182                this.stage = stage;
183            }
184            
185            public final void run() {
186                setState(STARTED);
187                
188                try {
189                    if (log.isDebugEnabled()) log.debug("Preprocessing stage " + stage + "...");
190                    stage.preprocess();
191                    if (log.isDebugEnabled()) log.debug("Preprocessing for stage " + stage + " complete.");
192                    
193                    //do not transition into running state if an error has occurred or a stop requested
194                    testAndSetState(STARTED, RUNNING);
195                    running: while (currentState != ERROR) {
196                        try {
197                            Object obj = queue.poll(timeout, TimeUnit.MILLISECONDS);
198                            if (obj == null) {
199                                if (currentState == STOP_REQUESTED) break running;
200                                //else continue running;
201                            } else {
202                                try {
203                                    stage.process(obj);
204                                } catch (StageException e) {
205                                    recordProcessingException(obj, e);
206                                    if (faultTolerance == NONE) throw e;
207                                } catch (RuntimeException e) {
208                                    recordProcessingException(obj, e);
209                                    if (faultTolerance == CHECKED || faultTolerance == NONE) throw e;
210                                }
211                            }
212                        } catch (InterruptedException e) {
213                            throw new RuntimeException("Worker thread unexpectedly interrupted while waiting on data for stage " + stage, e);
214                        }
215                    }
216                    if (log.isDebugEnabled()) log.debug("Stage " + stage + " exited running state.");
217                    
218                    if (log.isDebugEnabled()) log.debug("Postprocessing stage " + stage + "...");
219                    stage.postprocess();
220                    if (log.isDebugEnabled()) log.debug("Postprocessing for stage " + stage + " complete.");
221                    
222                } catch (StageException e) {
223                    log.error("An error occurred in the stage " + stage, e);
224                    recordFatalError(e);
225                    setState(ERROR);
226                } finally {
227                    if (log.isDebugEnabled()) log.debug("Releasing resources for stage " + stage + "...");
228                    stage.release();
229                    if (log.isDebugEnabled()) log.debug("Stage " + stage + " released.");
230                }
231                
232                //do not transition into finished state if an error has occurred
233                testAndSetState(STOP_REQUESTED, FINISHED);
234            }
235        }
236        
237        /**
238         * Get the size of the queue used by this StageDriver.
239         * @return the queue capacity
240         */
241        public int getQueueSize() {
242            return this.queue.size() + this.queue.remainingCapacity();
243        }
244        
245        /**
246         * Get the timeout value (in milliseconds) used by this StageDriver on
247         * thread termination.
248         * @return the timeout setting in milliseconds
249         */
250        public long getTimeout() {
251            return this.timeout;
252        }
253    }