001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.commons.pipeline; 019 020 import java.util.ArrayList; 021 import java.util.Collection; 022 import java.util.Collections; 023 import java.util.EventObject; 024 import java.util.HashMap; 025 import java.util.LinkedList; 026 import java.util.List; 027 import java.util.Map; 028 import org.apache.commons.pipeline.driver.SynchronousStageDriver; 029 import org.apache.commons.pipeline.validation.PipelineValidator; 030 import org.apache.commons.pipeline.validation.ValidationException; 031 import org.apache.commons.pipeline.validation.ValidationFailure; 032 033 /** 034 * This class represents a processing system consisting of a number of stages 035 * and branches. Each stage contains a queue and manages one or more threads 036 * that process data in that stage's queue and allow processed data to be 037 * passed along to subsequent stages and onto branches of the pipeline.<P> 038 * 039 * This class allows all stages in the pipeline to be managed collectively 040 * with methods to start and stop processing for all stages, as well as 041 * a simple framework for asynchronous event-based communication between stages. 042 */ 043 public class Pipeline implements Runnable, StageContext { 044 /** 045 * The branch key for the main line of production. This value is reserved 046 * and may not be used as a key for other branch pipelines. 047 */ 048 public static final String MAIN_BRANCH = "main"; 049 050 //The logger used for reporting by this pipeline 051 //private final Log log = LogFactory.getLog(Pipeline.class); 052 053 // List of stages in the pipeline, encapsulated in the drivers 054 // that will be used to onStart them. 055 private final LinkedList<StageDriver> drivers; 056 private final Map<Stage, StageDriver> driverMap; 057 058 // The list of stages in the pipeline. 059 private final LinkedList<Stage> stages; 060 061 // Map of pipeline branches where the keys are branch names. 062 private final Map<String,Pipeline> branches; 063 064 // Used to store a reference to the parent pipeline, if this is a branch 065 private Pipeline parent; 066 067 // The list of listeners registered with the pipeline. 068 private final List<StageEventListener> listeners; 069 070 // Holds value of property validator. 071 private PipelineValidator validator; 072 073 // Feeder used to handle output of final stage 074 private Feeder terminalFeeder = Feeder.VOID; 075 076 // Global environment variables 077 private Map<String,Object> env = Collections.synchronizedMap(new HashMap<String,Object>()); 078 079 // List of jobs to be run at defined points in pipeline lifecycle 080 private Collection<PipelineLifecycleJob> lifecycleJobs = new ArrayList<PipelineLifecycleJob>(); 081 082 /** 083 * Creates and initializes a new Pipeline. 084 */ 085 public Pipeline() { 086 this.drivers = new LinkedList<StageDriver>(); 087 this.driverMap = new HashMap<Stage, StageDriver>(); 088 this.stages = new LinkedList<Stage>(); 089 this.branches = new HashMap<String,Pipeline>(); 090 this.listeners = Collections.synchronizedList(new ArrayList<StageEventListener>()); 091 } 092 093 /** 094 * {@inheritDoc} 095 */ 096 public void registerListener(StageEventListener listener) { 097 listeners.add(listener); 098 } 099 100 /** 101 * {@inheritDoc} 102 */ 103 public Collection<StageEventListener> getRegisteredListeners() { 104 return this.listeners; 105 } 106 107 /** 108 * Asynchronously notifies each registered listener of an event and propagates 109 * the event to any attached branches and the parent pipeline. 110 * 111 * @param ev The event to be sent to registered listeners 112 */ 113 public void raise(final EventObject ev) { 114 new Thread() { 115 public void run() { 116 //first, recursively find the root pipeline 117 Pipeline root = Pipeline.this; 118 while (root.parent != null) root = root.parent; 119 120 //notify the listeners from the root pipeline 121 root.notifyListeners(ev); 122 } 123 }.start(); 124 } 125 126 /** 127 * Notify all listeners and recursively notify child branches of the 128 * specified event. This method does not propagate events to the 129 * parent pipeline. 130 */ 131 private void notifyListeners(EventObject ev) { 132 for (StageEventListener listener : listeners) listener.notify(ev); 133 for (Pipeline branch : branches.values()) branch.notifyListeners(ev); 134 } 135 136 /** 137 * {@inheritDoc} 138 */ 139 public Feeder getDownstreamFeeder(Stage stage) { 140 if (stage == null) throw new IllegalArgumentException("Unable to look up downstream feeder for null stage."); 141 if (stage == drivers.getLast().getStage()) { 142 return this.terminalFeeder; 143 } else { 144 //Iterate backwards over the list until the stage is found, then return 145 //the feeder for the subsequent stage. Comparisons are done using reference 146 //equality. 147 for (int i = drivers.size() - 2; i >= 0; i--) { 148 if (stage == drivers.get(i).getStage()) return drivers.get(i+1).getFeeder(); 149 } 150 151 throw new IllegalStateException("Unable to find stage " + stage + " in pipeline."); 152 } 153 } 154 155 /** 156 * {@inheritDoc} 157 */ 158 public Feeder getBranchFeeder(String branch) { 159 if (!getBranches().containsKey(branch)) { 160 throw new IllegalStateException("Unable to find branch in pipeline: '" + branch + "'"); 161 } 162 163 return branches.get(branch).getSourceFeeder(); 164 } 165 166 /** 167 * {@inheritDoc} 168 */ 169 public Object getEnv(String key) { 170 return this.env.get(key); 171 } 172 173 /** 174 * Sets the value corresponding to the specified environment variable key. 175 */ 176 public void setEnv(String key, Object value) { 177 this.env.put(key, value); 178 } 179 180 /** 181 * Adds a {@link Stage} object to the end of this Pipeline. If a 182 * {@link PipelineValidator} has been set using {@link #setValidator}, it will 183 * be used to validate that the appended Stage can consume the output of the 184 * previous stage of the pipeline. It does NOT validate the ability or availability 185 * of branches to consume data produced by the appended stage. 186 * 187 * @param stage the stage to be added to the pipeline 188 * @param driverFactory the factory that will be used to create a {@link StageDriver} that will run the stage 189 * @throws ValidationException if there is a non-null validator set for this pipeline and an error is 190 * encountered validating the addition of the stage to the pipeline. 191 */ 192 public final void addStage(Stage stage, StageDriverFactory driverFactory) throws ValidationException { 193 if (stage == null) throw new IllegalArgumentException("Argument \"stage\" for call to Pipeline.addStage() may not be null."); 194 195 if (validator != null) { 196 List<ValidationFailure> errors = validator.validateAddStage(this, stage, driverFactory); 197 if (errors != null && !errors.isEmpty()) { 198 throw new ValidationException("An error occurred adding stage " + stage.toString(), errors); 199 } 200 } 201 202 stage.init(this); 203 this.stages.add(stage); 204 205 StageDriver driver = driverFactory.createStageDriver(stage, this); 206 this.driverMap.put(stage, driver); 207 this.drivers.add(driver); 208 } 209 210 /** 211 * Returns an unmodifiable list of stages that have been added to this 212 * pipeline. 213 * @return A list of the stages that have been added to the pipeline 214 */ 215 public final List<Stage> getStages() { 216 return Collections.unmodifiableList(this.stages); 217 } 218 219 /** 220 * Return the StageDriver for the specified Stage. 221 * 222 * @return the StageDriver for the specified Stage. 223 */ 224 public final StageDriver getStageDriver(Stage stage) { 225 return this.driverMap.get(stage); 226 } 227 228 /** 229 * Returns an unmodifiable list of stage drivers that have been added 230 * to the pipeline. 231 * @return the list of drivers for stages in the pipeline 232 */ 233 public final List<StageDriver> getStageDrivers() { 234 return Collections.unmodifiableList(this.drivers); 235 } 236 237 /** 238 * Adds a branch to the pipeline. 239 * @param key the string identifier that will be used to refer to the added branch 240 * @param pipeline the branch pipeline 241 * @throws org.apache.commons.pipeline.validation.ValidationException if the pipeline has a non-null {@link PipelineValidator} and the branch 242 * cannot consume the data produced for the branch by stages in the pipeline. 243 */ 244 public void addBranch(String key, Pipeline branch) throws ValidationException { 245 if (key == null) 246 throw new IllegalArgumentException("Branch key may not be null."); 247 if (MAIN_BRANCH.equalsIgnoreCase(key)) 248 throw new IllegalArgumentException("Branch key name \"" + MAIN_BRANCH + "\" is reserved."); 249 if (branch == null) 250 throw new IllegalArgumentException("Illegal attempt to set reference to null branch."); 251 if (branch == this || branch.hasBranch(this)) 252 throw new IllegalArgumentException("Illegal attempt to set reference to self as a branch (infinite recursion potential)"); 253 254 if (validator != null) { 255 List<ValidationFailure> errors = validator.validateAddBranch(this, key, branch); 256 if (errors != null && !errors.isEmpty()) { 257 throw new ValidationException("An error occurred adding branch pipeline " + branch, errors); 258 } 259 } 260 261 branch.parent = this; 262 this.branches.put(key, branch); 263 } 264 265 /** 266 * Returns an unmodifiable map of branch pipelines, keyed by branch identifier. 267 * @return the map of registered branch pipelines, keyed by branch identifier 268 */ 269 public Map<String,Pipeline> getBranches() { 270 return Collections.unmodifiableMap(branches); 271 } 272 273 /** 274 * Simple method that recursively checks whether the specified 275 * pipeline is a branch of this pipeline. 276 * @param pipeline the candidate branch 277 * @return true if the specified pipeline is a branch of this pipeline, or recursively 278 * a branch of a branch. Tests are performed using reference equality. 279 */ 280 private boolean hasBranch(Pipeline pipeline) { 281 if (branches.containsValue(pipeline)) return true; 282 for (Pipeline branch : branches.values()) { 283 if (branch.hasBranch(pipeline)) return true; 284 } 285 286 return false; 287 } 288 289 /** 290 * Returns a feeder for the first stage if the pipeline is not empty 291 * @return the feeder to feed the first stage of the pipeline 292 */ 293 public Feeder getSourceFeeder() { 294 if (drivers.isEmpty()) return this.terminalFeeder; 295 return drivers.peek().getFeeder(); 296 } 297 298 /** 299 * Gets the feeder that receives output from the final stage. 300 * @return the terminal feeder being used to handle any output from the final stage. The default is {@link Feeder#VOID} 301 */ 302 public Feeder getTerminalFeeder() { 303 return this.terminalFeeder; 304 } 305 306 /** 307 * Sets the terminal feeder used to handle any output from the final stage. 308 * @param terminalFeeder the {@link Feeder} that will receive any output from the final stage 309 */ 310 public void setTerminalFeeder(Feeder terminalFeeder) { 311 this.terminalFeeder = terminalFeeder; 312 } 313 314 /** 315 * Adds a job to be onStart on startup to the pipeline. 316 */ 317 public void addLifecycleJob(PipelineLifecycleJob job) { 318 this.lifecycleJobs.add(job); 319 } 320 321 /** 322 * This method iterates over the stages in the pipeline, looking up a 323 * {@link StageDriver} for each stage and using that driver to start the stage. 324 * Startups may occur sequentially or in parallel, depending upon the stage driver 325 * used. If a the stage has not been configured with a {@link StageDriver}, 326 * we will use the default, non-threaded {@link SynchronousStageDriver}. 327 * 328 * @throws org.apache.commons.pipeline.StageException Thrown if there is an error during pipeline startup 329 */ 330 public void start() throws StageException { 331 for (PipelineLifecycleJob job : lifecycleJobs) job.onStart(this); 332 for (StageDriver driver: this.drivers) driver.start(); 333 for (Pipeline branch : branches.values()) branch.start(); 334 } 335 336 /** 337 * This method iterates over the stages in the pipeline, looking up a {@link StageDriver} 338 * for each stage and using that driver to request that the stage finish 339 * execution. The {@link StageDriver#finish(Stage)} 340 * method will block until the stage's queue is exhausted, so this method 341 * may be used to safely finalize all stages without the risk of 342 * losing data in the queues. 343 * 344 * @throws org.apache.commons.pipeline.StageException Thrown if there is an unhandled error during stage shutdown 345 */ 346 public void finish() throws StageException { 347 for (StageDriver driver: this.drivers) driver.finish(); 348 for (Pipeline pipeline : branches.values()) pipeline.finish(); 349 for (PipelineLifecycleJob job : lifecycleJobs) job.onFinish(this); 350 } 351 352 /** 353 * Runs the pipeline from start to finish. 354 */ 355 public void run() { 356 try { 357 start(); 358 finish(); 359 } catch (StageException e) { 360 throw new RuntimeException(e); 361 } 362 } 363 364 /** 365 * Returns the validator being used to validate the pipeline structure, 366 * or null if no validation is being performed.. 367 * @return Validator used to validate pipeline structure. 368 */ 369 public PipelineValidator getValidator() { 370 return this.validator; 371 } 372 373 /** 374 * Sets the validator used to validate the pipeline as it is contstructed. 375 * Setting the validator to null disables validation 376 * @param validator Validator used to validate pipeline structure. 377 */ 378 public void setValidator(PipelineValidator validator) { 379 this.validator = validator; 380 } 381 382 /** 383 * Returns the parent of this pipeline, if it is a branch 384 * @return parent Pipeline, or null if this is the main pipeline 385 */ 386 public Pipeline getParent() { 387 return parent; 388 } 389 }