1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.commons.pipeline;
19
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EventObject;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import org.apache.commons.pipeline.driver.SynchronousStageDriver;
import org.apache.commons.pipeline.validation.PipelineValidator;
import org.apache.commons.pipeline.validation.ValidationException;
import org.apache.commons.pipeline.validation.ValidationFailure;
32
33 /**
34 * This class represents a processing system consisting of a number of stages
35 * and branches. Each stage contains a queue and manages one or more threads
36 * that process data in that stage's queue and allow processed data to be
37 * passed along to subsequent stages and onto branches of the pipeline.<P>
38 *
39 * This class allows all stages in the pipeline to be managed collectively
40 * with methods to start and stop processing for all stages, as well as
41 * a simple framework for asynchronous event-based communication between stages.
42 */
43 public class Pipeline implements Runnable, StageContext {
44 /**
45 * The branch key for the main line of production. This value is reserved
46 * and may not be used as a key for other branch pipelines.
47 */
48 public static final String MAIN_BRANCH = "main";
49
50 //The logger used for reporting by this pipeline
51 //private final Log log = LogFactory.getLog(Pipeline.class);
52
53 // List of stages in the pipeline, encapsulated in the drivers
54 // that will be used to onStart them.
55 private final LinkedList<StageDriver> drivers;
56 private final Map<Stage, StageDriver> driverMap;
57
58 // The list of stages in the pipeline.
59 private final LinkedList<Stage> stages;
60
61 // Map of pipeline branches where the keys are branch names.
62 private final Map<String,Pipeline> branches;
63
64 // Used to store a reference to the parent pipeline, if this is a branch
65 private Pipeline parent;
66
67 // The list of listeners registered with the pipeline.
68 private final List<StageEventListener> listeners;
69
70 // Holds value of property validator.
71 private PipelineValidator validator;
72
73 // Feeder used to handle output of final stage
74 private Feeder terminalFeeder = Feeder.VOID;
75
76 // Global environment variables
77 private Map<String,Object> env = Collections.synchronizedMap(new HashMap<String,Object>());
78
79 // List of jobs to be run at defined points in pipeline lifecycle
80 private Collection<PipelineLifecycleJob> lifecycleJobs = new ArrayList<PipelineLifecycleJob>();
81
82 /**
83 * Creates and initializes a new Pipeline.
84 */
85 public Pipeline() {
86 this.drivers = new LinkedList<StageDriver>();
87 this.driverMap = new HashMap<Stage, StageDriver>();
88 this.stages = new LinkedList<Stage>();
89 this.branches = new HashMap<String,Pipeline>();
90 this.listeners = Collections.synchronizedList(new ArrayList<StageEventListener>());
91 }
92
93 /**
94 * {@inheritDoc}
95 */
96 public void registerListener(StageEventListener listener) {
97 listeners.add(listener);
98 }
99
100 /**
101 * {@inheritDoc}
102 */
103 public Collection<StageEventListener> getRegisteredListeners() {
104 return this.listeners;
105 }
106
107 /**
108 * Asynchronously notifies each registered listener of an event and propagates
109 * the event to any attached branches and the parent pipeline.
110 *
111 * @param ev The event to be sent to registered listeners
112 */
113 public void raise(final EventObject ev) {
114 new Thread() {
115 public void run() {
116 //first, recursively find the root pipeline
117 Pipeline root = Pipeline.this;
118 while (root.parent != null) root = root.parent;
119
120 //notify the listeners from the root pipeline
121 root.notifyListeners(ev);
122 }
123 }.start();
124 }
125
126 /**
127 * Notify all listeners and recursively notify child branches of the
128 * specified event. This method does not propagate events to the
129 * parent pipeline.
130 */
131 private void notifyListeners(EventObject ev) {
132 for (StageEventListener listener : listeners) listener.notify(ev);
133 for (Pipeline branch : branches.values()) branch.notifyListeners(ev);
134 }
135
136 /**
137 * {@inheritDoc}
138 */
139 public Feeder getDownstreamFeeder(Stage stage) {
140 if (stage == null) throw new IllegalArgumentException("Unable to look up downstream feeder for null stage.");
141 if (stage == drivers.getLast().getStage()) {
142 return this.terminalFeeder;
143 } else {
144 //Iterate backwards over the list until the stage is found, then return
145 //the feeder for the subsequent stage. Comparisons are done using reference
146 //equality.
147 for (int i = drivers.size() - 2; i >= 0; i--) {
148 if (stage == drivers.get(i).getStage()) return drivers.get(i+1).getFeeder();
149 }
150
151 throw new IllegalStateException("Unable to find stage " + stage + " in pipeline.");
152 }
153 }
154
155 /**
156 * {@inheritDoc}
157 */
158 public Feeder getBranchFeeder(String branch) {
159 if (!getBranches().containsKey(branch)) {
160 throw new IllegalStateException("Unable to find branch in pipeline: '" + branch + "'");
161 }
162
163 return branches.get(branch).getSourceFeeder();
164 }
165
166 /**
167 * {@inheritDoc}
168 */
169 public Object getEnv(String key) {
170 return this.env.get(key);
171 }
172
173 /**
174 * Sets the value corresponding to the specified environment variable key.
175 */
176 public void setEnv(String key, Object value) {
177 this.env.put(key, value);
178 }
179
180 /**
181 * Adds a {@link Stage} object to the end of this Pipeline. If a
182 * {@link PipelineValidator} has been set using {@link #setValidator}, it will
183 * be used to validate that the appended Stage can consume the output of the
184 * previous stage of the pipeline. It does NOT validate the ability or availability
185 * of branches to consume data produced by the appended stage.
186 *
187 * @param stage the stage to be added to the pipeline
188 * @param driverFactory the factory that will be used to create a {@link StageDriver} that will run the stage
189 * @throws ValidationException if there is a non-null validator set for this pipeline and an error is
190 * encountered validating the addition of the stage to the pipeline.
191 */
192 public final void addStage(Stage stage, StageDriverFactory driverFactory) throws ValidationException {
193 if (stage == null) throw new IllegalArgumentException("Argument \"stage\" for call to Pipeline.addStage() may not be null.");
194
195 if (validator != null) {
196 List<ValidationFailure> errors = validator.validateAddStage(this, stage, driverFactory);
197 if (errors != null && !errors.isEmpty()) {
198 throw new ValidationException("An error occurred adding stage " + stage.toString(), errors);
199 }
200 }
201
202 stage.init(this);
203 this.stages.add(stage);
204
205 StageDriver driver = driverFactory.createStageDriver(stage, this);
206 this.driverMap.put(stage, driver);
207 this.drivers.add(driver);
208 }
209
210 /**
211 * Returns an unmodifiable list of stages that have been added to this
212 * pipeline.
213 * @return A list of the stages that have been added to the pipeline
214 */
215 public final List<Stage> getStages() {
216 return Collections.unmodifiableList(this.stages);
217 }
218
219 /**
220 * Return the StageDriver for the specified Stage.
221 *
222 * @return the StageDriver for the specified Stage.
223 */
224 public final StageDriver getStageDriver(Stage stage) {
225 return this.driverMap.get(stage);
226 }
227
228 /**
229 * Returns an unmodifiable list of stage drivers that have been added
230 * to the pipeline.
231 * @return the list of drivers for stages in the pipeline
232 */
233 public final List<StageDriver> getStageDrivers() {
234 return Collections.unmodifiableList(this.drivers);
235 }
236
237 /**
238 * Adds a branch to the pipeline.
239 * @param key the string identifier that will be used to refer to the added branch
240 * @param pipeline the branch pipeline
241 * @throws org.apache.commons.pipeline.validation.ValidationException if the pipeline has a non-null {@link PipelineValidator} and the branch
242 * cannot consume the data produced for the branch by stages in the pipeline.
243 */
244 public void addBranch(String key, Pipeline branch) throws ValidationException {
245 if (key == null)
246 throw new IllegalArgumentException("Branch key may not be null.");
247 if (MAIN_BRANCH.equalsIgnoreCase(key))
248 throw new IllegalArgumentException("Branch key name \"" + MAIN_BRANCH + "\" is reserved.");
249 if (branch == null)
250 throw new IllegalArgumentException("Illegal attempt to set reference to null branch.");
251 if (branch == this || branch.hasBranch(this))
252 throw new IllegalArgumentException("Illegal attempt to set reference to self as a branch (infinite recursion potential)");
253
254 if (validator != null) {
255 List<ValidationFailure> errors = validator.validateAddBranch(this, key, branch);
256 if (errors != null && !errors.isEmpty()) {
257 throw new ValidationException("An error occurred adding branch pipeline " + branch, errors);
258 }
259 }
260
261 branch.parent = this;
262 this.branches.put(key, branch);
263 }
264
265 /**
266 * Returns an unmodifiable map of branch pipelines, keyed by branch identifier.
267 * @return the map of registered branch pipelines, keyed by branch identifier
268 */
269 public Map<String,Pipeline> getBranches() {
270 return Collections.unmodifiableMap(branches);
271 }
272
273 /**
274 * Simple method that recursively checks whether the specified
275 * pipeline is a branch of this pipeline.
276 * @param pipeline the candidate branch
277 * @return true if the specified pipeline is a branch of this pipeline, or recursively
278 * a branch of a branch. Tests are performed using reference equality.
279 */
280 private boolean hasBranch(Pipeline pipeline) {
281 if (branches.containsValue(pipeline)) return true;
282 for (Pipeline branch : branches.values()) {
283 if (branch.hasBranch(pipeline)) return true;
284 }
285
286 return false;
287 }
288
289 /**
290 * Returns a feeder for the first stage if the pipeline is not empty
291 * @return the feeder to feed the first stage of the pipeline
292 */
293 public Feeder getSourceFeeder() {
294 if (drivers.isEmpty()) return this.terminalFeeder;
295 return drivers.peek().getFeeder();
296 }
297
298 /**
299 * Gets the feeder that receives output from the final stage.
300 * @return the terminal feeder being used to handle any output from the final stage. The default is {@link Feeder#VOID}
301 */
302 public Feeder getTerminalFeeder() {
303 return this.terminalFeeder;
304 }
305
306 /**
307 * Sets the terminal feeder used to handle any output from the final stage.
308 * @param terminalFeeder the {@link Feeder} that will receive any output from the final stage
309 */
310 public void setTerminalFeeder(Feeder terminalFeeder) {
311 this.terminalFeeder = terminalFeeder;
312 }
313
314 /**
315 * Adds a job to be onStart on startup to the pipeline.
316 */
317 public void addLifecycleJob(PipelineLifecycleJob job) {
318 this.lifecycleJobs.add(job);
319 }
320
321 /**
322 * This method iterates over the stages in the pipeline, looking up a
323 * {@link StageDriver} for each stage and using that driver to start the stage.
324 * Startups may occur sequentially or in parallel, depending upon the stage driver
325 * used. If a the stage has not been configured with a {@link StageDriver},
326 * we will use the default, non-threaded {@link SynchronousStageDriver}.
327 *
328 * @throws org.apache.commons.pipeline.StageException Thrown if there is an error during pipeline startup
329 */
330 public void start() throws StageException {
331 for (PipelineLifecycleJob job : lifecycleJobs) job.onStart(this);
332 for (StageDriver driver: this.drivers) driver.start();
333 for (Pipeline branch : branches.values()) branch.start();
334 }
335
336 /**
337 * This method iterates over the stages in the pipeline, looking up a {@link StageDriver}
338 * for each stage and using that driver to request that the stage finish
339 * execution. The {@link StageDriver#finish(Stage)}
340 * method will block until the stage's queue is exhausted, so this method
341 * may be used to safely finalize all stages without the risk of
342 * losing data in the queues.
343 *
344 * @throws org.apache.commons.pipeline.StageException Thrown if there is an unhandled error during stage shutdown
345 */
346 public void finish() throws StageException {
347 for (StageDriver driver: this.drivers) driver.finish();
348 for (Pipeline pipeline : branches.values()) pipeline.finish();
349 for (PipelineLifecycleJob job : lifecycleJobs) job.onFinish(this);
350 }
351
352 /**
353 * Runs the pipeline from start to finish.
354 */
355 public void run() {
356 try {
357 start();
358 finish();
359 } catch (StageException e) {
360 throw new RuntimeException(e);
361 }
362 }
363
364 /**
365 * Returns the validator being used to validate the pipeline structure,
366 * or null if no validation is being performed..
367 * @return Validator used to validate pipeline structure.
368 */
369 public PipelineValidator getValidator() {
370 return this.validator;
371 }
372
373 /**
374 * Sets the validator used to validate the pipeline as it is contstructed.
375 * Setting the validator to null disables validation
376 * @param validator Validator used to validate pipeline structure.
377 */
378 public void setValidator(PipelineValidator validator) {
379 this.validator = validator;
380 }
381
382 /**
383 * Returns the parent of this pipeline, if it is a branch
384 * @return parent Pipeline, or null if this is the main pipeline
385 */
386 public Pipeline getParent() {
387 return parent;
388 }
389 }