View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.commons.pipeline;
19  
20  import java.util.ArrayList;
21  import java.util.Collection;
22  import java.util.Collections;
23  import java.util.EventObject;
24  import java.util.HashMap;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.Map;
28  import org.apache.commons.pipeline.driver.SynchronousStageDriver;
29  import org.apache.commons.pipeline.validation.PipelineValidator;
30  import org.apache.commons.pipeline.validation.ValidationException;
31  import org.apache.commons.pipeline.validation.ValidationFailure;
32  
33  /**
34   * This class represents a processing system consisting of a number of stages
35   * and branches. Each stage contains a queue and manages one or more threads
36   * that process data in that stage's queue and allow processed data to be
37   * passed along to subsequent stages and onto branches of the pipeline.<P>
38   *
39   * This class allows all stages in the pipeline to be managed collectively
40   * with methods to start and stop processing for all stages, as well as
41   * a simple framework for asynchronous event-based communication between stages.
42   */
43  public class Pipeline implements Runnable, StageContext {
44      /**
45       * The branch key for the main line of production. This value is reserved
46       * and may not be used as a key for other branch pipelines.
47       */
48      public static final String MAIN_BRANCH = "main";
49      
50      //The logger used for reporting by this pipeline
51      //private final Log log = LogFactory.getLog(Pipeline.class);
52      
53      // List of stages in the pipeline, encapsulated in the drivers
54      // that will be used to onStart them.
55      private final LinkedList<StageDriver> drivers;
56      private final Map<Stage, StageDriver> driverMap;
57      
58      // The list of stages in the pipeline.
59      private final LinkedList<Stage> stages;
60      
61      // Map of pipeline branches where the keys are branch names.
62      private final Map<String,Pipeline> branches;
63      
64      // Used to store a reference to the parent pipeline, if this is a branch
65      private Pipeline parent;
66      
67      // The list of listeners registered with the pipeline.
68      private final List<StageEventListener> listeners;
69      
70      // Holds value of property validator.
71      private PipelineValidator validator;
72      
73      // Feeder used to handle output of final stage
74      private Feeder terminalFeeder = Feeder.VOID;
75      
76      // Global environment variables
77      private Map<String,Object> env = Collections.synchronizedMap(new HashMap<String,Object>());
78      
79      // List of jobs to be run at defined points in pipeline lifecycle
80      private Collection<PipelineLifecycleJob> lifecycleJobs = new ArrayList<PipelineLifecycleJob>();
81      
82      /**
83       * Creates and initializes a new Pipeline.
84       */
85      public Pipeline() {
86          this.drivers = new LinkedList<StageDriver>();
87          this.driverMap = new HashMap<Stage, StageDriver>();
88          this.stages = new LinkedList<Stage>();
89          this.branches = new HashMap<String,Pipeline>();
90          this.listeners = Collections.synchronizedList(new ArrayList<StageEventListener>());
91      }
92      
93      /**
94       * {@inheritDoc}
95       */
96      public void registerListener(StageEventListener listener) {
97          listeners.add(listener);
98      }
99      
100     /**
101      * {@inheritDoc}
102      */
103     public Collection<StageEventListener> getRegisteredListeners() {
104         return this.listeners;
105     }
106     
107     /**
108      * Asynchronously notifies each registered listener of an event and propagates
109      * the event to any attached branches and the parent pipeline.
110      *
111      * @param ev The event to be sent to registered listeners
112      */
113     public void raise(final EventObject ev) {
114         new Thread() {
115             public void run() {
116                 //first, recursively find the root pipeline
117                 Pipeline root = Pipeline.this;
118                 while (root.parent != null) root = root.parent;
119                 
120                 //notify the listeners from the root pipeline
121                 root.notifyListeners(ev);
122             }
123         }.start();
124     }
125     
126     /**
127      * Notify all listeners and recursively notify child branches of the
128      * specified event. This method does not propagate events to the
129      * parent pipeline.
130      */
131     private void notifyListeners(EventObject ev) {
132         for (StageEventListener listener : listeners) listener.notify(ev);
133         for (Pipeline branch : branches.values()) branch.notifyListeners(ev);
134     }
135     
136     /**
137      * {@inheritDoc}
138      */
139     public Feeder getDownstreamFeeder(Stage stage) {
140         if (stage == null) throw new IllegalArgumentException("Unable to look up downstream feeder for null stage.");
141         if (stage == drivers.getLast().getStage()) {
142             return this.terminalFeeder;
143         } else {
144             //Iterate backwards over the list until the stage is found, then return
145             //the feeder for the subsequent stage. Comparisons are done using reference
146             //equality.
147             for (int i = drivers.size() - 2; i >= 0; i--) {
148                 if (stage == drivers.get(i).getStage()) return drivers.get(i+1).getFeeder();
149             }
150             
151             throw new IllegalStateException("Unable to find stage " + stage + " in pipeline.");
152         }
153     }
154     
155     /**
156      * {@inheritDoc}
157      */
158     public Feeder getBranchFeeder(String branch) {
159         if (!getBranches().containsKey(branch)) {
160             throw new IllegalStateException("Unable to find branch in pipeline: '" + branch + "'");
161         }
162         
163         return branches.get(branch).getSourceFeeder();
164     }
165     
166     /**
167      * {@inheritDoc}
168      */
169     public Object getEnv(String key) {
170         return this.env.get(key);
171     }
172     
173     /**
174      * Sets the value corresponding to the specified environment variable key.
175      */
176     public void setEnv(String key, Object value) {
177         this.env.put(key, value);
178     }
179     
180     /**
181      * Adds a {@link Stage} object to the end of this Pipeline. If a
182      * {@link PipelineValidator} has been set using {@link #setValidator}, it will
183      * be used to validate that the appended Stage can consume the output of the
184      * previous stage of the pipeline. It does NOT validate the ability or availability
185      * of branches to consume data produced by the appended stage.
186      *
187      * @param stage the stage to be added to the pipeline
188      * @param driverFactory the factory that will be used to create a {@link StageDriver} that will run the stage
189      * @throws ValidationException if there is a non-null validator set for this pipeline and an error is
190      * encountered validating the addition of the stage to the pipeline.
191      */
192     public final void addStage(Stage stage, StageDriverFactory driverFactory) throws ValidationException {
193         if (stage == null) throw new IllegalArgumentException("Argument \"stage\" for call to Pipeline.addStage() may not be null.");
194         
195         if (validator != null) {
196             List<ValidationFailure> errors = validator.validateAddStage(this, stage, driverFactory);
197             if (errors != null && !errors.isEmpty()) {
198                 throw new ValidationException("An error occurred adding stage " + stage.toString(), errors);
199             }
200         }
201         
202         stage.init(this);
203         this.stages.add(stage);
204         
205         StageDriver driver = driverFactory.createStageDriver(stage, this);
206         this.driverMap.put(stage, driver);
207         this.drivers.add(driver);
208     }
209     
210     /**
211      * Returns an unmodifiable list of stages that have been added to this
212      * pipeline.
213      * @return A list of the stages that have been added to the pipeline
214      */
215     public final List<Stage> getStages() {
216         return Collections.unmodifiableList(this.stages);
217     }
218     
219     /**
220      * Return the StageDriver for the specified Stage.
221      *
222      * @return the StageDriver for the specified Stage.
223      */
224     public final StageDriver getStageDriver(Stage stage) {
225         return this.driverMap.get(stage);
226     }
227     
228     /**
229      * Returns an unmodifiable list of stage drivers that have been added
230      * to the pipeline.
231      * @return the list of drivers for stages in the pipeline
232      */
233     public final List<StageDriver> getStageDrivers() {
234         return Collections.unmodifiableList(this.drivers);
235     }
236     
237     /**
238      * Adds a branch to the pipeline.
239      * @param key the string identifier that will be used to refer to the added branch
240      * @param pipeline the branch pipeline
241      * @throws org.apache.commons.pipeline.validation.ValidationException if the pipeline has a non-null {@link PipelineValidator} and the branch
242      * cannot consume the data produced for the branch by stages in the pipeline.
243      */
244     public void addBranch(String key, Pipeline branch) throws ValidationException {
245         if (key == null)
246             throw new IllegalArgumentException("Branch key may not be null.");
247         if (MAIN_BRANCH.equalsIgnoreCase(key))
248             throw new IllegalArgumentException("Branch key name \"" + MAIN_BRANCH + "\" is reserved.");
249         if (branch == null)
250             throw new IllegalArgumentException("Illegal attempt to set reference to null branch.");
251         if (branch == this || branch.hasBranch(this))
252             throw new IllegalArgumentException("Illegal attempt to set reference to self as a branch (infinite recursion potential)");
253         
254         if (validator != null) {
255             List<ValidationFailure> errors = validator.validateAddBranch(this, key, branch);
256             if (errors != null && !errors.isEmpty()) {
257                 throw new ValidationException("An error occurred adding branch pipeline " + branch, errors);
258             }
259         }
260         
261         branch.parent = this;
262         this.branches.put(key, branch);
263     }
264     
265     /**
266      * Returns an unmodifiable map of branch pipelines, keyed by branch identifier.
267      * @return the map of registered branch pipelines, keyed by branch identifier
268      */
269     public Map<String,Pipeline> getBranches() {
270         return Collections.unmodifiableMap(branches);
271     }
272     
273     /**
274      * Simple method that recursively checks whether the specified
275      * pipeline is a branch of this pipeline.
276      * @param pipeline the candidate branch
277      * @return true if the specified pipeline is a branch of this pipeline, or recursively
278      * a branch of a branch. Tests are performed using reference equality.
279      */
280     private boolean hasBranch(Pipeline pipeline) {
281         if (branches.containsValue(pipeline)) return true;
282         for (Pipeline branch : branches.values()) {
283             if (branch.hasBranch(pipeline)) return true;
284         }
285         
286         return false;
287     }
288     
289     /**
290      * Returns a feeder for the first stage if the pipeline is not empty
291      * @return the feeder to feed the first stage of the pipeline
292      */
293     public Feeder getSourceFeeder() {
294         if (drivers.isEmpty()) return this.terminalFeeder;
295         return drivers.peek().getFeeder();
296     }
297     
298     /**
299      * Gets the feeder that receives output from the final stage.
300      * @return the terminal feeder being used to handle any output from the final stage. The default is {@link Feeder#VOID}
301      */
302     public Feeder getTerminalFeeder() {
303         return this.terminalFeeder;
304     }
305     
306     /**
307      * Sets the terminal feeder used to handle any output from the final stage.
308      * @param terminalFeeder the {@link Feeder} that will receive any output from the final stage
309      */
310     public void setTerminalFeeder(Feeder terminalFeeder) {
311         this.terminalFeeder = terminalFeeder;
312     }
313     
314     /**
315      * Adds a job to be onStart on startup to the pipeline.
316      */
317     public void addLifecycleJob(PipelineLifecycleJob job) {
318         this.lifecycleJobs.add(job);
319     }
320     
321     /**
322      * This method iterates over the stages in the pipeline, looking up a
323      * {@link StageDriver} for each stage and using that driver to start the stage.
324      * Startups may occur sequentially or in parallel, depending upon the stage driver
325      * used.  If a the stage has not been configured with a {@link StageDriver},
326      * we will use the default, non-threaded {@link SynchronousStageDriver}.
327      *
328      * @throws org.apache.commons.pipeline.StageException Thrown if there is an error during pipeline startup
329      */
330     public void start() throws StageException {
331         for (PipelineLifecycleJob job : lifecycleJobs) job.onStart(this);
332         for (StageDriver driver: this.drivers) driver.start();
333         for (Pipeline branch : branches.values()) branch.start();
334     }
335     
336     /**
337      * This method iterates over the stages in the pipeline, looking up a {@link StageDriver}
338      * for each stage and using that driver to request that the stage finish
339      * execution. The {@link StageDriver#finish(Stage)}
340      * method will block until the stage's queue is exhausted, so this method
341      * may be used to safely finalize all stages without the risk of
342      * losing data in the queues.
343      *
344      * @throws org.apache.commons.pipeline.StageException Thrown if there is an unhandled error during stage shutdown
345      */
346     public void finish() throws StageException {
347         for (StageDriver driver: this.drivers) driver.finish();
348         for (Pipeline pipeline : branches.values()) pipeline.finish();
349         for (PipelineLifecycleJob job : lifecycleJobs) job.onFinish(this);
350     }
351     
352     /**
353      * Runs the pipeline from start to finish.
354      */
355     public void run() {
356         try {
357             start();
358             finish();
359         } catch (StageException e) {
360             throw new RuntimeException(e);
361         }
362     }
363     
364     /**
365      * Returns the validator being used to validate the pipeline structure,
366      * or null if no validation is being performed..
367      * @return Validator used to validate pipeline structure.
368      */
369     public PipelineValidator getValidator() {
370         return this.validator;
371     }
372     
373     /**
374      * Sets the validator used to validate the pipeline as it is contstructed.
375      * Setting the validator to null disables validation
376      * @param validator Validator used to validate pipeline structure.
377      */
378     public void setValidator(PipelineValidator validator) {
379         this.validator = validator;
380     }
381     
382     /**
383      * Returns the parent of this pipeline, if it is a branch
384      * @return parent Pipeline, or null if this is the main pipeline
385      */
386     public Pipeline getParent() {
387         return parent;
388     }
389 }