001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.commons.pipeline.stage;
019
020 import org.apache.commons.logging.Log;
021 import org.apache.commons.logging.LogFactory;
022 import org.apache.commons.pipeline.Feeder;
023 import org.apache.commons.pipeline.Stage;
024 import org.apache.commons.pipeline.StageContext;
025 import org.apache.commons.pipeline.StageException;
026 import org.apache.commons.pipeline.validation.ConsumedTypes;
027 import org.apache.commons.pipeline.validation.ProducesConsumed;
028
029 /**
030 * This is a simple base class for Stages with no-op implementations of the
031 * {@link #preprocess()}, {@link #process()}, {@link #postprocess()},
032 * and {@link #release()} methods.
033 */
034 @ConsumedTypes(Object.class)
035 @ProducesConsumed()
036 public abstract class BaseStage implements Stage {
037 private Log log = LogFactory.getLog(BaseStage.class);
038
039 /**
040 * The context in which the stage runs.
041 */
042 protected StageContext context;
043
044 /**
045 * Feeder for the next downstream stage in the pipeline. This value
046 * is lazily initialized by the process() method.
047 */
048 private Feeder downstreamFeeder;
049
050 /**
051 * This implementation of init() simply stores a reference to the
052 * stage context.
053 */
054 public void init(StageContext context) {
055 this.context = context;
056 }
057
058 /**
059 * No-op implementation. This method should be overridden to provide
060 * preprocessing capability for the stage.
061 *
062 * @throws StageException an Exception thrown by an overriding implementation should
063 * be wrapped in a {@link StageException}
064 * @see Stage#preprocess()
065 */
066 public void preprocess() throws StageException {
067 }
068
069 /**
070 * The only operation performed by this implementation of process()
071 * is to feed the specified object to the downstream feeder.
072 * This method should be overridden to provide processing capability for the stage.
073 *
074 * @param obj Object to be passed to downstream pipeline.
075 * @throws StageException an Exception thrown by an overriding implementation should
076 * be wrapped in a {@link StageException}
077 */
078 public void process(Object obj) throws StageException {
079 this.emit(obj);
080 }
081
082 /**
083 * No-op implementation. This method should be overridden to provide
084 * postprocessing capability for the stage.
085 *
086 * @throws StageException an Exception thrown by an overriding implementation should
087 * be wrapped in a {@link StageException}
088 */
089 public void postprocess() throws StageException {
090 }
091
092 /**
093 * No-op implementation. This method should be overridden to provide
094 * resource release capability for the stage.
095 *
096 * Implementations overriding this method should clean up any lingering resources
097 * that might otherwise be left allocated if an exception is thrown during
098 * processing.
099 *
100 * @see Stage#release()
101 */
102 public void release() {
103 }
104
105 /**
106 * Convenience method to feed the specified object to the next stage downstream.
107 */
108 public final void emit(Object obj) {
109 if (log.isDebugEnabled()) log.debug(this.getClass() + " is emitting object " + obj);
110 if (this.downstreamFeeder == null) this.downstreamFeeder = context.getDownstreamFeeder(this);
111 this.downstreamFeeder.feed(obj);
112 }
113
114 /**
115 * Convenience method to feed the specified object to the first stage of the specified branch.
116 */
117 public final void emit(String branch, Object obj) {
118 this.context.getBranchFeeder(branch).feed(obj);
119 }
120 }