1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.commons.pipeline.validation;
19
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.Iterator;
23 import java.util.List;
24 import org.apache.commons.pipeline.Pipeline;
25 import org.apache.commons.pipeline.Stage;
26 import org.apache.commons.pipeline.StageDriverFactory;
27
28 /**
29 * This is a simple default implementation of the PipelineValidator interface
30 * that checks stage and branch connectivity. It assumes that any un-annotated
31 * stage simply passes data through and can accept any type of object (as though
32 * it were annotated with @ConsumedTypes({Object.class}) and @ProducesConsumed.
33 *
34 */
35 public class SimplePipelineValidator implements PipelineValidator {
36
37 /** Creates a new instance of PipelineValidator */
38 public SimplePipelineValidator() {
39 }
40
41 /**
42 * This method validates the entire structure of the pipeline, ensuring that
43 * the data produced by each stage can be consumed by the subsequent
44 * stage and/or relevant branch pipelines.
45 * @param pipeline The pipeline to be validated
46 * @return The list of validation errors encountered.
47 */
48 public List<ValidationFailure> validate(Pipeline pipeline) {
49 List<ValidationFailure> errors = new ArrayList<ValidationFailure>();
50 Stage previous = null;
51 for (Iterator<Stage> iter = pipeline.getStages().iterator(); iter.hasNext();) {
52 Stage stage = iter.next();
53
54 //only check that the stage can succeed known production
55 if (previous != null) {
56 if (!ValidationUtils.canSucceed(previous, stage)) {
57 errors.add(new ValidationFailure(ValidationFailure.Type.STAGE_CONNECT,
58 "Stage cannot consume output of prior stage.", previous, stage));
59 }
60 }
61
62 if (stage.getClass().isAnnotationPresent(ProductionOnBranch.class)) {
63 ProductionOnBranch pob = stage.getClass().getAnnotation(ProductionOnBranch.class);
64 errors.addAll(validateBranchConnect(pipeline, pob.branchKey(), stage));
65 } else if (stage.getClass().isAnnotationPresent(Branches.class)) {
66 Branches branches = stage.getClass().getAnnotation(Branches.class);
67
68 for (ProductionOnBranch pob : branches.productionOnBranches()) {
69 errors.addAll(validateBranchConnect(pipeline, pob.branchKey(), stage));
70 }
71
72 //only check that each branch can consume from known production.
73 if (previous != null) {
74 for (String branchKey : branches.producesConsumedOnBranches()) {
75 errors.addAll(validateBranchConnect(pipeline, branchKey, previous));
76 }
77 }
78 }
79
80 //only update the previous stage reference if the stage has non-null
81 //and non-pass-through production
82 if (stage.getClass().isAnnotationPresent(ProducedTypes.class)) {
83 //stop if the stage produces nothing, and raise an error if not at the end of the pipeline
84 if (stage.getClass().getAnnotation(ProducedTypes.class).value().length == 0) {
85 if (iter.hasNext()) errors.add(new ValidationFailure(ValidationFailure.Type.STAGE_CONNECT,
86 "Stage with no production is not at terminus of pipeline.", stage, iter.next()));
87 break;
88 }
89
90 previous = stage;
91 }
92 }
93
94 // recursively perform validation on the branch pipelines
95 for (Pipeline branch : pipeline.getBranches().values()) {
96 errors.addAll(validate(branch));
97 }
98
99 return errors;
100 }
101
102 /**
103 * Utility method for validating connection between stages and branches.
104 */
105 private List<ValidationFailure> validateBranchConnect(Pipeline pipeline, String branchKey, Stage upstreamStage) {
106 List<ValidationFailure> errors = new ArrayList<ValidationFailure>();
107 Pipeline branch = pipeline.getBranches().get(branchKey);
108
109 if (branch == null) {
110 errors.add(new ValidationFailure(ValidationFailure.Type.BRANCH_NOT_FOUND,
111 "Branch not found for production key " + branchKey, upstreamStage, null));
112 } else if (branch.getStages().isEmpty()) {
113 errors.add(new ValidationFailure(ValidationFailure.Type.BRANCH_CONNECT,
114 "Branch pipeline for key " + branchKey + " has no stages.", upstreamStage, null));
115 } else if (!ValidationUtils.canSucceed(upstreamStage, branch.getStages().get(0))) {
116 errors.add(new ValidationFailure(ValidationFailure.Type.BRANCH_CONNECT,
117 "Branch " + branchKey + " cannot consume data produced by stage.", upstreamStage, branch.getStages().get(0)));
118 }
119
120 return errors;
121 }
122
123 /**
124 * Validate whether or not a stage can be added to the pipeline.
125 * @param pipeline The pipeline to which the stage is being added
126 * @param stage The stage to be added
127 * @param driverFactory the StageDriverFactory used to create a driver for the stage
128 * @return The list of validation errors encountered, or an empty list if the add
129 * passed validation.
130 */
131 public List<ValidationFailure> validateAddStage(Pipeline pipeline, Stage stage, StageDriverFactory driverFactory) {
132 if (pipeline.getStages().isEmpty()) return Collections.emptyList(); //trivial case
133
134 //establish list of errors to be returned, initially empty
135 List<ValidationFailure> errors = new ArrayList<ValidationFailure>();
136
137 //search backwards along pipeline for known production
138 Stage previous = null;
139 for (int i = pipeline.getStages().size() - 1; i >= 0; i--) {
140 Stage test = pipeline.getStages().get(i);
141 if (test.getClass().isAnnotationPresent(ProducedTypes.class)) {
142 if (test.getClass().getAnnotation(ProducedTypes.class).value().length == 0) {
143 errors.add(new ValidationFailure(ValidationFailure.Type.STAGE_CONNECT,
144 "Attempt to add stage to pipeline with no production at terminus.", test, stage));
145 } else {
146 previous = test;
147 }
148
149 break;
150 }
151 }
152
153 if (previous != null) {
154 if (!ValidationUtils.canSucceed(previous, stage)) {
155 errors.add(new ValidationFailure(ValidationFailure.Type.STAGE_CONNECT,
156 "Stage cannot consume output of prior stage.", previous, stage));
157
158 //TODO: Add checks to determine whether the branch production of the
159 //stage can be consumed by branch pipelines.
160 }
161 }
162
163 return errors;
164 }
165
166 /**
167 * Validate whether or not the specified branch pipeline can be added
168 * with the specified key.
169 * @param pipeline The pipeline to which the branch is being added
170 * @param branchKey The identifier for the newly added branch
171 * @param branch The branch pipeline being added
172 * @return The list of validation errors, or an empty list if no errors were found.
173 */
174 public List<ValidationFailure> validateAddBranch(Pipeline pipeline, String branchKey, Pipeline branch) {
175 return Collections.emptyList(); //all default validation rules exist in pipeline
176 }
177 }