1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.commons.pipeline;
19
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.EventObject;
24 import java.util.HashMap;
25 import java.util.LinkedList;
26 import java.util.List;
27 import java.util.Map;
28 import org.apache.commons.pipeline.driver.SynchronousStageDriver;
29 import org.apache.commons.pipeline.validation.PipelineValidator;
30 import org.apache.commons.pipeline.validation.ValidationException;
31 import org.apache.commons.pipeline.validation.ValidationFailure;
32
33
34
35
36
37
38
39
40
41
42
43 public class Pipeline implements Runnable, StageContext {
44
45
46
47
48 public static final String MAIN_BRANCH = "main";
49
50
51
52
53
54
55 private final LinkedList<StageDriver> drivers;
56 private final Map<Stage, StageDriver> driverMap;
57
58
59 private final LinkedList<Stage> stages;
60
61
62 private final Map<String,Pipeline> branches;
63
64
65 private Pipeline parent;
66
67
68 private final List<StageEventListener> listeners;
69
70
71 private PipelineValidator validator;
72
73
74 private Feeder terminalFeeder = Feeder.VOID;
75
76
77 private Map<String,Object> env = Collections.synchronizedMap(new HashMap<String,Object>());
78
79
80 private Collection<PipelineLifecycleJob> lifecycleJobs = new ArrayList<PipelineLifecycleJob>();
81
82
83
84
85 public Pipeline() {
86 this.drivers = new LinkedList<StageDriver>();
87 this.driverMap = new HashMap<Stage, StageDriver>();
88 this.stages = new LinkedList<Stage>();
89 this.branches = new HashMap<String,Pipeline>();
90 this.listeners = Collections.synchronizedList(new ArrayList<StageEventListener>());
91 }
92
93
94
95
96 public void registerListener(StageEventListener listener) {
97 listeners.add(listener);
98 }
99
100
101
102
103 public Collection<StageEventListener> getRegisteredListeners() {
104 return this.listeners;
105 }
106
107
108
109
110
111
112
113 public void raise(final EventObject ev) {
114 new Thread() {
115 public void run() {
116
117 Pipeline root = Pipeline.this;
118 while (root.parent != null) root = root.parent;
119
120
121 root.notifyListeners(ev);
122 }
123 }.start();
124 }
125
126
127
128
129
130
131 private void notifyListeners(EventObject ev) {
132 for (StageEventListener listener : listeners) listener.notify(ev);
133 for (Pipeline branch : branches.values()) branch.notifyListeners(ev);
134 }
135
136
137
138
139 public Feeder getDownstreamFeeder(Stage stage) {
140 if (stage == null) throw new IllegalArgumentException("Unable to look up downstream feeder for null stage.");
141 if (stage == drivers.getLast().getStage()) {
142 return this.terminalFeeder;
143 } else {
144
145
146
147 for (int i = drivers.size() - 2; i >= 0; i--) {
148 if (stage == drivers.get(i).getStage()) return drivers.get(i+1).getFeeder();
149 }
150
151 throw new IllegalStateException("Unable to find stage " + stage + " in pipeline.");
152 }
153 }
154
155
156
157
158 public Feeder getBranchFeeder(String branch) {
159 if (!getBranches().containsKey(branch)) {
160 throw new IllegalStateException("Unable to find branch in pipeline: '" + branch + "'");
161 }
162
163 return branches.get(branch).getSourceFeeder();
164 }
165
166
167
168
169 public Object getEnv(String key) {
170 return this.env.get(key);
171 }
172
173
174
175
176 public void setEnv(String key, Object value) {
177 this.env.put(key, value);
178 }
179
180
181
182
183
184
185
186
187
188
189
190
191
192 public final void addStage(Stage stage, StageDriverFactory driverFactory) throws ValidationException {
193 if (stage == null) throw new IllegalArgumentException("Argument \"stage\" for call to Pipeline.addStage() may not be null.");
194
195 if (validator != null) {
196 List<ValidationFailure> errors = validator.validateAddStage(this, stage, driverFactory);
197 if (errors != null && !errors.isEmpty()) {
198 throw new ValidationException("An error occurred adding stage " + stage.toString(), errors);
199 }
200 }
201
202 stage.init(this);
203 this.stages.add(stage);
204
205 StageDriver driver = driverFactory.createStageDriver(stage, this);
206 this.driverMap.put(stage, driver);
207 this.drivers.add(driver);
208 }
209
210
211
212
213
214
215 public final List<Stage> getStages() {
216 return Collections.unmodifiableList(this.stages);
217 }
218
219
220
221
222
223
224 public final StageDriver getStageDriver(Stage stage) {
225 return this.driverMap.get(stage);
226 }
227
228
229
230
231
232
233 public final List<StageDriver> getStageDrivers() {
234 return Collections.unmodifiableList(this.drivers);
235 }
236
237
238
239
240
241
242
243
244 public void addBranch(String key, Pipeline branch) throws ValidationException {
245 if (key == null)
246 throw new IllegalArgumentException("Branch key may not be null.");
247 if (MAIN_BRANCH.equalsIgnoreCase(key))
248 throw new IllegalArgumentException("Branch key name \"" + MAIN_BRANCH + "\" is reserved.");
249 if (branch == null)
250 throw new IllegalArgumentException("Illegal attempt to set reference to null branch.");
251 if (branch == this || branch.hasBranch(this))
252 throw new IllegalArgumentException("Illegal attempt to set reference to self as a branch (infinite recursion potential)");
253
254 if (validator != null) {
255 List<ValidationFailure> errors = validator.validateAddBranch(this, key, branch);
256 if (errors != null && !errors.isEmpty()) {
257 throw new ValidationException("An error occurred adding branch pipeline " + branch, errors);
258 }
259 }
260
261 branch.parent = this;
262 this.branches.put(key, branch);
263 }
264
265
266
267
268
269 public Map<String,Pipeline> getBranches() {
270 return Collections.unmodifiableMap(branches);
271 }
272
273
274
275
276
277
278
279
280 private boolean hasBranch(Pipeline pipeline) {
281 if (branches.containsValue(pipeline)) return true;
282 for (Pipeline branch : branches.values()) {
283 if (branch.hasBranch(pipeline)) return true;
284 }
285
286 return false;
287 }
288
289
290
291
292
293 public Feeder getSourceFeeder() {
294 if (drivers.isEmpty()) return this.terminalFeeder;
295 return drivers.peek().getFeeder();
296 }
297
298
299
300
301
302 public Feeder getTerminalFeeder() {
303 return this.terminalFeeder;
304 }
305
306
307
308
309
310 public void setTerminalFeeder(Feeder terminalFeeder) {
311 this.terminalFeeder = terminalFeeder;
312 }
313
314
315
316
317 public void addLifecycleJob(PipelineLifecycleJob job) {
318 this.lifecycleJobs.add(job);
319 }
320
321
322
323
324
325
326
327
328
329
330 public void start() throws StageException {
331 for (PipelineLifecycleJob job : lifecycleJobs) job.onStart(this);
332 for (StageDriver driver: this.drivers) driver.start();
333 for (Pipeline branch : branches.values()) branch.start();
334 }
335
336
337
338
339
340
341
342
343
344
345
346 public void finish() throws StageException {
347 for (StageDriver driver: this.drivers) driver.finish();
348 for (Pipeline pipeline : branches.values()) pipeline.finish();
349 for (PipelineLifecycleJob job : lifecycleJobs) job.onFinish(this);
350 }
351
352
353
354
355 public void run() {
356 try {
357 start();
358 finish();
359 } catch (StageException e) {
360 throw new RuntimeException(e);
361 }
362 }
363
364
365
366
367
368
369 public PipelineValidator getValidator() {
370 return this.validator;
371 }
372
373
374
375
376
377
378 public void setValidator(PipelineValidator validator) {
379 this.validator = validator;
380 }
381
382
383
384
385
386 public Pipeline getParent() {
387 return parent;
388 }
389 }