View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.commons.pipeline.driver;
19  
20  import java.util.LinkedList;
21  import java.util.Queue;
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.apache.commons.pipeline.Feeder;
25  import org.apache.commons.pipeline.Stage;
26  import org.apache.commons.pipeline.StageException;
27  import org.apache.commons.pipeline.StageContext;
28  import static org.apache.commons.pipeline.StageDriver.State.*;
29  import org.apache.commons.pipeline.StageDriver.State;
30  import static org.apache.commons.pipeline.driver.FaultTolerance.*;
31  
32  /**
33   * This is a non-threaded version of the AbstractStageDriver.
34   */
35  public class SynchronousStageDriver extends AbstractStageDriver {
36      private final Log log = LogFactory.getLog(SynchronousStageDriver.class);
37      
38      //queue of objects to be processed that are fed to the driver
39      //when it is not in a running state
40      private Queue<Object> queue = new LinkedList<Object>();
41      
42      //Feeder used to feed objects to this stage
43      private final Feeder feeder = new Feeder() {
44          public void feed(Object obj) {
45              synchronized (SynchronousStageDriver.this) {
46                  if (currentState == ERROR) throw new IllegalStateException("Unable to process data: driver in fatal error state.");
47                  if (currentState != RUNNING) { //enqueue objects if stage has not been started
48                      queue.add(obj);
49                      return;
50                  }
51              }
52              
53              try {
54                  stage.process(obj);
55              } catch (StageException e) {
56                  recordProcessingException(obj, e);
57                  if (faultTolerance == NONE) throw fatalError(e);
58              }
59          }
60      };
61      
62      /**
63       * Creates a new instance of SimpleStageDriver
64       * @param stage The stage to be run
65       * @param context The context in which the stage will be run
66       */
67      public SynchronousStageDriver(Stage stage, StageContext context, FaultTolerance faultTolerance) {
68          super(stage, context, faultTolerance);
69      }
70      
71      /**
72       * Get the feeder for the encapsulated stage. Since the SynchronousStageDriver
73       * is designed to run the stage in the main thread of execution, calls
74       * to {@link Feeder#feed(Object)} on the returned feeder will trigger processing
75       * of the object fed to the stage.
76       * @return The Feeder instance for the stage.
77       */
78      public Feeder getFeeder() {
79          return this.feeder;
80      }
81      
82      /**
83       * Performs preprocessing and updates the driver state.
84       * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state to be started or an error occurs
85       * during preprocessing.
86       */
87      public synchronized void start() throws StageException {
88          if (this.currentState == STOPPED) {
89              try {
90                  stage.preprocess();
91                  setState(RUNNING);
92              } catch (StageException e) {
93                  throw fatalError(e);
94              }
95              
96              // feed any queued values before returning control
97              while (!queue.isEmpty()) this.getFeeder().feed(queue.remove());
98          } else {
99              throw new IllegalStateException("Illegal attempt to start driver from state: " + this.currentState);
100         }
101     }
102     
103     /**
104      * Performs postprocessing and releases stage resources, and updates the driver
105      * state accordingly.
106      * @throws org.apache.commons.pipeline.StageException Thrown if an error occurs during postprocessing
107      */
108     public synchronized void finish() throws StageException {
109         if (this.currentState == RUNNING) {            
110             try {
111                 testAndSetState(RUNNING, STOP_REQUESTED);
112                 if (this.currentState == STOP_REQUESTED) stage.postprocess();
113             } catch (StageException e) {
114                 throw fatalError(e);
115             } finally {
116                 stage.release();
117                 testAndSetState(STOP_REQUESTED, STOPPED);
118             }            
119         } else {
120             throw new IllegalStateException("Driver is not running (current state: " + this.currentState + ")");
121         }
122     }
123     
124     /**
125      * This method obtains a lock to set the current state of processing
126      * to error, records the error and returns a RuntimeException encapsulating
127      * the specified throwable.
128      */
129     private RuntimeException fatalError(Throwable t) {
130         try {
131             setState(ERROR);
132             this.recordFatalError(t);
133             stage.release();
134             this.notifyAll();
135         } catch (Exception e) {
136             this.recordFatalError(e);
137         }
138         
139         return new RuntimeException("Fatal error halted processing of stage: " + stage);
140     }
141 }