1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.commons.pipeline.driver;
19
20 import java.util.LinkedList;
21 import java.util.Queue;
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.commons.pipeline.Feeder;
25 import org.apache.commons.pipeline.Stage;
26 import org.apache.commons.pipeline.StageException;
27 import org.apache.commons.pipeline.StageContext;
28 import static org.apache.commons.pipeline.StageDriver.State.*;
29 import org.apache.commons.pipeline.StageDriver.State;
30 import static org.apache.commons.pipeline.driver.FaultTolerance.*;
31
32 /**
33 * This is a non-threaded version of the AbstractStageDriver.
34 */
35 public class SynchronousStageDriver extends AbstractStageDriver {
36 private final Log log = LogFactory.getLog(SynchronousStageDriver.class);
37
38 //queue of objects to be processed that are fed to the driver
39 //when it is not in a running state
40 private Queue<Object> queue = new LinkedList<Object>();
41
42 //Feeder used to feed objects to this stage
43 private final Feeder feeder = new Feeder() {
44 public void feed(Object obj) {
45 synchronized (SynchronousStageDriver.this) {
46 if (currentState == ERROR) throw new IllegalStateException("Unable to process data: driver in fatal error state.");
47 if (currentState != RUNNING) { //enqueue objects if stage has not been started
48 queue.add(obj);
49 return;
50 }
51 }
52
53 try {
54 stage.process(obj);
55 } catch (StageException e) {
56 recordProcessingException(obj, e);
57 if (faultTolerance == NONE) throw fatalError(e);
58 }
59 }
60 };
61
62 /**
63 * Creates a new instance of SimpleStageDriver
64 * @param stage The stage to be run
65 * @param context The context in which the stage will be run
66 */
67 public SynchronousStageDriver(Stage stage, StageContext context, FaultTolerance faultTolerance) {
68 super(stage, context, faultTolerance);
69 }
70
71 /**
72 * Get the feeder for the encapsulated stage. Since the SynchronousStageDriver
73 * is designed to run the stage in the main thread of execution, calls
74 * to {@link Feeder#feed(Object)} on the returned feeder will trigger processing
75 * of the object fed to the stage.
76 * @return The Feeder instance for the stage.
77 */
78 public Feeder getFeeder() {
79 return this.feeder;
80 }
81
82 /**
83 * Performs preprocessing and updates the driver state.
84 * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state to be started or an error occurs
85 * during preprocessing.
86 */
87 public synchronized void start() throws StageException {
88 if (this.currentState == STOPPED) {
89 try {
90 stage.preprocess();
91 setState(RUNNING);
92 } catch (StageException e) {
93 throw fatalError(e);
94 }
95
96 // feed any queued values before returning control
97 while (!queue.isEmpty()) this.getFeeder().feed(queue.remove());
98 } else {
99 throw new IllegalStateException("Illegal attempt to start driver from state: " + this.currentState);
100 }
101 }
102
103 /**
104 * Performs postprocessing and releases stage resources, and updates the driver
105 * state accordingly.
106 * @throws org.apache.commons.pipeline.StageException Thrown if an error occurs during postprocessing
107 */
108 public synchronized void finish() throws StageException {
109 if (this.currentState == RUNNING) {
110 try {
111 testAndSetState(RUNNING, STOP_REQUESTED);
112 if (this.currentState == STOP_REQUESTED) stage.postprocess();
113 } catch (StageException e) {
114 throw fatalError(e);
115 } finally {
116 stage.release();
117 testAndSetState(STOP_REQUESTED, STOPPED);
118 }
119 } else {
120 throw new IllegalStateException("Driver is not running (current state: " + this.currentState + ")");
121 }
122 }
123
124 /**
125 * This method obtains a lock to set the current state of processing
126 * to error, records the error and returns a RuntimeException encapsulating
127 * the specified throwable.
128 */
129 private RuntimeException fatalError(Throwable t) {
130 try {
131 setState(ERROR);
132 this.recordFatalError(t);
133 stage.release();
134 this.notifyAll();
135 } catch (Exception e) {
136 this.recordFatalError(e);
137 }
138
139 return new RuntimeException("Fatal error halted processing of stage: " + stage);
140 }
141 }