001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.commons.pipeline.validation;
019    
020    import java.util.ArrayList;
021    import java.util.Collections;
022    import java.util.Iterator;
023    import java.util.List;
024    import org.apache.commons.pipeline.Pipeline;
025    import org.apache.commons.pipeline.Stage;
026    import org.apache.commons.pipeline.StageDriverFactory;
027    
028    /**
029     * This is a simple default implementation of the PipelineValidator interface
030     * that checks stage and branch connectivity. It assumes that any un-annotated
031     * stage simply passes data through and can accept any type of object (as though
032     * it were annotated with @ConsumedTypes({Object.class}) and @ProducesConsumed.
033     *
034     */
035    public class SimplePipelineValidator implements PipelineValidator {
036        
037        /** Creates a new instance of PipelineValidator */
038        public SimplePipelineValidator() {
039        }
040        
041        /**
042         * This method validates the entire structure of the pipeline, ensuring that
043         * the data produced by each stage can be consumed by the subsequent
044         * stage and/or relevant branch pipelines.
045         * @param pipeline The pipeline to be validated
046         * @return The list of validation errors encountered.
047         */
048        public List<ValidationFailure> validate(Pipeline pipeline) {
049            List<ValidationFailure> errors = new ArrayList<ValidationFailure>();
050            Stage previous = null;
051            for (Iterator<Stage> iter = pipeline.getStages().iterator(); iter.hasNext();) {
052                Stage stage = iter.next();
053                
054                //only check that the stage can succeed known production
055                if (previous != null) {
056                    if (!ValidationUtils.canSucceed(previous, stage)) {
057                        errors.add(new ValidationFailure(ValidationFailure.Type.STAGE_CONNECT,
058                                "Stage cannot consume output of prior stage.", previous, stage));
059                    }
060                }
061                
062                if (stage.getClass().isAnnotationPresent(ProductionOnBranch.class)) {
063                    ProductionOnBranch pob = stage.getClass().getAnnotation(ProductionOnBranch.class);
064                    errors.addAll(validateBranchConnect(pipeline, pob.branchKey(), stage));
065                } else if (stage.getClass().isAnnotationPresent(Branches.class)) {
066                    Branches branches = stage.getClass().getAnnotation(Branches.class);
067                    
068                    for (ProductionOnBranch pob : branches.productionOnBranches()) {
069                        errors.addAll(validateBranchConnect(pipeline, pob.branchKey(), stage));
070                    }
071                    
072                    //only check that each branch can consume from known production.
073                    if (previous != null) {
074                        for (String branchKey : branches.producesConsumedOnBranches()) {
075                            errors.addAll(validateBranchConnect(pipeline, branchKey, previous));
076                        }
077                    }
078                }
079                
080                //only update the previous stage reference if the stage has non-null
081                //and non-pass-through production
082                if (stage.getClass().isAnnotationPresent(ProducedTypes.class)) {
083                    //stop if the stage produces nothing, and raise an error if not at the end of the pipeline
084                    if (stage.getClass().getAnnotation(ProducedTypes.class).value().length == 0) {
085                        if (iter.hasNext()) errors.add(new ValidationFailure(ValidationFailure.Type.STAGE_CONNECT,
086                                "Stage with no production is not at terminus of pipeline.", stage, iter.next()));
087                        break;
088                    }
089                    
090                    previous = stage;
091                }
092            }
093            
094            // recursively perform validation on the branch pipelines
095            for (Pipeline branch : pipeline.getBranches().values()) {
096                errors.addAll(validate(branch));
097            }
098            
099            return errors;
100        }
101        
102        /**
103         * Utility method for validating connection between stages and branches.
104         */
105        private List<ValidationFailure> validateBranchConnect(Pipeline pipeline, String branchKey, Stage upstreamStage) {
106            List<ValidationFailure> errors = new ArrayList<ValidationFailure>();
107            Pipeline branch = pipeline.getBranches().get(branchKey);
108            
109            if (branch == null) {
110                errors.add(new ValidationFailure(ValidationFailure.Type.BRANCH_NOT_FOUND,
111                        "Branch not found for production key " + branchKey, upstreamStage, null));
112            } else if (branch.getStages().isEmpty()) {
113                errors.add(new ValidationFailure(ValidationFailure.Type.BRANCH_CONNECT,
114                        "Branch pipeline for key " + branchKey + " has no stages.", upstreamStage, null));
115            } else if (!ValidationUtils.canSucceed(upstreamStage, branch.getStages().get(0))) {
116                errors.add(new ValidationFailure(ValidationFailure.Type.BRANCH_CONNECT,
117                        "Branch " + branchKey + " cannot consume data produced by stage.", upstreamStage, branch.getStages().get(0)));
118            }
119            
120            return errors;
121        }
122        
123        /**
124         * Validate whether or not a stage can be added to the pipeline.
125         * @param pipeline The pipeline to which the stage is being added
126         * @param stage The stage to be added
127         * @param driverFactory the StageDriverFactory used to create a driver for the stage
128         * @return The list of validation errors encountered, or an empty list if the add
129         * passed validation.
130         */
131        public List<ValidationFailure> validateAddStage(Pipeline pipeline, Stage stage, StageDriverFactory driverFactory) {
132            if (pipeline.getStages().isEmpty()) return Collections.emptyList(); //trivial case
133            
134            //establish list of errors to be returned, initially empty
135            List<ValidationFailure> errors = new ArrayList<ValidationFailure>();
136            
137            //search backwards along pipeline for known production
138            Stage previous = null;
139            for (int i = pipeline.getStages().size() - 1; i >= 0; i--) {
140                Stage test = pipeline.getStages().get(i);
141                if (test.getClass().isAnnotationPresent(ProducedTypes.class)) {
142                    if (test.getClass().getAnnotation(ProducedTypes.class).value().length == 0) {
143                        errors.add(new ValidationFailure(ValidationFailure.Type.STAGE_CONNECT,
144                                "Attempt to add stage to pipeline with no production at terminus.", test, stage));
145                    } else {
146                        previous = test;
147                    }
148                    
149                    break;
150                }
151            }
152            
153            if (previous != null) {
154                if (!ValidationUtils.canSucceed(previous, stage)) {
155                    errors.add(new ValidationFailure(ValidationFailure.Type.STAGE_CONNECT,
156                            "Stage cannot consume output of prior stage.", previous, stage));
157                    
158                    //TODO: Add checks to determine whether the branch production of the
159                    //stage can be consumed by branch pipelines.
160                }
161            }
162            
163            return errors;
164        }
165        
166        /**
167         * Validate whether or not the specified branch pipeline can be added
168         * with the specified key.
169         * @param pipeline The pipeline to which the branch is being added
170         * @param branchKey The identifier for the newly added branch
171         * @param branch The branch pipeline being added
172         * @return The list of validation errors, or an empty list if no errors were found.
173         */
174        public List<ValidationFailure> validateAddBranch(Pipeline pipeline, String branchKey, Pipeline branch) {
175            return Collections.emptyList(); //all default validation rules exist in pipeline
176        }
177    }