001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.commons.pipeline.stage;
019    
020    import org.apache.commons.logging.Log;
021    import org.apache.commons.logging.LogFactory;
022    import org.apache.commons.pipeline.Feeder;
023    import org.apache.commons.pipeline.Stage;
024    import org.apache.commons.pipeline.StageContext;
025    import org.apache.commons.pipeline.StageException;
026    import org.apache.commons.pipeline.validation.ConsumedTypes;
027    import org.apache.commons.pipeline.validation.ProducesConsumed;
028    
029    /**
030     * This is a simple base class for Stages with no-op implementations of the
031     * {@link #preprocess()}, {@link #process()}, {@link #postprocess()},
032     * and {@link #release()} methods.
033     */
034    @ConsumedTypes(Object.class)
035    @ProducesConsumed()
036    public abstract class BaseStage implements Stage {
037        private Log log = LogFactory.getLog(BaseStage.class);
038        
039        /**
040         * The context in which the stage runs.
041         */
042        protected StageContext context;
043        
044        /**
045         * Feeder for the next downstream stage in the pipeline. This value
046         * is lazily initialized by the process() method.
047         */
048        private Feeder downstreamFeeder;
049        
050        /**
051         * This implementation of init() simply stores a reference to the
052         * stage context.
053         */
054        public void init(StageContext context) {
055            this.context = context;
056        }
057        
058        /**
059         * No-op implementation. This method should be overridden to provide
060         * preprocessing capability for the stage.
061         *
062         * @throws StageException an Exception thrown by an overriding implementation should
063         * be wrapped in a {@link StageException}
064         * @see Stage#preprocess()
065         */
066        public void preprocess() throws StageException {
067        }
068        
069        /**
070         * The only operation performed by this implementation of process()
071         * is to feed the specified object to the downstream feeder.
072         * This method should be overridden to provide processing capability for the stage.
073         *
074         * @param obj Object to be passed to downstream pipeline.
075         * @throws StageException an Exception thrown by an overriding implementation should
076         * be wrapped in a {@link StageException}
077         */
078        public void process(Object obj) throws StageException {
079            this.emit(obj);
080        }
081        
082        /**
083         * No-op implementation. This method should be overridden to provide
084         * postprocessing capability for the stage.
085         *
086         * @throws StageException an Exception thrown by an overriding implementation should
087         * be wrapped in a {@link StageException}
088         */
089        public void postprocess() throws StageException {
090        }
091        
092        /**
093         * No-op implementation. This method should be overridden to provide
094         * resource release capability for the stage.
095         *
096         * Implementations overriding this method should clean up any lingering resources
097         * that might otherwise be left allocated if an exception is thrown during
098         * processing.
099         *
100         * @see Stage#release()
101         */
102        public void release() {
103        }
104        
105        /**
106         * Convenience method to feed the specified object to the next stage downstream.
107         */
108        public final void emit(Object obj) {
109            if (log.isDebugEnabled()) log.debug(this.getClass() + " is emitting object " + obj);
110            if (this.downstreamFeeder == null) this.downstreamFeeder = context.getDownstreamFeeder(this);
111            this.downstreamFeeder.feed(obj);
112        }
113        
114        /**
115         * Convenience method to feed the specified object to the first stage of the specified branch.
116         */
117        public final void emit(String branch, Object obj) {
118            this.context.getBranchFeeder(branch).feed(obj);
119        }
120    }