001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.commons.pipeline.stage; 019 020 import org.apache.commons.logging.Log; 021 import org.apache.commons.logging.LogFactory; 022 import org.apache.commons.pipeline.Feeder; 023 import org.apache.commons.pipeline.Stage; 024 import org.apache.commons.pipeline.StageContext; 025 import org.apache.commons.pipeline.StageException; 026 import org.apache.commons.pipeline.validation.ConsumedTypes; 027 import org.apache.commons.pipeline.validation.ProducesConsumed; 028 029 /** 030 * This is a simple base class for Stages with no-op implementations of the 031 * {@link #preprocess()}, {@link #process()}, {@link #postprocess()}, 032 * and {@link #release()} methods. 033 */ 034 @ConsumedTypes(Object.class) 035 @ProducesConsumed() 036 public abstract class BaseStage implements Stage { 037 private Log log = LogFactory.getLog(BaseStage.class); 038 039 /** 040 * The context in which the stage runs. 041 */ 042 protected StageContext context; 043 044 /** 045 * Feeder for the next downstream stage in the pipeline. This value 046 * is lazily initialized by the process() method. 047 */ 048 private Feeder downstreamFeeder; 049 050 /** 051 * This implementation of init() simply stores a reference to the 052 * stage context. 053 */ 054 public void init(StageContext context) { 055 this.context = context; 056 } 057 058 /** 059 * No-op implementation. This method should be overridden to provide 060 * preprocessing capability for the stage. 061 * 062 * @throws StageException an Exception thrown by an overriding implementation should 063 * be wrapped in a {@link StageException} 064 * @see Stage#preprocess() 065 */ 066 public void preprocess() throws StageException { 067 } 068 069 /** 070 * The only operation performed by this implementation of process() 071 * is to feed the specified object to the downstream feeder. 072 * This method should be overridden to provide processing capability for the stage. 073 * 074 * @param obj Object to be passed to downstream pipeline. 075 * @throws StageException an Exception thrown by an overriding implementation should 076 * be wrapped in a {@link StageException} 077 */ 078 public void process(Object obj) throws StageException { 079 this.emit(obj); 080 } 081 082 /** 083 * No-op implementation. This method should be overridden to provide 084 * postprocessing capability for the stage. 085 * 086 * @throws StageException an Exception thrown by an overriding implementation should 087 * be wrapped in a {@link StageException} 088 */ 089 public void postprocess() throws StageException { 090 } 091 092 /** 093 * No-op implementation. This method should be overridden to provide 094 * resource release capability for the stage. 095 * 096 * Implementations overriding this method should clean up any lingering resources 097 * that might otherwise be left allocated if an exception is thrown during 098 * processing. 099 * 100 * @see Stage#release() 101 */ 102 public void release() { 103 } 104 105 /** 106 * Convenience method to feed the specified object to the next stage downstream. 107 */ 108 public final void emit(Object obj) { 109 if (log.isDebugEnabled()) log.debug(this.getClass() + " is emitting object " + obj); 110 if (this.downstreamFeeder == null) this.downstreamFeeder = context.getDownstreamFeeder(this); 111 this.downstreamFeeder.feed(obj); 112 } 113 114 /** 115 * Convenience method to feed the specified object to the first stage of the specified branch. 116 */ 117 public final void emit(String branch, Object obj) { 118 this.context.getBranchFeeder(branch).feed(obj); 119 } 120 }