View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.commons.pipeline.driver;
19  
20  import java.util.ArrayList;
21  import java.util.List;
22  import org.apache.commons.pipeline.*;
23  
24  /**
25   * This interface is used to define how processing for a stage is started,
26   * stopped, and run. AbstractStageDriver implementations may run stages in one or
27   * more threads, and use the {@link StageMonitor} interface to provide communication
28   * between the stage, the driver, and the enclosing pipeline.
29   */
30  public abstract class AbstractStageDriver implements StageDriver {
31      
32      /**
33       * The stage to run.
34       */
35      protected Stage stage;
36      
37      /**
38       * The context for the stage being run
39       */
40      protected StageContext context;    
41      
42      /**
43       * The current state of processing. In most drivers, this is used for
44       * thread control.
45       */
46      protected volatile State currentState = State.STOPPED;    
47              
48      /**
49       * Enumerated value indicating the fault tolerance level of the StageDriver.
50       */
51      protected FaultTolerance faultTolerance = FaultTolerance.NONE;
52      
53      /**
54       * List of processing failures that have occurred.
55       */
56      protected List<ProcessingException> processingExceptions = new ArrayList<ProcessingException>();
57  
58      /**
59       * List of errors that have occurred.
60       */
61      protected List<Throwable> errors = new ArrayList<Throwable>();
62      
63      /**
64       * Creates a StageDriver for the specified stage.
65       * 
66       * @param stage The stage for which the driver will be created
67       * @param context The context in which to run the stage
68       */
69      public AbstractStageDriver(Stage stage, StageContext context) {
70          this(stage, context, FaultTolerance.NONE);
71      }
72      
73      /**
74       * Creates a StageDriver for the specified stage.
75       * 
76       * @param stage The stage for which the driver will be created
77       * @param context The context in which to run the stage
78       */
79      public AbstractStageDriver(Stage stage, StageContext context, FaultTolerance faultTolerance) {
80          if (stage == null) throw new IllegalArgumentException("Stage may not be null.");
81          if (context == null) throw new IllegalArgumentException("Context may not be null.");
82          this.stage = stage;
83          this.context = context;
84          this.faultTolerance = faultTolerance;
85      }
86      
87      /**
88       * Returns the Stage being run by this StageDriver.
89       * 
90       * @return The stage being run by this StageDriver instance
91       */
92      public Stage getStage() {
93          return this.stage;
94      }
95      
96      /**
97       * This method is used to provide a communication channel between the context 
98       * in which the driver is being run and the managed stage.
99       * @return the Feeder used to feed objects to the managed stage for processing.
100      */
101     public abstract Feeder getFeeder();    
102 
103     /**
104      * Return the current state of stage processing.
105      * @return the current state of processing
106      */
107     public State getState() {
108         return this.currentState;
109     }
110     
111     /**
112      * Atomically tests to determine whether or not the driver is in the one of
113      * the specified states.
114      */
115     protected synchronized boolean isInState(State... states) {
116         for (State state : states) if (state == currentState) return true;
117         return false;
118     }
119     
120     /**
121      * Set the current state of stage processing and notify any listeners
122      * that may be waiting on a state change.
123      */
124     protected synchronized void setState(State nextState) {
125         this.currentState = nextState;
126         this.notifyAll();
127     }
128     
129     /**
130      * This method performs an atomic conditional state transition change
131      * to the value specified by the nextState parameter if and only if the
132      * current state is equal to the test state.
133      */
134     protected synchronized boolean testAndSetState(State testState, State nextState) {
135         if (currentState == testState) {
136             setState(nextState);
137             return true;
138         } else {
139             return false;
140         }
141     }
142     
143     /**
144      * This method is used to start the driver, run the 
145      * {@link Stage#preprocess() preprocess()} method of the attached stage
146      * and to then begin processing any objects fed to this driver's Feeder.
147      *
148      * @throws org.apache.commons.pipeline.StageException Thrown if there is an error during stage startup. In most cases, such errors
149      * will be handled internally by the driver.
150      */
151     public abstract void start() throws StageException;
152     
153     /**
154      * This method waits for the stage(s) queue(s) to empty and any processor thread(s) to exit
155      * cleanly and then calls release() to release any resources acquired during processing, if possible.
156      * @throws org.apache.commons.pipeline.StageException Thrown if there is an error during driver shutdown. Ordinarily such 
157      * exceptions will be handled internally.
158      */
159     public abstract void finish() throws StageException;
160 
161     /**
162      * Sets the failure tolerance flag for the worker thread. If faultTolerance
163      * is set to CHECKED, {@link StageException StageException}s thrown by
164      * the {@link Stage#process(Object)} method will not interrupt queue
165      * processing, but will simply be logged with a severity of ERROR.
166      * If faultTolerance is set to ALL, runtime exceptions will also be
167      * logged and otherwise ignored.
168      * @param faultTolerance the flag value
169      */
170     public final void setFaultTolerance(FaultTolerance faultTolerance) {
171         this.faultTolerance = faultTolerance;
172     }
173     
174     /**
175      * Getter for property faultTolerant.
176      * @return Value of property faultTolerant.
177      */
178     public final FaultTolerance getFaultTolerance() {
179         return this.faultTolerance;
180     }    
181     
182     /**
183      * Store a fatal error.
184      * @param error The error to be stored for later analysis
185      */
186     protected void recordFatalError(Throwable error) {
187         this.errors.add(error);
188     }
189 
190     /**
191      * Returns a list of unrecoverable errors that occurred during stage
192      * processing.
193      * @return A list of unrecoverable errors that occurred during stage processing.
194      */
195     public List<Throwable> getFatalErrors() {
196         return this.errors;
197     }
198     
199     /**
200      * Store processing failure information for the specified data object.
201      * @param data The data being processed at the time of the error
202      * @param error The error encountered
203      */
204     protected void recordProcessingException(Object data, Throwable error) {
205         ProcessingException ex = new ProcessingException(this.stage, error, data, this.getState());  
206         this.processingExceptions.add(ex);
207     }    
208     
209     /**
210      * Returns a list of errors that occurred while processing data objects,
211      * along with the objects that were being processed when the errors
212      * were generated.
213      * @return The list of non-fatal processing errors.
214      */
215     public List<ProcessingException> getProcessingExceptions() {
216         return this.processingExceptions;
217     }
218 }