001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.commons.pipeline;
019    
020    import java.util.ArrayList;
021    import java.util.Collection;
022    import java.util.Collections;
023    import java.util.EventObject;
024    import java.util.HashMap;
025    import java.util.LinkedList;
026    import java.util.List;
027    import java.util.Map;
028    import org.apache.commons.pipeline.driver.SynchronousStageDriver;
029    import org.apache.commons.pipeline.validation.PipelineValidator;
030    import org.apache.commons.pipeline.validation.ValidationException;
031    import org.apache.commons.pipeline.validation.ValidationFailure;
032    
033    /**
034     * This class represents a processing system consisting of a number of stages
035     * and branches. Each stage contains a queue and manages one or more threads
036     * that process data in that stage's queue and allow processed data to be
037     * passed along to subsequent stages and onto branches of the pipeline.<P>
038     *
039     * This class allows all stages in the pipeline to be managed collectively
040     * with methods to start and stop processing for all stages, as well as
041     * a simple framework for asynchronous event-based communication between stages.
042     */
043    public class Pipeline implements Runnable, StageContext {
044        /**
045         * The branch key for the main line of production. This value is reserved
046         * and may not be used as a key for other branch pipelines.
047         */
048        public static final String MAIN_BRANCH = "main";
049        
050        //The logger used for reporting by this pipeline
051        //private final Log log = LogFactory.getLog(Pipeline.class);
052        
053        // List of stages in the pipeline, encapsulated in the drivers
054        // that will be used to onStart them.
055        private final LinkedList<StageDriver> drivers;
056        private final Map<Stage, StageDriver> driverMap;
057        
058        // The list of stages in the pipeline.
059        private final LinkedList<Stage> stages;
060        
061        // Map of pipeline branches where the keys are branch names.
062        private final Map<String,Pipeline> branches;
063        
064        // Used to store a reference to the parent pipeline, if this is a branch
065        private Pipeline parent;
066        
067        // The list of listeners registered with the pipeline.
068        private final List<StageEventListener> listeners;
069        
070        // Holds value of property validator.
071        private PipelineValidator validator;
072        
073        // Feeder used to handle output of final stage
074        private Feeder terminalFeeder = Feeder.VOID;
075        
076        // Global environment variables
077        private Map<String,Object> env = Collections.synchronizedMap(new HashMap<String,Object>());
078        
079        // List of jobs to be run at defined points in pipeline lifecycle
080        private Collection<PipelineLifecycleJob> lifecycleJobs = new ArrayList<PipelineLifecycleJob>();
081        
082        /**
083         * Creates and initializes a new Pipeline.
084         */
085        public Pipeline() {
086            this.drivers = new LinkedList<StageDriver>();
087            this.driverMap = new HashMap<Stage, StageDriver>();
088            this.stages = new LinkedList<Stage>();
089            this.branches = new HashMap<String,Pipeline>();
090            this.listeners = Collections.synchronizedList(new ArrayList<StageEventListener>());
091        }
092        
093        /**
094         * {@inheritDoc}
095         */
096        public void registerListener(StageEventListener listener) {
097            listeners.add(listener);
098        }
099        
100        /**
101         * {@inheritDoc}
102         */
103        public Collection<StageEventListener> getRegisteredListeners() {
104            return this.listeners;
105        }
106        
107        /**
108         * Asynchronously notifies each registered listener of an event and propagates
109         * the event to any attached branches and the parent pipeline.
110         *
111         * @param ev The event to be sent to registered listeners
112         */
113        public void raise(final EventObject ev) {
114            new Thread() {
115                public void run() {
116                    //first, recursively find the root pipeline
117                    Pipeline root = Pipeline.this;
118                    while (root.parent != null) root = root.parent;
119                    
120                    //notify the listeners from the root pipeline
121                    root.notifyListeners(ev);
122                }
123            }.start();
124        }
125        
126        /**
127         * Notify all listeners and recursively notify child branches of the
128         * specified event. This method does not propagate events to the
129         * parent pipeline.
130         */
131        private void notifyListeners(EventObject ev) {
132            for (StageEventListener listener : listeners) listener.notify(ev);
133            for (Pipeline branch : branches.values()) branch.notifyListeners(ev);
134        }
135        
136        /**
137         * {@inheritDoc}
138         */
139        public Feeder getDownstreamFeeder(Stage stage) {
140            if (stage == null) throw new IllegalArgumentException("Unable to look up downstream feeder for null stage.");
141            if (stage == drivers.getLast().getStage()) {
142                return this.terminalFeeder;
143            } else {
144                //Iterate backwards over the list until the stage is found, then return
145                //the feeder for the subsequent stage. Comparisons are done using reference
146                //equality.
147                for (int i = drivers.size() - 2; i >= 0; i--) {
148                    if (stage == drivers.get(i).getStage()) return drivers.get(i+1).getFeeder();
149                }
150                
151                throw new IllegalStateException("Unable to find stage " + stage + " in pipeline.");
152            }
153        }
154        
155        /**
156         * {@inheritDoc}
157         */
158        public Feeder getBranchFeeder(String branch) {
159            if (!getBranches().containsKey(branch)) {
160                throw new IllegalStateException("Unable to find branch in pipeline: '" + branch + "'");
161            }
162            
163            return branches.get(branch).getSourceFeeder();
164        }
165        
166        /**
167         * {@inheritDoc}
168         */
169        public Object getEnv(String key) {
170            return this.env.get(key);
171        }
172        
173        /**
174         * Sets the value corresponding to the specified environment variable key.
175         */
176        public void setEnv(String key, Object value) {
177            this.env.put(key, value);
178        }
179        
180        /**
181         * Adds a {@link Stage} object to the end of this Pipeline. If a
182         * {@link PipelineValidator} has been set using {@link #setValidator}, it will
183         * be used to validate that the appended Stage can consume the output of the
184         * previous stage of the pipeline. It does NOT validate the ability or availability
185         * of branches to consume data produced by the appended stage.
186         *
187         * @param stage the stage to be added to the pipeline
188         * @param driverFactory the factory that will be used to create a {@link StageDriver} that will run the stage
189         * @throws ValidationException if there is a non-null validator set for this pipeline and an error is
190         * encountered validating the addition of the stage to the pipeline.
191         */
192        public final void addStage(Stage stage, StageDriverFactory driverFactory) throws ValidationException {
193            if (stage == null) throw new IllegalArgumentException("Argument \"stage\" for call to Pipeline.addStage() may not be null.");
194            
195            if (validator != null) {
196                List<ValidationFailure> errors = validator.validateAddStage(this, stage, driverFactory);
197                if (errors != null && !errors.isEmpty()) {
198                    throw new ValidationException("An error occurred adding stage " + stage.toString(), errors);
199                }
200            }
201            
202            stage.init(this);
203            this.stages.add(stage);
204            
205            StageDriver driver = driverFactory.createStageDriver(stage, this);
206            this.driverMap.put(stage, driver);
207            this.drivers.add(driver);
208        }
209        
210        /**
211         * Returns an unmodifiable list of stages that have been added to this
212         * pipeline.
213         * @return A list of the stages that have been added to the pipeline
214         */
215        public final List<Stage> getStages() {
216            return Collections.unmodifiableList(this.stages);
217        }
218        
219        /**
220         * Return the StageDriver for the specified Stage.
221         *
222         * @return the StageDriver for the specified Stage.
223         */
224        public final StageDriver getStageDriver(Stage stage) {
225            return this.driverMap.get(stage);
226        }
227        
228        /**
229         * Returns an unmodifiable list of stage drivers that have been added
230         * to the pipeline.
231         * @return the list of drivers for stages in the pipeline
232         */
233        public final List<StageDriver> getStageDrivers() {
234            return Collections.unmodifiableList(this.drivers);
235        }
236        
237        /**
238         * Adds a branch to the pipeline.
239         * @param key the string identifier that will be used to refer to the added branch
240         * @param pipeline the branch pipeline
241         * @throws org.apache.commons.pipeline.validation.ValidationException if the pipeline has a non-null {@link PipelineValidator} and the branch
242         * cannot consume the data produced for the branch by stages in the pipeline.
243         */
244        public void addBranch(String key, Pipeline branch) throws ValidationException {
245            if (key == null)
246                throw new IllegalArgumentException("Branch key may not be null.");
247            if (MAIN_BRANCH.equalsIgnoreCase(key))
248                throw new IllegalArgumentException("Branch key name \"" + MAIN_BRANCH + "\" is reserved.");
249            if (branch == null)
250                throw new IllegalArgumentException("Illegal attempt to set reference to null branch.");
251            if (branch == this || branch.hasBranch(this))
252                throw new IllegalArgumentException("Illegal attempt to set reference to self as a branch (infinite recursion potential)");
253            
254            if (validator != null) {
255                List<ValidationFailure> errors = validator.validateAddBranch(this, key, branch);
256                if (errors != null && !errors.isEmpty()) {
257                    throw new ValidationException("An error occurred adding branch pipeline " + branch, errors);
258                }
259            }
260            
261            branch.parent = this;
262            this.branches.put(key, branch);
263        }
264        
265        /**
266         * Returns an unmodifiable map of branch pipelines, keyed by branch identifier.
267         * @return the map of registered branch pipelines, keyed by branch identifier
268         */
269        public Map<String,Pipeline> getBranches() {
270            return Collections.unmodifiableMap(branches);
271        }
272        
273        /**
274         * Simple method that recursively checks whether the specified
275         * pipeline is a branch of this pipeline.
276         * @param pipeline the candidate branch
277         * @return true if the specified pipeline is a branch of this pipeline, or recursively
278         * a branch of a branch. Tests are performed using reference equality.
279         */
280        private boolean hasBranch(Pipeline pipeline) {
281            if (branches.containsValue(pipeline)) return true;
282            for (Pipeline branch : branches.values()) {
283                if (branch.hasBranch(pipeline)) return true;
284            }
285            
286            return false;
287        }
288        
289        /**
290         * Returns a feeder for the first stage if the pipeline is not empty
291         * @return the feeder to feed the first stage of the pipeline
292         */
293        public Feeder getSourceFeeder() {
294            if (drivers.isEmpty()) return this.terminalFeeder;
295            return drivers.peek().getFeeder();
296        }
297        
298        /**
299         * Gets the feeder that receives output from the final stage.
300         * @return the terminal feeder being used to handle any output from the final stage. The default is {@link Feeder#VOID}
301         */
302        public Feeder getTerminalFeeder() {
303            return this.terminalFeeder;
304        }
305        
306        /**
307         * Sets the terminal feeder used to handle any output from the final stage.
308         * @param terminalFeeder the {@link Feeder} that will receive any output from the final stage
309         */
310        public void setTerminalFeeder(Feeder terminalFeeder) {
311            this.terminalFeeder = terminalFeeder;
312        }
313        
314        /**
315         * Adds a job to be onStart on startup to the pipeline.
316         */
317        public void addLifecycleJob(PipelineLifecycleJob job) {
318            this.lifecycleJobs.add(job);
319        }
320        
321        /**
322         * This method iterates over the stages in the pipeline, looking up a
323         * {@link StageDriver} for each stage and using that driver to start the stage.
324         * Startups may occur sequentially or in parallel, depending upon the stage driver
325         * used.  If a the stage has not been configured with a {@link StageDriver},
326         * we will use the default, non-threaded {@link SynchronousStageDriver}.
327         *
328         * @throws org.apache.commons.pipeline.StageException Thrown if there is an error during pipeline startup
329         */
330        public void start() throws StageException {
331            for (PipelineLifecycleJob job : lifecycleJobs) job.onStart(this);
332            for (StageDriver driver: this.drivers) driver.start();
333            for (Pipeline branch : branches.values()) branch.start();
334        }
335        
336        /**
337         * This method iterates over the stages in the pipeline, looking up a {@link StageDriver}
338         * for each stage and using that driver to request that the stage finish
339         * execution. The {@link StageDriver#finish(Stage)}
340         * method will block until the stage's queue is exhausted, so this method
341         * may be used to safely finalize all stages without the risk of
342         * losing data in the queues.
343         *
344         * @throws org.apache.commons.pipeline.StageException Thrown if there is an unhandled error during stage shutdown
345         */
346        public void finish() throws StageException {
347            for (StageDriver driver: this.drivers) driver.finish();
348            for (Pipeline pipeline : branches.values()) pipeline.finish();
349            for (PipelineLifecycleJob job : lifecycleJobs) job.onFinish(this);
350        }
351        
352        /**
353         * Runs the pipeline from start to finish.
354         */
355        public void run() {
356            try {
357                start();
358                finish();
359            } catch (StageException e) {
360                throw new RuntimeException(e);
361            }
362        }
363        
364        /**
365         * Returns the validator being used to validate the pipeline structure,
366         * or null if no validation is being performed..
367         * @return Validator used to validate pipeline structure.
368         */
369        public PipelineValidator getValidator() {
370            return this.validator;
371        }
372        
373        /**
374         * Sets the validator used to validate the pipeline as it is contstructed.
375         * Setting the validator to null disables validation
376         * @param validator Validator used to validate pipeline structure.
377         */
378        public void setValidator(PipelineValidator validator) {
379            this.validator = validator;
380        }
381        
382        /**
383         * Returns the parent of this pipeline, if it is a branch
384         * @return parent Pipeline, or null if this is the main pipeline
385         */
386        public Pipeline getParent() {
387            return parent;
388        }
389    }