1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.commons.pipeline.driver;
19
20 import java.util.LinkedList;
21 import java.util.Queue;
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.commons.pipeline.Feeder;
25 import org.apache.commons.pipeline.Stage;
26 import org.apache.commons.pipeline.StageException;
27 import org.apache.commons.pipeline.StageContext;
28 import static org.apache.commons.pipeline.StageDriver.State.*;
29 import org.apache.commons.pipeline.StageDriver.State;
30 import static org.apache.commons.pipeline.driver.FaultTolerance.*;
31
32
33
34
35 public class SynchronousStageDriver extends AbstractStageDriver {
36 private final Log log = LogFactory.getLog(SynchronousStageDriver.class);
37
38
39
40 private Queue<Object> queue = new LinkedList<Object>();
41
42
43 private final Feeder feeder = new Feeder() {
44 public void feed(Object obj) {
45 synchronized (SynchronousStageDriver.this) {
46 if (currentState == ERROR) throw new IllegalStateException("Unable to process data: driver in fatal error state.");
47 if (currentState != RUNNING) {
48 queue.add(obj);
49 return;
50 }
51 }
52
53 try {
54 stage.process(obj);
55 } catch (StageException e) {
56 recordProcessingException(obj, e);
57 if (faultTolerance == NONE) throw fatalError(e);
58 }
59 }
60 };
61
62
63
64
65
66
67 public SynchronousStageDriver(Stage stage, StageContext context, FaultTolerance faultTolerance) {
68 super(stage, context, faultTolerance);
69 }
70
71
72
73
74
75
76
77
78 public Feeder getFeeder() {
79 return this.feeder;
80 }
81
82
83
84
85
86
87 public synchronized void start() throws StageException {
88 if (this.currentState == STOPPED) {
89 try {
90 stage.preprocess();
91 setState(RUNNING);
92 } catch (StageException e) {
93 throw fatalError(e);
94 }
95
96
97 while (!queue.isEmpty()) this.getFeeder().feed(queue.remove());
98 } else {
99 throw new IllegalStateException("Illegal attempt to start driver from state: " + this.currentState);
100 }
101 }
102
103
104
105
106
107
108 public synchronized void finish() throws StageException {
109 if (this.currentState == RUNNING) {
110 try {
111 testAndSetState(RUNNING, STOP_REQUESTED);
112 if (this.currentState == STOP_REQUESTED) stage.postprocess();
113 } catch (StageException e) {
114 throw fatalError(e);
115 } finally {
116 stage.release();
117 testAndSetState(STOP_REQUESTED, STOPPED);
118 }
119 } else {
120 throw new IllegalStateException("Driver is not running (current state: " + this.currentState + ")");
121 }
122 }
123
124
125
126
127
128
129 private RuntimeException fatalError(Throwable t) {
130 try {
131 setState(ERROR);
132 this.recordFatalError(t);
133 stage.release();
134 this.notifyAll();
135 } catch (Exception e) {
136 this.recordFatalError(e);
137 }
138
139 return new RuntimeException("Fatal error halted processing of stage: " + stage);
140 }
141 }