001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.commons.pipeline.validation;
019
020 import java.util.ArrayList;
021 import java.util.Collections;
022 import java.util.Iterator;
023 import java.util.List;
024 import org.apache.commons.pipeline.Pipeline;
025 import org.apache.commons.pipeline.Stage;
026 import org.apache.commons.pipeline.StageDriverFactory;
027
028 /**
029 * This is a simple default implementation of the PipelineValidator interface
030 * that checks stage and branch connectivity. It assumes that any un-annotated
031 * stage simply passes data through and can accept any type of object (as though
032 * it were annotated with @ConsumedTypes({Object.class}) and @ProducesConsumed.
033 *
034 */
035 public class SimplePipelineValidator implements PipelineValidator {
036
037 /** Creates a new instance of PipelineValidator */
038 public SimplePipelineValidator() {
039 }
040
041 /**
042 * This method validates the entire structure of the pipeline, ensuring that
043 * the data produced by each stage can be consumed by the subsequent
044 * stage and/or relevant branch pipelines.
045 * @param pipeline The pipeline to be validated
046 * @return The list of validation errors encountered.
047 */
048 public List<ValidationFailure> validate(Pipeline pipeline) {
049 List<ValidationFailure> errors = new ArrayList<ValidationFailure>();
050 Stage previous = null;
051 for (Iterator<Stage> iter = pipeline.getStages().iterator(); iter.hasNext();) {
052 Stage stage = iter.next();
053
054 //only check that the stage can succeed known production
055 if (previous != null) {
056 if (!ValidationUtils.canSucceed(previous, stage)) {
057 errors.add(new ValidationFailure(ValidationFailure.Type.STAGE_CONNECT,
058 "Stage cannot consume output of prior stage.", previous, stage));
059 }
060 }
061
062 if (stage.getClass().isAnnotationPresent(ProductionOnBranch.class)) {
063 ProductionOnBranch pob = stage.getClass().getAnnotation(ProductionOnBranch.class);
064 errors.addAll(validateBranchConnect(pipeline, pob.branchKey(), stage));
065 } else if (stage.getClass().isAnnotationPresent(Branches.class)) {
066 Branches branches = stage.getClass().getAnnotation(Branches.class);
067
068 for (ProductionOnBranch pob : branches.productionOnBranches()) {
069 errors.addAll(validateBranchConnect(pipeline, pob.branchKey(), stage));
070 }
071
072 //only check that each branch can consume from known production.
073 if (previous != null) {
074 for (String branchKey : branches.producesConsumedOnBranches()) {
075 errors.addAll(validateBranchConnect(pipeline, branchKey, previous));
076 }
077 }
078 }
079
080 //only update the previous stage reference if the stage has non-null
081 //and non-pass-through production
082 if (stage.getClass().isAnnotationPresent(ProducedTypes.class)) {
083 //stop if the stage produces nothing, and raise an error if not at the end of the pipeline
084 if (stage.getClass().getAnnotation(ProducedTypes.class).value().length == 0) {
085 if (iter.hasNext()) errors.add(new ValidationFailure(ValidationFailure.Type.STAGE_CONNECT,
086 "Stage with no production is not at terminus of pipeline.", stage, iter.next()));
087 break;
088 }
089
090 previous = stage;
091 }
092 }
093
094 // recursively perform validation on the branch pipelines
095 for (Pipeline branch : pipeline.getBranches().values()) {
096 errors.addAll(validate(branch));
097 }
098
099 return errors;
100 }
101
102 /**
103 * Utility method for validating connection between stages and branches.
104 */
105 private List<ValidationFailure> validateBranchConnect(Pipeline pipeline, String branchKey, Stage upstreamStage) {
106 List<ValidationFailure> errors = new ArrayList<ValidationFailure>();
107 Pipeline branch = pipeline.getBranches().get(branchKey);
108
109 if (branch == null) {
110 errors.add(new ValidationFailure(ValidationFailure.Type.BRANCH_NOT_FOUND,
111 "Branch not found for production key " + branchKey, upstreamStage, null));
112 } else if (branch.getStages().isEmpty()) {
113 errors.add(new ValidationFailure(ValidationFailure.Type.BRANCH_CONNECT,
114 "Branch pipeline for key " + branchKey + " has no stages.", upstreamStage, null));
115 } else if (!ValidationUtils.canSucceed(upstreamStage, branch.getStages().get(0))) {
116 errors.add(new ValidationFailure(ValidationFailure.Type.BRANCH_CONNECT,
117 "Branch " + branchKey + " cannot consume data produced by stage.", upstreamStage, branch.getStages().get(0)));
118 }
119
120 return errors;
121 }
122
123 /**
124 * Validate whether or not a stage can be added to the pipeline.
125 * @param pipeline The pipeline to which the stage is being added
126 * @param stage The stage to be added
127 * @param driverFactory the StageDriverFactory used to create a driver for the stage
128 * @return The list of validation errors encountered, or an empty list if the add
129 * passed validation.
130 */
131 public List<ValidationFailure> validateAddStage(Pipeline pipeline, Stage stage, StageDriverFactory driverFactory) {
132 if (pipeline.getStages().isEmpty()) return Collections.emptyList(); //trivial case
133
134 //establish list of errors to be returned, initially empty
135 List<ValidationFailure> errors = new ArrayList<ValidationFailure>();
136
137 //search backwards along pipeline for known production
138 Stage previous = null;
139 for (int i = pipeline.getStages().size() - 1; i >= 0; i--) {
140 Stage test = pipeline.getStages().get(i);
141 if (test.getClass().isAnnotationPresent(ProducedTypes.class)) {
142 if (test.getClass().getAnnotation(ProducedTypes.class).value().length == 0) {
143 errors.add(new ValidationFailure(ValidationFailure.Type.STAGE_CONNECT,
144 "Attempt to add stage to pipeline with no production at terminus.", test, stage));
145 } else {
146 previous = test;
147 }
148
149 break;
150 }
151 }
152
153 if (previous != null) {
154 if (!ValidationUtils.canSucceed(previous, stage)) {
155 errors.add(new ValidationFailure(ValidationFailure.Type.STAGE_CONNECT,
156 "Stage cannot consume output of prior stage.", previous, stage));
157
158 //TODO: Add checks to determine whether the branch production of the
159 //stage can be consumed by branch pipelines.
160 }
161 }
162
163 return errors;
164 }
165
166 /**
167 * Validate whether or not the specified branch pipeline can be added
168 * with the specified key.
169 * @param pipeline The pipeline to which the branch is being added
170 * @param branchKey The identifier for the newly added branch
171 * @param branch The branch pipeline being added
172 * @return The list of validation errors, or an empty list if no errors were found.
173 */
174 public List<ValidationFailure> validateAddBranch(Pipeline pipeline, String branchKey, Pipeline branch) {
175 return Collections.emptyList(); //all default validation rules exist in pipeline
176 }
177 }