001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.commons.pipeline.driver;
019    
020    import java.util.ArrayList;
021    import java.util.List;
022    import org.apache.commons.pipeline.*;
023    
024    /**
025     * This interface is used to define how processing for a stage is started,
026     * stopped, and run. AbstractStageDriver implementations may run stages in one or
027     * more threads, and use the {@link StageMonitor} interface to provide communication
028     * between the stage, the driver, and the enclosing pipeline.
029     */
030    public abstract class AbstractStageDriver implements StageDriver {
031        
032        /**
033         * The stage to run.
034         */
035        protected Stage stage;
036        
037        /**
038         * The context for the stage being run
039         */
040        protected StageContext context;    
041        
042        /**
043         * The current state of processing. In most drivers, this is used for
044         * thread control.
045         */
046        protected volatile State currentState = State.STOPPED;    
047                
048        /**
049         * Enumerated value indicating the fault tolerance level of the StageDriver.
050         */
051        protected FaultTolerance faultTolerance = FaultTolerance.NONE;
052        
053        /**
054         * List of processing failures that have occurred.
055         */
056        protected List<ProcessingException> processingExceptions = new ArrayList<ProcessingException>();
057    
058        /**
059         * List of errors that have occurred.
060         */
061        protected List<Throwable> errors = new ArrayList<Throwable>();
062        
063        /**
064         * Creates a StageDriver for the specified stage.
065         * 
066         * @param stage The stage for which the driver will be created
067         * @param context The context in which to run the stage
068         */
069        public AbstractStageDriver(Stage stage, StageContext context) {
070            this(stage, context, FaultTolerance.NONE);
071        }
072        
073        /**
074         * Creates a StageDriver for the specified stage.
075         * 
076         * @param stage The stage for which the driver will be created
077         * @param context The context in which to run the stage
078         */
079        public AbstractStageDriver(Stage stage, StageContext context, FaultTolerance faultTolerance) {
080            if (stage == null) throw new IllegalArgumentException("Stage may not be null.");
081            if (context == null) throw new IllegalArgumentException("Context may not be null.");
082            this.stage = stage;
083            this.context = context;
084            this.faultTolerance = faultTolerance;
085        }
086        
087        /**
088         * Returns the Stage being run by this StageDriver.
089         * 
090         * @return The stage being run by this StageDriver instance
091         */
092        public Stage getStage() {
093            return this.stage;
094        }
095        
096        /**
097         * This method is used to provide a communication channel between the context 
098         * in which the driver is being run and the managed stage.
099         * @return the Feeder used to feed objects to the managed stage for processing.
100         */
101        public abstract Feeder getFeeder();    
102    
103        /**
104         * Return the current state of stage processing.
105         * @return the current state of processing
106         */
107        public State getState() {
108            return this.currentState;
109        }
110        
111        /**
112         * Atomically tests to determine whether or not the driver is in the one of
113         * the specified states.
114         */
115        protected synchronized boolean isInState(State... states) {
116            for (State state : states) if (state == currentState) return true;
117            return false;
118        }
119        
120        /**
121         * Set the current state of stage processing and notify any listeners
122         * that may be waiting on a state change.
123         */
124        protected synchronized void setState(State nextState) {
125            this.currentState = nextState;
126            this.notifyAll();
127        }
128        
129        /**
130         * This method performs an atomic conditional state transition change
131         * to the value specified by the nextState parameter if and only if the
132         * current state is equal to the test state.
133         */
134        protected synchronized boolean testAndSetState(State testState, State nextState) {
135            if (currentState == testState) {
136                setState(nextState);
137                return true;
138            } else {
139                return false;
140            }
141        }
142        
143        /**
144         * This method is used to start the driver, run the 
145         * {@link Stage#preprocess() preprocess()} method of the attached stage
146         * and to then begin processing any objects fed to this driver's Feeder.
147         *
148         * @throws org.apache.commons.pipeline.StageException Thrown if there is an error during stage startup. In most cases, such errors
149         * will be handled internally by the driver.
150         */
151        public abstract void start() throws StageException;
152        
153        /**
154         * This method waits for the stage(s) queue(s) to empty and any processor thread(s) to exit
155         * cleanly and then calls release() to release any resources acquired during processing, if possible.
156         * @throws org.apache.commons.pipeline.StageException Thrown if there is an error during driver shutdown. Ordinarily such 
157         * exceptions will be handled internally.
158         */
159        public abstract void finish() throws StageException;
160    
161        /**
162         * Sets the failure tolerance flag for the worker thread. If faultTolerance
163         * is set to CHECKED, {@link StageException StageException}s thrown by
164         * the {@link Stage#process(Object)} method will not interrupt queue
165         * processing, but will simply be logged with a severity of ERROR.
166         * If faultTolerance is set to ALL, runtime exceptions will also be
167         * logged and otherwise ignored.
168         * @param faultTolerance the flag value
169         */
170        public final void setFaultTolerance(FaultTolerance faultTolerance) {
171            this.faultTolerance = faultTolerance;
172        }
173        
174        /**
175         * Getter for property faultTolerant.
176         * @return Value of property faultTolerant.
177         */
178        public final FaultTolerance getFaultTolerance() {
179            return this.faultTolerance;
180        }    
181        
182        /**
183         * Store a fatal error.
184         * @param error The error to be stored for later analysis
185         */
186        protected void recordFatalError(Throwable error) {
187            this.errors.add(error);
188        }
189    
190        /**
191         * Returns a list of unrecoverable errors that occurred during stage
192         * processing.
193         * @return A list of unrecoverable errors that occurred during stage processing.
194         */
195        public List<Throwable> getFatalErrors() {
196            return this.errors;
197        }
198        
199        /**
200         * Store processing failure information for the specified data object.
201         * @param data The data being processed at the time of the error
202         * @param error The error encountered
203         */
204        protected void recordProcessingException(Object data, Throwable error) {
205            ProcessingException ex = new ProcessingException(this.stage, error, data, this.getState());  
206            this.processingExceptions.add(ex);
207        }    
208        
209        /**
210         * Returns a list of errors that occurred while processing data objects,
211         * along with the objects that were being processed when the errors
212         * were generated.
213         * @return The list of non-fatal processing errors.
214         */
215        public List<ProcessingException> getProcessingExceptions() {
216            return this.processingExceptions;
217        }
218    }