001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.commons.pipeline.driver;
019    
020    import java.util.LinkedList;
021    import java.util.Queue;
022    import org.apache.commons.logging.Log;
023    import org.apache.commons.logging.LogFactory;
024    import org.apache.commons.pipeline.Feeder;
025    import org.apache.commons.pipeline.Stage;
026    import org.apache.commons.pipeline.StageException;
027    import org.apache.commons.pipeline.StageContext;
028    import static org.apache.commons.pipeline.StageDriver.State.*;
029    import org.apache.commons.pipeline.StageDriver.State;
030    import static org.apache.commons.pipeline.driver.FaultTolerance.*;
031    
032    /**
033     * This is a non-threaded version of the AbstractStageDriver.
034     */
035    public class SynchronousStageDriver extends AbstractStageDriver {
036        private final Log log = LogFactory.getLog(SynchronousStageDriver.class);
037        
038        //queue of objects to be processed that are fed to the driver
039        //when it is not in a running state
040        private Queue<Object> queue = new LinkedList<Object>();
041        
042        //Feeder used to feed objects to this stage
043        private final Feeder feeder = new Feeder() {
044            public void feed(Object obj) {
045                synchronized (SynchronousStageDriver.this) {
046                    if (currentState == ERROR) throw new IllegalStateException("Unable to process data: driver in fatal error state.");
047                    if (currentState != RUNNING) { //enqueue objects if stage has not been started
048                        queue.add(obj);
049                        return;
050                    }
051                }
052                
053                try {
054                    stage.process(obj);
055                } catch (StageException e) {
056                    recordProcessingException(obj, e);
057                    if (faultTolerance == NONE) throw fatalError(e);
058                }
059            }
060        };
061        
062        /**
063         * Creates a new instance of SimpleStageDriver
064         * @param stage The stage to be run
065         * @param context The context in which the stage will be run
066         */
067        public SynchronousStageDriver(Stage stage, StageContext context, FaultTolerance faultTolerance) {
068            super(stage, context, faultTolerance);
069        }
070        
071        /**
072         * Get the feeder for the encapsulated stage. Since the SynchronousStageDriver
073         * is designed to run the stage in the main thread of execution, calls
074         * to {@link Feeder#feed(Object)} on the returned feeder will trigger processing
075         * of the object fed to the stage.
076         * @return The Feeder instance for the stage.
077         */
078        public Feeder getFeeder() {
079            return this.feeder;
080        }
081        
082        /**
083         * Performs preprocessing and updates the driver state.
084         * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state to be started or an error occurs
085         * during preprocessing.
086         */
087        public synchronized void start() throws StageException {
088            if (this.currentState == STOPPED) {
089                try {
090                    stage.preprocess();
091                    setState(RUNNING);
092                } catch (StageException e) {
093                    throw fatalError(e);
094                }
095                
096                // feed any queued values before returning control
097                while (!queue.isEmpty()) this.getFeeder().feed(queue.remove());
098            } else {
099                throw new IllegalStateException("Illegal attempt to start driver from state: " + this.currentState);
100            }
101        }
102        
103        /**
104         * Performs postprocessing and releases stage resources, and updates the driver
105         * state accordingly.
106         * @throws org.apache.commons.pipeline.StageException Thrown if an error occurs during postprocessing
107         */
108        public synchronized void finish() throws StageException {
109            if (this.currentState == RUNNING) {            
110                try {
111                    testAndSetState(RUNNING, STOP_REQUESTED);
112                    if (this.currentState == STOP_REQUESTED) stage.postprocess();
113                } catch (StageException e) {
114                    throw fatalError(e);
115                } finally {
116                    stage.release();
117                    testAndSetState(STOP_REQUESTED, STOPPED);
118                }            
119            } else {
120                throw new IllegalStateException("Driver is not running (current state: " + this.currentState + ")");
121            }
122        }
123        
124        /**
125         * This method obtains a lock to set the current state of processing
126         * to error, records the error and returns a RuntimeException encapsulating
127         * the specified throwable.
128         */
129        private RuntimeException fatalError(Throwable t) {
130            try {
131                setState(ERROR);
132                this.recordFatalError(t);
133                stage.release();
134                this.notifyAll();
135            } catch (Exception e) {
136                this.recordFatalError(e);
137            }
138            
139            return new RuntimeException("Fatal error halted processing of stage: " + stage);
140        }
141    }