A tutorial on some of the basics needed to use the Apache Commons Pipeline workflow framework. The target audience for this document is developers who need to assemble existing stages or write their own. The pipeline provides a Java class library intended to make it easy to use and reuse stages as modular processing blocks.

Pipeline Structure

Stages in a pipeline represent the logical steps needed to process data. Each represents a single high-level processing concept such as finding files, reading a file format, computing a product from the data, or writing data to a database. The primary advantage of using the Pipeline framework and building the processing steps into stages is the reusability of the stages in other pipelines.

A basic pipeline

A Pipeline is built up from stages which can pass data on to subsequent stages. The arrows above labelled "EMIT" show the data output of one stage being passed to the next stage. At the code level, there is an emit() method that sends data to the next stage. The data flow starts at the left, where there is an arrow labelled "FEED". The FEED starts off the pipeline and is usually set up by a configuration file, discussed below. The stages themselves do not care whether the incoming data come from a feed or from the emit() of a previous stage.

Pipelines may also branch to send the same or different data along different processing routes.
A branching pipeline

Configuration by Digester or Spring

There are two methods for configuring the Pipeline, both based on XML control files. The simpler method uses Digester; end users of a pipeline may be able to modify this kind of file for themselves. The Spring framework can also be used to configure the Pipeline; it is both more complex and more powerful, as its structure more closely models Java programming objects. The stages are ordered by these XML configuration files, and stage-specific parameters are set up by them. These control files also allow global parameters visible to all stages in the form of environment parameters. This configuration approach allows a lot of control over the pipeline layout and behavior, all without recompiling the Java code.

This tutorial will introduce the Digester route to configuring pipelines since that is the simpler method.

Notes on Stages

A standard stage has a queue to buffer the incoming data objects. Queueing improves efficiency when stages have different or irregular rates of throughput, especially stages that rely on network connections or near-line media for their data. This queue is not actually part of the stage itself but is managed by a stage driver, which feeds the objects to the stage as the stage is ready for them. The stage passes a data object on to the next stage, where it may wait in a queue (in the order received) until the next stage is ready to process it. Typically each stage runs in its own processing thread; however, for some applications you can configure the pipeline to run objects one at a time through all the stages in a single thread, so that the next object is not started until the previous one has finished all the stages.

Stages are derived from the abstract class org.apache.commons.pipeline.stage.BaseStage. There are a number of ready-to-use existing stages to meet various processing needs. You may also create custom stages by extending BaseStage or one of the other existing stages.

An example showing mixed types and quantities; see the notes below the figure.
An example pipeline

Stages in the above diagram illustrate:
  • Normally all the objects going into a stage are of the same type. Avoid repeatedly writing switch statements in stage code to sort objects; instead, use branches to segregate different object types.
  • One object fed into a stage does not always produce one object out.
    • Stages that do not pass on (emit) any objects are referred to as terminal stages. Avoid creating this type of stage, since terminal stages limit your possibilities when building pipelines. (Avoiding them is easy: a single line of code passes data on to the next stage.)
    • Stages that send objects on to more than one subsequent stage are called branching stages .
    • Stages that pass on the same type of object that they receive, but only if meeting some chosen criteria, are called filtering stages .
    • It is common to have reader stages and writer stages to bring information into and out of a pipeline.
    • Stages that create different objects from those passed into them are called converter stages .
  • The type of object emitted does not have to be of the same type going in.
  • When branching, the objects going to different following stages do not have to be of the same type, or of the same quantity. Note that the "FileReader" stage above produces 100 cell objects for each incoming file while just one boundary shape is passed to the branch.
Other notes (not necessarily obvious from the diagram above):
  • Although the data being fed to a stage are passed as Java Objects, the stage receiving them expects a more specific data type such as files or data records. Usually incoming objects are checked to see if they are an instance of the desired data class and then cast to that class before the rest of the work is done (see the sketch after this list).
  • You can set the type of stage driver used for each stage in your pipeline. There are options for limiting queue sizes to control memory and resource usage. For these bounded queues, the upstream stages will block and wait until there is adequate room in the downstream stage's queue.
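
A minimal sketch of the type check mentioned above (the surrounding stage class is assumed to extend BaseStage, which is covered in the Internal Stage Anatomy section below; silently skipping unexpected objects is just one possible policy):

public void process(Object obj) throws StageException {
    // Data arrive as plain Java Objects; verify the expected type before casting.
    if (!(obj instanceof java.io.File)) {
        return; // in this sketch, objects of unexpected types are simply skipped
    }
    java.io.File file = (java.io.File) obj;
    // ... read and process the file, then emit() the results to the next stage ...
}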

Role of the StageDriver

There is a Java interface called StageDriver which controls the feeding of data into stages and the communication between stages and the pipeline containing them. The stage lifecycle and the interactions between stages are therefore very dependent on the direction provided by these stage drivers. During pipeline setup, the StageDrivers are provided by factory classes, each producing a specific type of StageDriver; these factories implement the StageDriverFactory interface. Each stage has its own instance of a StageDriver, and different stages within a pipeline may use different types of StageDrivers, although it is common for all stages in a pipeline to use the same type of StageDriver (all sharing the same StageDriverFactory implementation).


Some common stage drivers are:

  • DedicatedThreadStageDriver: Spawns a single thread to process a stage. Provided by DedicatedThreadStageDriverFactory.
  • SynchronousStageDriver: A non-threaded StageDriver. Provided by SynchronousStageDriverFactory.
  • ThreadPoolStageDriver: Uses a pool of threads to process objects from an input queue. Provided by ThreadPoolStageDriverFactory.
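
Selecting one of these drivers for a stage is done in the pipeline configuration file: declare a driverFactory element for the desired factory and point the stage's driverFactoryId at it. Full examples appear in the Digester section below; here is just a minimal preview using the DedicatedThreadStageDriverFactory:

    <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
                   id="threaded"/>

    <!-- A stage controlled by the factory named "threaded" gets its own DedicatedThreadStageDriver. -->
    <stage className="org.apache.commons.pipeline.stage.LogStage"
           driverFactoryId="threaded"/>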

This tutorial will cover the DedicatedThreadStageDriver since that is a good general purpose driver. You may at some point wish to write your own StageDriver implementation, but that is an advanced topic not covered here.

Internal Stage Anatomy

If you need to write your own stage, this section gives an overview of the methods you will need to know about in order to implement the Stage interface.


Stage itself is an interface defined in org.apache.commons.pipeline.Stage and it must have the following methods:

Stage Interface Methods
  • init(StageContext): Associate the stage with the environment. Run once in the lifecycle.
  • preprocess(): Do any necessary setup. Run once in the lifecycle.
  • process(Object): Process an object and emit results to the next stage. Run N times, once for each object fed in.
  • postprocess(): Handle aggregated data, etc. Run once in the lifecycle.
  • release(): Clean up any resources being held by the stage. Run once in the lifecycle.


An abstract class called org.apache.commons.pipeline.stage.BaseStage is available, from which many other stages are derived. You can extend this class or one of the other stages built upon BaseStage. BaseStage provides no-op implementations of the Stage interface methods, which you can override as needed when you extend one of these classes. For simple processing you may not need to override init(StageContext), postprocess(), or release(); however, you will almost always provide your own process(Object) method. From a software design perspective, this is Inversion of Control: instead of writing a custom main program to call standard subroutines, you are writing custom subroutines to be called by a standard main program.



BaseStage provides the method emit(Object obj), along with emit(String branch, Object obj) for branching, to send objects on to the next stage. Thus it is normal for emit() to be called near the end of process(). A terminal stage simply doesn't call emit(), so no objects are passed on. It is also very easy to change a stage so it is not a terminal stage by adding an emit() call to the code. Note that it is harmless for a stage to emit an object when there is no subsequent stage to use it; the emitted object just goes unused. Sometimes the emit() method is called by postprocess() in addition to, or instead of, process(). When processing involves buffering or summarizing the incoming objects, the process() method normally stores information from the incoming objects, and postprocess() finishes up the work and emits a new object.
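
As an illustration of this buffering pattern (a sketch only, not a stage shipped with the framework; the class below and its behavior are hypothetical), the following stage accumulates a running total in process() and emits a single summary value from postprocess():

import org.apache.commons.pipeline.StageException;
import org.apache.commons.pipeline.stage.BaseStage;

/**
 * Illustrative stage that sums incoming Number objects and emits one Double
 * containing the grand total after every incoming object has been processed.
 */
public class RunningTotalStage extends BaseStage {

    private double total;

    @Override
    public void process(Object obj) throws StageException {
        // Objects arrive as plain Objects; check the expected type before casting.
        if (obj instanceof Number) {
            total += ((Number) obj).doubleValue();
        }
        // Nothing is emitted here; this stage only accumulates during process().
    }

    @Override
    public void postprocess() throws StageException {
        // All incoming objects have been seen, so emit the single summary value.
        // If this were meant to be a terminal stage, we would simply not call emit().
        emit(Double.valueOf(total));
    }
}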

Stage Lifecycle

When a pipeline is assembled and run, each stage is normally run in its own thread (with all threads of a pipeline being owned by the same JVM instance). This multithreaded approach should give a processing advantage on a multiprocessor system. For a given stage, the Stage methods are run in order: init(), preprocess(), process(), postprocess() and release(). Between stages, however, the order in which the various methods begin and complete is not deterministic. In other words, in a pipeline with multiple stages, you can't count on any particular stage's preprocess() method beginning or completing before or after that method in another stage. If you have dependencies between stages, see the discussion of Events and Listeners in the Communication between Stages section below.



The order of stages in a pipeline is determined by the pipeline configuration file. With Digester, this is an XML file which lists the stages to be used, plus initialization parameters. As each stage is added to the pipeline, its init() method is executed. After all the stages of the pipeline have been loaded into place, the pipeline is set to begin running and the preprocess() method is called for each stage. When using the DedicatedThreadStageDriver, each stage begins running in its own thread, and the preprocess() methods are run asynchronously.


When the first stage of a pipeline is done with its preprocess() method, it will begin running process() on objects being fed in by its associated stage driver. As the first stage finishes processing each data object, it is emitted to the next stage. If the next stage is not finished with its own preprocess() method, the passed data objects will be queued by the second stage's stage driver. When all the initial objects have been processed by the first stage's process() method, it will then call the postprocess() method. When the postprocess() method is complete, a STOP_REQUESTED signal is sent to the next stage to indicate that no more objects will be coming down the pipeline. The next stage will then finish processing the objects in its queue and call its own postprocess() method. This sequence of finishing out the queue and postprocessing propagates down the pipeline. Each stage may begin running its release() method after finishing postprocess(). init() and release() should not have any dependencies outside their stage.


Each stage can be configured to stop or continue should a fault occur during processing. Stages can throw a StageException during preprocess(), process(), or postprocess(). If configured to continue, the stage will begin processing the next object. If configured to stop on faults, the stage will end processing, and any subsequent process() or postprocess() methods will not be called. The release() method will always be called, as it resides in the finally block of a try-catch construct around the stage processing.

Communication between Stages

There are two primary mechanisms for Stages to communicate with each other. In keeping with the dataflow and "Pipeline" analogy, these both send information "downstream" to subsequent stages.

  • Normal emit() to (queue of) next Stage - sequential passage of data objects. These objects are often implemented as Java Beans, and are sometimes referred to as "data beans".
  • Events and Listeners - often to pass control or synchronizing metadata between stages. Use this mechanism when a stage later in the pipeline needs additional information that can only be provided by an earlier stage, especially information that doesn't belong in the data bean.

As an example of events and listeners, suppose you have one stage reading from a database table, and a later stage writing data to another database. The table reader stage should pass table layout information to the table writer stage so that the writer can create a table with the proper fields if the destination table does not already exist. The TableReader.preprocess() method will raise an event that carries the table layout data with it. The preprocess() method of the following TableWriter stage is set up to listen for the table event, and will wait until that event happens before proceeding. In this way the TableWriter will not process objects until the destination table is ready.
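
A rough sketch of this pattern follows. The event class is ordinary Java; the calls that raise the event and register the listener are shown only as comments, because their exact names and signatures are assumptions here (check the StageContext and listener javadoc for the real API):

import java.util.EventObject;

/**
 * Hypothetical event used to pass table layout metadata from the TableReader
 * stage to the TableWriter stage. Only the column names are carried here.
 */
public class TableLayoutEvent extends EventObject {

    private final String[] columnNames;

    public TableLayoutEvent(Object source, String[] columnNames) {
        super(source);
        this.columnNames = columnNames.clone();
    }

    public String[] getColumnNames() {
        return columnNames.clone();
    }
}

// Assumed usage (the method names on the stage context are illustrative):
//
//   In TableReader.preprocess(), after examining the source table:
//       context.raise(new TableLayoutEvent(this, columnNames));
//
//   In TableWriter.init(StageContext context):
//       register a listener that stores the layout when the event arrives.
//
//   TableWriter.preprocess() then waits until the layout has been received,
//   so process() never runs before the destination table can be created.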

Pipeline Configuration using Digester

Now it's time to present the Pipeline configuration file, which is written in XML when using Digester.

First Pipeline Configuration Example

Here is an example showing the basic structure. This pipeline has three stages and an environment constant defined. A summary of the elements shown follows the sample code.

<?xml version="1.0" encoding="UTF-8"?>

<!--
    Document   : configMyPipeline.xml
    Description: An example Pipeline configuration file
-->

<pipeline>
    
    <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
                   id="df0"/>

    <!-- The <env> element can be used to add global environment variable values to the pipeline.
         In this instance almost all of the stages need a key to tell them what type of data
         to process.
    -->
    <env>
        <value key="dataType">STLD</value>
    </env>               
        
    <!-- The initial stage traverses a directory so that it can feed the filenames of
         the files to be processed to the subsequent stages.
         
         The directory path to be traversed is in the feed block following this stage.
  
         The filePattern in the stage block is the pattern to look for within that directory.
    -->
    
    <stage className="org.apache.commons.pipeline.stage.FileFinderStage"
           driverFactoryId="df0"
           filePattern="SALES\.(ASWK|ST(GD|GL|LD))\.N.?\.D\d{5}"/>
      
    <feed>
        <value>/mnt/data2/gdsg/sst/npr</value>
    </feed>

    <stage className="gov.noaa.eds.example.Stage2"
           driverFactoryId="df0" />

    <!-- Write the data from the SstFileReader stage into the Rich Inventory database.  -->
    <stage className="gov.noaa.eds.sst2ri.SstWriterRI"
           driverFactoryId="df0"/>

</pipeline>

Here is a summary explanation of the items in the above example:


  • <?xml version="1.0" encoding="UTF-8"?>
    These pipeline configuration files always start with this XML declaration.

  • <!-- Standard XML comment -->

  • <pipeline>...</pipeline>
    The top level element is <pipeline> and surrounds the rest of the configuration.

  • <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory" id="df0"/>
    Sets up a StageDriverFactory to feed and control the stages. Stages that should be controlled by a DedicatedThreadStageDriver will get one from the factory named "df0".

  • <env> <value key="dataType">STLD</value> </env>
    Sets up a constant with the name "dataType" that all stages can access to find that "STLD" data are being processed in this run. If there are branches, then the environment constants are local to just the branch they are defined in; they are NOT shared between branches. You can, however, define the same environment constant in as many branches as you need to. (A sketch of how a stage might read such a value follows this list.)

  • <stage className="org.apache.commons.pipeline.stage.FileFinderStage" driverFactoryId="df0" filePattern="SALES\.(ASWK|ST(GD|GL|LD))\.N.?\.D\d{5}"/>
    Defines a stage, FileFinderStage, that will choose files for the next stage to process. This example has a parameter called "filePattern" which limits the files passed on to the next stage. Only files that match the regular expression given will be used. Notice that the "driverFactoryId" is "df0", which matches the name given to the driverFactory element earlier in this file.

  • <feed> <value>/mnt/data2/gdsg/sst/npr</value> </feed>
    Initial data for the first stage are passed in by the <feed> values. In this example, the FileFinderStage expects at least one starting directory from which to get files. Note that the <feed> must come after the first stage in the pipeline in the configuration file. Stages are created as they are encountered in the configuration file, and without any stage defined first, feed values will be discarded.
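
As a sketch of how a stage might read such an environment value (getEnv(String) is my assumption for the StageContext lookup method; verify it against the StageContext javadoc), the hypothetical stage below keeps the context passed to init() and reads the "dataType" value during preprocess():

import org.apache.commons.pipeline.StageContext;
import org.apache.commons.pipeline.StageException;
import org.apache.commons.pipeline.stage.BaseStage;

/** Illustrative stage that reads the "dataType" environment value at setup time. */
public class DataTypeAwareStage extends BaseStage {

    private StageContext context;
    private String dataType;

    @Override
    public void init(StageContext context) {
        super.init(context);    // let BaseStage do its normal setup
        this.context = context; // keep our own reference for later lookups
    }

    @Override
    public void preprocess() throws StageException {
        super.preprocess();
        // getEnv(String) is an assumed name for the lookup of <env> values.
        dataType = (String) context.getEnv("dataType");
    }

    @Override
    public void process(Object obj) throws StageException {
        // ... use dataType ("STLD" in the example configuration) to decide how to handle obj ...
    }
}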

Second Pipeline Configuration Example: Very Simple

The second example shows a minimal pipeline with two stages. The first stage is a FileFinderStage, which reads in file names from the starting directory "/data/sample" and passes on any that start with "HelloWorld". The second stage is a LogStage, which is commonly used during debugging. LogStage writes its input to a log file using the passed-in object's toString method and then passes on what it receives to the next stage, making it easy to drop between any two stages for debugging purposes without changing the objects passed between them.



Simple Pipeline Configuration Example



The configuration file corresponding to the image above has some colored text to make it easier to match the elements to the objects in the image.



<?xml version="1.0" encoding="UTF-8"?>

<!--
    Document   : configSimplePipeline.xml
    Description: A sample configuration file for a very simple pipeline
-->

<pipeline>

    <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
                   id="driverFactory"/>

    <!--
        ((1)) The first stage recursively searches the directory given in the feed statement.
        The filePattern given will match any files beginning with "HelloWorld".
    -->
    <stage className="org.apache.commons.pipeline.stage.FileFinderStage"
           driverFactoryId="driverFactory"
           filePattern="HelloWorld.*"/><!-- ((3)) -->

    <!-- Starting directory for the first stage. -->
    <feed>
        <value>/data/sample</value> <!-- ((4)) -->
    </feed>

    <!-- ((2)) Report the files found. -->
    <stage className="org.apache.commons.pipeline.stage.LogStage"
           driverFactoryId="driverFactory" />

</pipeline>


One driver factory serves both stages. The driver factory ID is "driverFactory", and this value is used as the driverFactoryId in both stages.


In theory a pipeline could consist of just one stage, but this degenerate case is not much different from a plain program except that it can be easily expanded with additional stages.

Third Pipeline Configuration Example: A More Complex, Branching Pipeline

Complex Pipeline Configuration Example


A color coded configuration file:

<?xml version="1.0" encoding="UTF-8"?>

<!--
   Document   : branchingPipeline.xml
   Description: Configuration file for a pipeline that takes
                user provided files as input, and from that both generates HTML files and
                puts data into a database.
-->

<pipeline>

    <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
        id="df0"/>


    <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
        id="df1">
        <property propName="queueFactory"
            className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory"
            capacity="4" fair="false"/>
    </driverFactory>


    <!-- 
        The <env> element can be used to add global environment variable values to the pipeline.
        In this instance almost all of the stages need a key to tell them what type of data
        to process.
    -->
    <env>
        <value key="division">West</value> <!-- ((9)) -->
    </env>


    <!-- 
        ((1)) The initial stage traverses a directory so that it can feed the filenames
        of the files to be processed to the subsequent stages.

        The directory path to be traversed is in the feed block at the end of this file.

        The filePattern in the stage block is the pattern to look for within that directory.
    -->
    <stage className="org.apache.commons.pipeline.stage.FileFinderStage"
        driverFactoryId="df0"
        filePattern="SALES\.(ASWK|ST(GD|GL|LD))\.N.?\.D\d{5}"/><!-- ((8)) --><feed>
        <value>/data/INPUT/raw</value> <!-- ((7)), ((11)) -->
    </feed>


    <!--  
        ((2)) This stage is going to select a subset of the files from the previous stage
        and orders them for time sequential processing using the date embedded in
        the last several characters of the file name.

        The filesToProcess is the number of files to emit to the next stage, before
        terminating processing.  Zero (0) has the special meaning that ALL available
        files should be processed.
    -->
    <stage className="com.demo.pipeline.stages.FileSorterStage"
        driverFactoryId="df1"
        filesToProcess="0"/>


    <!-- 
        ((3)) Read the files and create the objects to be passed to stage that writes to
        the database and to the stage that writes the data to
        HTML files.

        WARNING:  The value for htmlPipelineKey in the stage declaration here
        must exactly match the branch pipeline key further down in this file.
    -->
    <stage className="com.demo.pipeline.stages.FileReaderStage"
        driverFactoryId="df1"
        htmlPipelineKey="sales2html"/>


    <!-- 
        ((4)) Write the data from the FileReaderStage stage into the database.
    -->
    <stage className="com.demo.pipeline.stages.DatabaseWriterStage"
        driverFactoryId="df1">

        <datasource user="test"
        password="abc123"
        type="oracle"
        host="brain.demo.com"
        port="1521"
        database="SALES" />

        <database-proxy className="gov.noaa.gdsg.sql.oracle.OracleDatabaseProxy" />

        <tablePath path="summary.inventory" /> <!-- ((13)) -->
    </stage>


    <!-- 
        Write the data from the FileReaderStage stage to HTML files.

        The outputFilePath is the path to which we will be writing our summary HTML files.

        WARNING:  The value for the branch pipeline key declaration here must
        exactly match the htmlPipelineKey in the FileReaderStage stage in this file.
    -->
    <branch>
        <pipeline key="sales2html"> <!-- ((10)) --><env>
                <value key="division">West</value> <!-- ((14)) -->
            </env>

            <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
                id="df2">
                <property propName="queueFactory"
                    className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory"
                    capacity="4" fair="false"/>
            </driverFactory>


            <!-- ((5)) HTMLWriterStage -->
            <stage className="com.demo.pipeline.stages.HTMLWriterStage"
                driverFactoryId="df2"
                outputFilePath="/data/OUTPUT/web"/> <!-- ((12)) -->


            <!-- ((6)) StatPlotterStage -->
            <stage className="com.demo.pipeline.stages.StatPlotterStage"
                driverFactoryId="df2"
                outputFilePath="/data/OUTPUT/web"/> <!-- ((12)) --></pipeline>
    </branch>

</pipeline>

Notes: The "division" configured to "West " in this example in the <env> definition is set in two places. It should be set to the same value in both the main pipeline and the branch pipeline. This is because branches don't share the same environment constants.

The driverFactories "df1" and "df2" override the default queueFactory by specifying the ArrayBlockingQueueFactory . They do this in order to limit the queue sizes of the stages that use df1 or df2, setting a capacity of 4 objects. This is often done to limit the resources used by the pipeline, and may be necessary if an unbounded queue is using all of the available java memory or exceeds the number of open filehandles allowed. The queue size cannot be changed after the queue is created. The fair attribute can be set to "false", since just one thread is accessing the queue. If fair = "true" then there is additional overhead to make sure all threads that access the queue are processed in order (requests are FIFO).

TODO

More should be added to this page:

  • Filtering and other configuration techniques
  • Logfile configuration
  • Other tutorials will be linked in as they are completed

Related topics

Links to other pipeline resources


Credits

Several diagrams and descriptions were drawn from PowerPoint presentations by Bill and Kris as well as from the Pipeline code comments.

  • Multithreaded Data Processing Using Jakarta Commons Pipeline , November 2006, Kris Nuttycombe
  • Pipelining the Level 3 SST / Aerosol data: An illustration of how to use the org.apache.commons.pipeline , November 2006, Bill Barrett