View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.commons.pipeline.validation;
19  
20  import java.util.ArrayList;
21  import java.util.Collections;
22  import java.util.Iterator;
23  import java.util.List;
24  import org.apache.commons.pipeline.Pipeline;
25  import org.apache.commons.pipeline.Stage;
26  import org.apache.commons.pipeline.StageDriverFactory;
27  
28  /**
29   * This is a simple default implementation of the PipelineValidator interface
30   * that checks stage and branch connectivity. It assumes that any un-annotated
31   * stage simply passes data through and can accept any type of object (as though
32   * it were annotated with @ConsumedTypes({Object.class}) and @ProducesConsumed.
33   *
34   */
35  public class SimplePipelineValidator implements PipelineValidator {
36      
37      /** Creates a new instance of PipelineValidator */
38      public SimplePipelineValidator() {
39      }
40      
41      /**
42       * This method validates the entire structure of the pipeline, ensuring that
43       * the data produced by each stage can be consumed by the subsequent
44       * stage and/or relevant branch pipelines.
45       * @param pipeline The pipeline to be validated
46       * @return The list of validation errors encountered.
47       */
48      public List<ValidationFailure> validate(Pipeline pipeline) {
49          List<ValidationFailure> errors = new ArrayList<ValidationFailure>();
50          Stage previous = null;
51          for (Iterator<Stage> iter = pipeline.getStages().iterator(); iter.hasNext();) {
52              Stage stage = iter.next();
53              
54              //only check that the stage can succeed known production
55              if (previous != null) {
56                  if (!ValidationUtils.canSucceed(previous, stage)) {
57                      errors.add(new ValidationFailure(ValidationFailure.Type.STAGE_CONNECT,
58                              "Stage cannot consume output of prior stage.", previous, stage));
59                  }
60              }
61              
62              if (stage.getClass().isAnnotationPresent(ProductionOnBranch.class)) {
63                  ProductionOnBranch pob = stage.getClass().getAnnotation(ProductionOnBranch.class);
64                  errors.addAll(validateBranchConnect(pipeline, pob.branchKey(), stage));
65              } else if (stage.getClass().isAnnotationPresent(Branches.class)) {
66                  Branches branches = stage.getClass().getAnnotation(Branches.class);
67                  
68                  for (ProductionOnBranch pob : branches.productionOnBranches()) {
69                      errors.addAll(validateBranchConnect(pipeline, pob.branchKey(), stage));
70                  }
71                  
72                  //only check that each branch can consume from known production.
73                  if (previous != null) {
74                      for (String branchKey : branches.producesConsumedOnBranches()) {
75                          errors.addAll(validateBranchConnect(pipeline, branchKey, previous));
76                      }
77                  }
78              }
79              
80              //only update the previous stage reference if the stage has non-null
81              //and non-pass-through production
82              if (stage.getClass().isAnnotationPresent(ProducedTypes.class)) {
83                  //stop if the stage produces nothing, and raise an error if not at the end of the pipeline
84                  if (stage.getClass().getAnnotation(ProducedTypes.class).value().length == 0) {
85                      if (iter.hasNext()) errors.add(new ValidationFailure(ValidationFailure.Type.STAGE_CONNECT,
86                              "Stage with no production is not at terminus of pipeline.", stage, iter.next()));
87                      break;
88                  }
89                  
90                  previous = stage;
91              }
92          }
93          
94          // recursively perform validation on the branch pipelines
95          for (Pipeline branch : pipeline.getBranches().values()) {
96              errors.addAll(validate(branch));
97          }
98          
99          return errors;
100     }
101     
102     /**
103      * Utility method for validating connection between stages and branches.
104      */
105     private List<ValidationFailure> validateBranchConnect(Pipeline pipeline, String branchKey, Stage upstreamStage) {
106         List<ValidationFailure> errors = new ArrayList<ValidationFailure>();
107         Pipeline branch = pipeline.getBranches().get(branchKey);
108         
109         if (branch == null) {
110             errors.add(new ValidationFailure(ValidationFailure.Type.BRANCH_NOT_FOUND,
111                     "Branch not found for production key " + branchKey, upstreamStage, null));
112         } else if (branch.getStages().isEmpty()) {
113             errors.add(new ValidationFailure(ValidationFailure.Type.BRANCH_CONNECT,
114                     "Branch pipeline for key " + branchKey + " has no stages.", upstreamStage, null));
115         } else if (!ValidationUtils.canSucceed(upstreamStage, branch.getStages().get(0))) {
116             errors.add(new ValidationFailure(ValidationFailure.Type.BRANCH_CONNECT,
117                     "Branch " + branchKey + " cannot consume data produced by stage.", upstreamStage, branch.getStages().get(0)));
118         }
119         
120         return errors;
121     }
122     
123     /**
124      * Validate whether or not a stage can be added to the pipeline.
125      * @param pipeline The pipeline to which the stage is being added
126      * @param stage The stage to be added
127      * @param driverFactory the StageDriverFactory used to create a driver for the stage
128      * @return The list of validation errors encountered, or an empty list if the add
129      * passed validation.
130      */
131     public List<ValidationFailure> validateAddStage(Pipeline pipeline, Stage stage, StageDriverFactory driverFactory) {
132         if (pipeline.getStages().isEmpty()) return Collections.emptyList(); //trivial case
133         
134         //establish list of errors to be returned, initially empty
135         List<ValidationFailure> errors = new ArrayList<ValidationFailure>();
136         
137         //search backwards along pipeline for known production
138         Stage previous = null;
139         for (int i = pipeline.getStages().size() - 1; i >= 0; i--) {
140             Stage test = pipeline.getStages().get(i);
141             if (test.getClass().isAnnotationPresent(ProducedTypes.class)) {
142                 if (test.getClass().getAnnotation(ProducedTypes.class).value().length == 0) {
143                     errors.add(new ValidationFailure(ValidationFailure.Type.STAGE_CONNECT,
144                             "Attempt to add stage to pipeline with no production at terminus.", test, stage));
145                 } else {
146                     previous = test;
147                 }
148                 
149                 break;
150             }
151         }
152         
153         if (previous != null) {
154             if (!ValidationUtils.canSucceed(previous, stage)) {
155                 errors.add(new ValidationFailure(ValidationFailure.Type.STAGE_CONNECT,
156                         "Stage cannot consume output of prior stage.", previous, stage));
157                 
158                 //TODO: Add checks to determine whether the branch production of the
159                 //stage can be consumed by branch pipelines.
160             }
161         }
162         
163         return errors;
164     }
165     
166     /**
167      * Validate whether or not the specified branch pipeline can be added
168      * with the specified key.
169      * @param pipeline The pipeline to which the branch is being added
170      * @param branchKey The identifier for the newly added branch
171      * @param branch The branch pipeline being added
172      * @return The list of validation errors, or an empty list if no errors were found.
173      */
174     public List<ValidationFailure> validateAddBranch(Pipeline pipeline, String branchKey, Pipeline branch) {
175         return Collections.emptyList(); //all default validation rules exist in pipeline
176     }
177 }