001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.commons.pipeline.driver;
019
020 import java.util.LinkedList;
021 import java.util.Queue;
022 import org.apache.commons.logging.Log;
023 import org.apache.commons.logging.LogFactory;
024 import org.apache.commons.pipeline.Feeder;
025 import org.apache.commons.pipeline.Stage;
026 import org.apache.commons.pipeline.StageException;
027 import org.apache.commons.pipeline.StageContext;
028 import static org.apache.commons.pipeline.StageDriver.State.*;
029 import org.apache.commons.pipeline.StageDriver.State;
030 import static org.apache.commons.pipeline.driver.FaultTolerance.*;
031
032 /**
033 * This is a non-threaded version of the AbstractStageDriver.
034 */
035 public class SynchronousStageDriver extends AbstractStageDriver {
036 private final Log log = LogFactory.getLog(SynchronousStageDriver.class);
037
038 //queue of objects to be processed that are fed to the driver
039 //when it is not in a running state
040 private Queue<Object> queue = new LinkedList<Object>();
041
042 //Feeder used to feed objects to this stage
043 private final Feeder feeder = new Feeder() {
044 public void feed(Object obj) {
045 synchronized (SynchronousStageDriver.this) {
046 if (currentState == ERROR) throw new IllegalStateException("Unable to process data: driver in fatal error state.");
047 if (currentState != RUNNING) { //enqueue objects if stage has not been started
048 queue.add(obj);
049 return;
050 }
051 }
052
053 try {
054 stage.process(obj);
055 } catch (StageException e) {
056 recordProcessingException(obj, e);
057 if (faultTolerance == NONE) throw fatalError(e);
058 }
059 }
060 };
061
062 /**
063 * Creates a new instance of SimpleStageDriver
064 * @param stage The stage to be run
065 * @param context The context in which the stage will be run
066 */
067 public SynchronousStageDriver(Stage stage, StageContext context, FaultTolerance faultTolerance) {
068 super(stage, context, faultTolerance);
069 }
070
071 /**
072 * Get the feeder for the encapsulated stage. Since the SynchronousStageDriver
073 * is designed to run the stage in the main thread of execution, calls
074 * to {@link Feeder#feed(Object)} on the returned feeder will trigger processing
075 * of the object fed to the stage.
076 * @return The Feeder instance for the stage.
077 */
078 public Feeder getFeeder() {
079 return this.feeder;
080 }
081
082 /**
083 * Performs preprocessing and updates the driver state.
084 * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state to be started or an error occurs
085 * during preprocessing.
086 */
087 public synchronized void start() throws StageException {
088 if (this.currentState == STOPPED) {
089 try {
090 stage.preprocess();
091 setState(RUNNING);
092 } catch (StageException e) {
093 throw fatalError(e);
094 }
095
096 // feed any queued values before returning control
097 while (!queue.isEmpty()) this.getFeeder().feed(queue.remove());
098 } else {
099 throw new IllegalStateException("Illegal attempt to start driver from state: " + this.currentState);
100 }
101 }
102
103 /**
104 * Performs postprocessing and releases stage resources, and updates the driver
105 * state accordingly.
106 * @throws org.apache.commons.pipeline.StageException Thrown if an error occurs during postprocessing
107 */
108 public synchronized void finish() throws StageException {
109 if (this.currentState == RUNNING) {
110 try {
111 testAndSetState(RUNNING, STOP_REQUESTED);
112 if (this.currentState == STOP_REQUESTED) stage.postprocess();
113 } catch (StageException e) {
114 throw fatalError(e);
115 } finally {
116 stage.release();
117 testAndSetState(STOP_REQUESTED, STOPPED);
118 }
119 } else {
120 throw new IllegalStateException("Driver is not running (current state: " + this.currentState + ")");
121 }
122 }
123
124 /**
125 * This method obtains a lock to set the current state of processing
126 * to error, records the error and returns a RuntimeException encapsulating
127 * the specified throwable.
128 */
129 private RuntimeException fatalError(Throwable t) {
130 try {
131 setState(ERROR);
132 this.recordFatalError(t);
133 stage.release();
134 this.notifyAll();
135 } catch (Exception e) {
136 this.recordFatalError(e);
137 }
138
139 return new RuntimeException("Fatal error halted processing of stage: " + stage);
140 }
141 }