/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.commons.pipeline.validation;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.pipeline.Pipeline;
import org.apache.commons.pipeline.Stage;
import org.apache.commons.pipeline.StageDriverFactory;

/**
 * Simple default implementation of the {@link PipelineValidator} interface
 * that checks stage and branch connectivity. A stage that carries no
 * production annotation is treated as a pass-through that can accept any type
 * of object (as though it were annotated with
 * {@code @ConsumedTypes({Object.class})} and {@code @ProducesConsumed}).
 */
public class SimplePipelineValidator implements PipelineValidator {

    /** Creates a new instance of SimplePipelineValidator. */
    public SimplePipelineValidator() {
    }

    /**
     * Validates the entire structure of the pipeline, ensuring that the data
     * produced by each stage can be consumed by the subsequent stage and/or
     * the relevant branch pipelines. Branch pipelines are validated
     * recursively after the main line of stages.
     *
     * @param pipeline The pipeline to be validated
     * @return The list of validation errors encountered.
     */
    public List<ValidationFailure> validate(Pipeline pipeline) {
        List<ValidationFailure> failures = new ArrayList<ValidationFailure>();

        // Most recent stage with declared, non-empty production; stays null
        // until such a stage has been seen.
        Stage lastProducer = null;

        Iterator<Stage> stageIter = pipeline.getStages().iterator();
        while (stageIter.hasNext()) {
            Stage current = stageIter.next();

            // Connectivity is only checked against known production.
            if (lastProducer != null && !ValidationUtils.canSucceed(lastProducer, current)) {
                failures.add(new ValidationFailure(ValidationFailure.Type.STAGE_CONNECT,
                        "Stage cannot consume output of prior stage.", lastProducer, current));
            }

            failures.addAll(validateStageBranches(pipeline, current, lastProducer));

            // A stage without @ProducedTypes is a pass-through: the previous
            // producer remains the reference point for downstream checks.
            ProducedTypes produced = current.getClass().getAnnotation(ProducedTypes.class);
            if (produced == null) {
                continue;
            }

            // A stage that produces nothing must be last; stop scanning either way.
            if (produced.value().length == 0) {
                if (stageIter.hasNext()) {
                    Stage follower = stageIter.next();
                    failures.add(new ValidationFailure(ValidationFailure.Type.STAGE_CONNECT,
                            "Stage with no production is not at terminus of pipeline.", current, follower));
                }
                break;
            }

            lastProducer = current;
        }

        // Recursively perform validation on the branch pipelines.
        for (Pipeline branch : pipeline.getBranches().values()) {
            failures.addAll(validate(branch));
        }

        return failures;
    }

    /**
     * Checks the branch connections declared by a stage's class-level
     * annotations. A {@code ProductionOnBranch} annotation takes precedence;
     * a {@code Branches} annotation is only consulted when the former is absent.
     *
     * @param pipeline The pipeline that owns the branches
     * @param stage The stage whose annotations are inspected
     * @param lastProducer The most recent upstream stage with known production,
     * or null if there is none; used for the pass-through branch keys
     * @return The list of validation errors encountered.
     */
    private List<ValidationFailure> validateStageBranches(Pipeline pipeline, Stage stage, Stage lastProducer) {
        List<ValidationFailure> failures = new ArrayList<ValidationFailure>();
        Class<?> stageType = stage.getClass();

        ProductionOnBranch single = stageType.getAnnotation(ProductionOnBranch.class);
        if (single != null) {
            failures.addAll(validateBranchConnect(pipeline, single.branchKey(), stage));
            return failures;
        }

        Branches declared = stageType.getAnnotation(Branches.class);
        if (declared == null) {
            return failures;
        }

        for (ProductionOnBranch production : declared.productionOnBranches()) {
            failures.addAll(validateBranchConnect(pipeline, production.branchKey(), stage));
        }

        // Only check that each branch can consume from known production.
        if (lastProducer != null) {
            for (String key : declared.producesConsumedOnBranches()) {
                failures.addAll(validateBranchConnect(pipeline, key, lastProducer));
            }
        }

        return failures;
    }

    /**
     * Utility method for validating the connection between a stage and the
     * branch registered under the given key. At most one failure is reported:
     * the branch is missing, the branch has no stages, or the first stage of
     * the branch cannot succeed the upstream stage.
     */
    private List<ValidationFailure> validateBranchConnect(Pipeline pipeline, String branchKey, Stage upstreamStage) {
        List<ValidationFailure> failures = new ArrayList<ValidationFailure>();
        Pipeline target = pipeline.getBranches().get(branchKey);

        if (target == null) {
            failures.add(new ValidationFailure(ValidationFailure.Type.BRANCH_NOT_FOUND,
                    "Branch not found for production key " + branchKey, upstreamStage, null));
            return failures;
        }

        if (target.getStages().isEmpty()) {
            failures.add(new ValidationFailure(ValidationFailure.Type.BRANCH_CONNECT,
                    "Branch pipeline for key " + branchKey + " has no stages.", upstreamStage, null));
            return failures;
        }

        Stage entry = target.getStages().get(0);
        if (!ValidationUtils.canSucceed(upstreamStage, entry)) {
            failures.add(new ValidationFailure(ValidationFailure.Type.BRANCH_CONNECT,
                    "Branch " + branchKey + " cannot consume data produced by stage.", upstreamStage, entry));
        }

        return failures;
    }

    /**
     * Validate whether or not a stage can be added to the pipeline.
     *
     * @param pipeline The pipeline to which the stage is being added
     * @param stage The stage to be added
     * @param driverFactory the StageDriverFactory used to create a driver for the stage
     * (not consulted by this implementation)
     * @return The list of validation errors encountered, or an empty list if the add
     * passed validation.
     */
    public List<ValidationFailure> validateAddStage(Pipeline pipeline, Stage stage, StageDriverFactory driverFactory) {
        List<Stage> existing = pipeline.getStages();
        if (existing.isEmpty()) {
            return Collections.emptyList(); // trivial case
        }

        List<ValidationFailure> failures = new ArrayList<ValidationFailure>();

        // Walk backwards from the end of the pipeline to the nearest stage
        // that declares its production.
        Stage lastProducer = null;
        int index = existing.size();
        while (--index >= 0) {
            Stage candidate = existing.get(index);
            ProducedTypes produced = candidate.getClass().getAnnotation(ProducedTypes.class);
            if (produced == null) {
                continue; // pass-through stage, keep searching
            }

            if (produced.value().length == 0) {
                failures.add(new ValidationFailure(ValidationFailure.Type.STAGE_CONNECT,
                        "Attempt to add stage to pipeline with no production at terminus.", candidate, stage));
            } else {
                lastProducer = candidate;
            }
            break;
        }

        if (lastProducer != null && !ValidationUtils.canSucceed(lastProducer, stage)) {
            failures.add(new ValidationFailure(ValidationFailure.Type.STAGE_CONNECT,
                    "Stage cannot consume output of prior stage.", lastProducer, stage));

            //TODO: Add checks to determine whether the branch production of the
            //stage can be consumed by branch pipelines.
        }

        return failures;
    }

    /**
     * Validate whether or not the specified branch pipeline can be added
     * with the specified key. This implementation performs no checks and
     * always reports success.
     *
     * @param pipeline The pipeline to which the branch is being added
     * @param branchKey The identifier for the newly added branch
     * @param branch The branch pipeline being added
     * @return The list of validation errors, or an empty list if no errors were found.
     */
    public List<ValidationFailure> validateAddBranch(Pipeline pipeline, String branchKey, Pipeline branch) {
        return Collections.emptyList(); // all default validation rules exist in pipeline
    }
}