org.apache.commons.pipeline.stage
Class ExtendedBaseStage

java.lang.Object
  extended by org.apache.commons.pipeline.stage.ExtendedBaseStage
All Implemented Interfaces:
Stage, ExtendedBaseStageMBean

public abstract class ExtendedBaseStage
extends Object
implements Stage, ExtendedBaseStageMBean

Base class for pipeline stages. Keeps track of performance statistics and, optionally, exposes them for collection and adjustment via JMX. This class cannot extend BaseStage because BaseStage marks its emit methods as final.

Author:
mzsanford

Field Summary
protected static ThreadLocal<AtomicInteger> emitCount
          ThreadLocal count of emit calls during the current process call.
protected static ThreadLocal<AtomicLong> emitTotal
          ThreadLocal sum of time spent waiting on blocked queues during the current process call.
protected static ThreadLocal<NumberFormat> floatFormatter
          ThreadLocal formatter, since NumberFormat instances are not thread-safe.
protected  org.apache.commons.logging.Log log
           
protected  StageContext stageContext
           
protected static ThreadLocal<Map<String,AtomicLong>> threadLocalEmitBranchTime
          ThreadLocal sum of time spent waiting on blocked queues during the current process call by queue name.
 
Constructor Summary
ExtendedBaseStage()
           
 
Method Summary
 void emit(Object obj)
          Convenience method to feed the specified object to the next stage downstream.
 void emit(String branch, Object obj)
          Convenience method to feed the specified object to the first stage of the specified branch.
protected  String formatCounterStat(String name, AtomicInteger count)
           
protected  String formatCounterStat(String name, AtomicLong count)
           
protected  String formatCounterStat(String name, long count)
           
protected  String formatTotalTimeStat(String name, AtomicLong totalTime)
           
protected  String formatTotalTimeStat(String name, long totalTime)
           
 Boolean getCollectBranchStats()
           
 double getCurrentServiceTimeAverage()
          Returns a moving average of the service time.
 Integer getCurrentStatWindowSize()
          Get the size of the service time collection window.
 long getObjectsReceived()
           
 String getStageName()
           
 Integer getStatusBatchSize()
           
 Long getStatusInterval()
           
 String getStatusMessage()
           
 long getTotalEmits()
           
 long getTotalEmitTime()
           
 long getTotalServiceTime()
           
 void init(StageContext context)
          Initialization takes place when the stage is added to a pipeline.
 void innerPostprocess()
           
 void innerPreprocess()
           
abstract  void innerProcess(Object obj)
           
 boolean isJmxEnabled()
           
 void logStatus()
           
 void postprocess()
          Called when a stage has completed all processing.
 void preprocess()
          Called when a stage has been created but before the first object is sent to the stage for processing.
 void process(Object obj)
          Implementations of this method should atomically process a single data object and transfer any feed objects resulting from this processing to the downstream Feeder.
 void release()
          Implementations of this method should clean up any lingering resources that might otherwise be left allocated if an exception is thrown during processing (or pre/postprocessing).
 void setCollectBranchStats(Boolean collectBranchStats)
          Branch stats are disabled by default because collecting them is slow.
 void setCurrentStatWindowSize(Integer newStatWindowSize)
          Set the size of the service time collection window.
 void setJmxEnabled(boolean jmxEnabled)
           
 void setStageName(String name)
           
 void setStatusBatchSize(Integer statusBatchSize)
           
 void setStatusInterval(Long statusInterval)
           
abstract  String status()
          Class-specific status message.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

log

protected final org.apache.commons.logging.Log log

stageContext

protected StageContext stageContext

emitTotal

protected static ThreadLocal<AtomicLong> emitTotal
ThreadLocal sum of time spent waiting on blocked queues during the current process call.


threadLocalEmitBranchTime

protected static ThreadLocal<Map<String,AtomicLong>> threadLocalEmitBranchTime
ThreadLocal sum of time spent waiting on blocked queues during the current process call by queue name.
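A minimal sketch of how a per-thread, per-queue timing map of this shape can be maintained. BranchTimeSketch and its methods are hypothetical illustrations, not the commons-pipeline source; the AtomicLong values simply act as mutable long holders inside each thread's own map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: per-thread, per-queue-name totals of blocked emit time.
final class BranchTimeSketch {
    // Each worker thread gets its own map, so no cross-thread locking is needed.
    static final ThreadLocal<Map<String, AtomicLong>> BRANCH_TIME =
            ThreadLocal.withInitial(HashMap::new);

    // Add elapsed nanoseconds to this thread's running total for the named queue.
    static void record(String branch, long elapsedNanos) {
        BRANCH_TIME.get()
                .computeIfAbsent(branch, k -> new AtomicLong())
                .addAndGet(elapsedNanos);
    }

    // Current thread's total for a queue, or 0 if nothing was recorded.
    static long total(String branch) {
        AtomicLong t = BRANCH_TIME.get().get(branch);
        return t == null ? 0L : t.get();
    }

    // Clear this thread's totals, e.g. at the start of each process call.
    static void reset() {
        BRANCH_TIME.get().clear();
    }
}
```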


emitCount

protected static ThreadLocal<AtomicInteger> emitCount
ThreadLocal count of emit calls during the current process call.
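The emitCount and emitTotal fields describe values scoped to "the current process call". One plausible way to get that scoping, sketched here under the assumption that the counters are zeroed at the start of each call, is shown below; EmitCounterSketch and its methods are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of per-process-call emit accounting using ThreadLocals.
final class EmitCounterSketch {
    static final ThreadLocal<AtomicInteger> EMIT_COUNT = ThreadLocal.withInitial(AtomicInteger::new);
    static final ThreadLocal<AtomicLong> EMIT_TOTAL = ThreadLocal.withInitial(AtomicLong::new);

    // Assumed to run at the start of each process call: zero this thread's counters.
    static void beginProcessCall() {
        EMIT_COUNT.get().set(0);
        EMIT_TOTAL.get().set(0L);
    }

    // Assumed to run from each emit: one more emit, plus the time spent blocked.
    static void onEmit(long blockedNanos) {
        EMIT_COUNT.get().incrementAndGet();
        EMIT_TOTAL.get().addAndGet(blockedNanos);
    }

    static int count() { return EMIT_COUNT.get().get(); }

    static long total() { return EMIT_TOTAL.get().get(); }
}
```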


floatFormatter

protected static ThreadLocal<NumberFormat> floatFormatter
ThreadLocal formatter, since NumberFormat instances are not thread-safe.
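A minimal sketch of the thread-local formatter idiom: each thread lazily creates and then reuses its own NumberFormat, so no formatter is ever shared between threads. The US locale and two fraction digits are assumptions for illustration; the actual configuration of floatFormatter is not documented here.

```java
import java.text.NumberFormat;
import java.util.Locale;

// Hypothetical sketch: one NumberFormat per thread, created on first use.
final class FloatFormatterSketch {
    static final ThreadLocal<NumberFormat> FLOAT_FORMATTER = ThreadLocal.withInitial(() -> {
        // Locale and digit settings are illustrative assumptions.
        NumberFormat nf = NumberFormat.getNumberInstance(Locale.US);
        nf.setMinimumFractionDigits(2);
        nf.setMaximumFractionDigits(2);
        return nf;
    });

    static String format(double value) {
        return FLOAT_FORMATTER.get().format(value);
    }
}
```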

Constructor Detail

ExtendedBaseStage

public ExtendedBaseStage()
Method Detail

init

public void init(StageContext context)
Description copied from interface: Stage

Initialization takes place when the stage is added to a pipeline. Implementations of this method should perform any necessary setup that is required for the driver to be able to correctly run the stage.

NOTE: Because this method runs when the stage is added to the pipeline, certain information (such as the downstream feeder for the stage) may not be available until the pipeline is fully constructed.

Specified by:
init in interface Stage
Parameters:
context - the StageContext within which the stage will be run

preprocess

public final void preprocess()
                      throws StageException
Called when a stage has been created but before the first object is sent to the stage for processing. Subclasses should override innerPreprocess, which this method calls.

Specified by:
preprocess in interface Stage
Throws:
StageException - any checked Exception thrown by the implementation should be wrapped in a StageException
See Also:
Stage.preprocess()

process

public final void process(Object obj)
                   throws StageException
Description copied from interface: Stage
Implementations of this method should atomically process a single data object and transfer any feed objects resulting from this processing to the downstream Feeder. This Feeder can be obtained from the stage context made available during initialization. NOTE: Implementations of this method must be thread-safe!

Specified by:
process in interface Stage
Parameters:
obj - an object to be processed
Throws:
StageException - any checked Exception thrown by the implementation should be wrapped in a StageException
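Because process is final and innerProcess is abstract, this class follows a template-method shape. A minimal sketch of that shape, assuming process counts each object and times the delegated call, is below. TimedStageSketch is hypothetical, and it uses a plain Exception in place of StageException so it compiles without commons-pipeline on the classpath.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch, not the commons-pipeline source: a final process()
// that counts and times each call, then delegates to an abstract hook.
abstract class TimedStageSketch {
    private final AtomicLong objectsReceived = new AtomicLong();
    private final AtomicLong totalServiceNanos = new AtomicLong();

    // Final so subclasses cannot skip the bookkeeping; Exception stands in for StageException.
    public final void process(Object obj) throws Exception {
        objectsReceived.incrementAndGet();
        long start = System.nanoTime();
        try {
            innerProcess(obj);
        } finally {
            // Service time is recorded even when innerProcess throws.
            totalServiceNanos.addAndGet(System.nanoTime() - start);
        }
    }

    // Subclasses put their per-object logic here.
    public abstract void innerProcess(Object obj) throws Exception;

    public long getObjectsReceived() { return objectsReceived.get(); }

    public long getTotalServiceNanos() { return totalServiceNanos.get(); }
}
```

The sketch accumulates nanoseconds; getTotalServiceTime is documented in milliseconds, so a real implementation would convert (for example with TimeUnit.NANOSECONDS.toMillis) or time in milliseconds directly.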

emit

public final void emit(Object obj)
Convenience method to feed the specified object to the next stage downstream.
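The getTotalEmits and getTotalEmitTime statistics suggest that emit measures how long it blocks on the downstream queue. A minimal sketch of that measurement follows, using a bounded BlockingQueue as a stand-in for the stage's downstream Feeder (an assumption; the real emit does not expose a queue or throw InterruptedException). EmitTimingSketch is hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: timing how long emit blocks on a bounded downstream queue.
final class EmitTimingSketch {
    private final BlockingQueue<Object> downstream;  // stand-in for the downstream Feeder
    private final AtomicLong totalEmits = new AtomicLong();
    private final AtomicLong totalEmitNanos = new AtomicLong();

    EmitTimingSketch(BlockingQueue<Object> downstream) {
        this.downstream = downstream;
    }

    public final void emit(Object obj) throws InterruptedException {
        long start = System.nanoTime();
        downstream.put(obj);  // blocks while the queue is full
        totalEmitNanos.addAndGet(System.nanoTime() - start);
        totalEmits.incrementAndGet();
    }

    long getTotalEmits() { return totalEmits.get(); }

    long getTotalEmitNanos() { return totalEmitNanos.get(); }
}
```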


emit

public final void emit(String branch,
                       Object obj)
Convenience method to feed the specified object to the first stage of the specified branch.


postprocess

public final void postprocess()
                       throws StageException
Called when a stage has completed all processing. Subclasses should override innerPostprocess, which this method calls.

Specified by:
postprocess in interface Stage
Throws:
StageException - any checked Exception thrown by the implementation should be wrapped in a StageException
See Also:
Stage.postprocess()
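A minimal sketch of how final preprocess and postprocess methods can do base-class work and delegate to overridable hooks that default to no-ops. The ordering (base work before innerPreprocess, after innerPostprocess) and the recorded event strings are assumptions for illustration; LifecycleSketch is hypothetical and uses Exception in place of StageException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: final lifecycle methods wrapping no-op-by-default hooks.
abstract class LifecycleSketch {
    final List<String> events = new ArrayList<>();

    public final void preprocess() throws Exception {
        events.add("base-pre");  // e.g. reset statistics (assumed)
        innerPreprocess();
    }

    public final void postprocess() throws Exception {
        innerPostprocess();
        events.add("base-post");  // e.g. log final status (assumed)
    }

    // Default hooks do nothing; subclasses override only what they need.
    public void innerPreprocess() throws Exception {}

    public void innerPostprocess() throws Exception {}
}
```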

release

public void release()
Description copied from interface: Stage
Implementations of this method should clean up any lingering resources that might otherwise be left allocated if an exception is thrown during processing (or pre/postprocessing).
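Since release is meant to run even when processing fails, the caller typically invokes it from a finally block. A minimal sketch of such a caller follows; SimpleStage is a hypothetical, simplified stand-in for the Stage interface, and runAll is not a commons-pipeline driver method.

```java
import java.util.List;

// Hypothetical sketch of a caller that always calls release(), even on failure.
final class ReleaseSketch {
    // Simplified stand-in for the Stage interface; not the real API.
    interface SimpleStage {
        void preprocess() throws Exception;
        void process(Object obj) throws Exception;
        void postprocess() throws Exception;
        void release();
    }

    // Runs the lifecycle; release() runs in finally, and any failure is rethrown.
    static void runAll(SimpleStage stage, List<?> inputs) throws Exception {
        try {
            stage.preprocess();
            for (Object obj : inputs) {
                stage.process(obj);
            }
            stage.postprocess();
        } finally {
            stage.release();
        }
    }
}
```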

Specified by:
release in interface Stage

innerProcess

public abstract void innerProcess(Object obj)
                           throws StageException
Throws:
StageException

innerPreprocess

public void innerPreprocess()
                     throws StageException
Throws:
StageException

innerPostprocess

public void innerPostprocess()
                      throws StageException
Throws:
StageException

status

public abstract String status()
Class-specific status message. Null or empty statuses are ignored.
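A minimal sketch of combining base statistics with the class-specific status while ignoring a null or empty status, as described above. The message layout ("name: received=N, status") is invented for illustration and is not the format getStatusMessage actually produces; StatusMessageSketch is hypothetical.

```java
// Hypothetical sketch: base stats plus an optional class-specific status.
final class StatusMessageSketch {
    static String buildStatusMessage(String stageName, long objectsReceived, String classStatus) {
        StringBuilder sb = new StringBuilder();
        sb.append(stageName).append(": received=").append(objectsReceived);
        // Null or empty class-specific statuses are skipped.
        if (classStatus != null && !classStatus.isEmpty()) {
            sb.append(", ").append(classStatus);
        }
        return sb.toString();
    }
}
```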


logStatus

public void logStatus()

getStatusMessage

public String getStatusMessage()
Specified by:
getStatusMessage in interface ExtendedBaseStageMBean
Returns:
Log message including both base stage and class-specific stats.

formatTotalTimeStat

protected String formatTotalTimeStat(String name,
                                     AtomicLong totalTime)

formatTotalTimeStat

protected String formatTotalTimeStat(String name,
                                     long totalTime)

formatCounterStat

protected String formatCounterStat(String name,
                                   AtomicInteger count)

formatCounterStat

protected String formatCounterStat(String name,
                                   AtomicLong count)

formatCounterStat

protected String formatCounterStat(String name,
                                   long count)
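The overloads above accept AtomicInteger, AtomicLong, and long values. A common way to structure such overloads is to have the atomic variants read the current value and delegate to the primitive one, sketched below. The output formats ("name: N" and "name: N ms") are assumptions; the units follow the millisecond totals documented for getTotalServiceTime. FormatStatSketch is hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: atomic overloads delegate to the primitive overloads.
final class FormatStatSketch {
    static String formatCounterStat(String name, AtomicInteger count) {
        return formatCounterStat(name, count.get());  // int widens to long
    }

    static String formatCounterStat(String name, AtomicLong count) {
        return formatCounterStat(name, count.get());
    }

    static String formatCounterStat(String name, long count) {
        return name + ": " + count;  // illustrative format
    }

    static String formatTotalTimeStat(String name, AtomicLong totalTime) {
        return formatTotalTimeStat(name, totalTime.get());
    }

    static String formatTotalTimeStat(String name, long totalTime) {
        return name + ": " + totalTime + " ms";  // illustrative format, milliseconds assumed
    }
}
```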

getStatusInterval

public Long getStatusInterval()
Specified by:
getStatusInterval in interface ExtendedBaseStageMBean
Returns:
number of records after which status messages are logged.
See Also:
org.apache.commons.pipeline.ExtendedBaseStageMBean#getStatusInterval()
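A minimal sketch of triggering a status message every statusInterval records. Treating a null or non-positive interval as "disabled" is an assumption. Using AtomicLong.incrementAndGet means each count value is observed by exactly one thread, so the trigger fires once per interval even under concurrent processing. StatusIntervalSketch is hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: report when the record count reaches a multiple of the interval.
final class StatusIntervalSketch {
    private final AtomicLong received = new AtomicLong();
    private volatile Long statusInterval;  // null or <= 0 disables periodic status (assumed)

    StatusIntervalSketch(Long statusInterval) {
        this.statusInterval = statusInterval;
    }

    void setStatusInterval(Long statusInterval) {
        this.statusInterval = statusInterval;
    }

    // Returns true when this record should trigger a status log.
    boolean recordAndCheck() {
        long count = received.incrementAndGet();
        Long interval = statusInterval;  // read the volatile field once
        return interval != null && interval > 0 && count % interval == 0;
    }
}
```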

setStatusInterval

public void setStatusInterval(Long statusInterval)
Specified by:
setStatusInterval in interface ExtendedBaseStageMBean
Parameters:
statusInterval - new status interval
See Also:
org.apache.commons.pipeline.ExtendedBaseStageMBean#setStatusInterval(long)

getStatusBatchSize

public Integer getStatusBatchSize()
Specified by:
getStatusBatchSize in interface ExtendedBaseStageMBean
Returns:
Size of batches processed by this stage (used to adjust throughput statistics)

setStatusBatchSize

public void setStatusBatchSize(Integer statusBatchSize)
Specified by:
setStatusBatchSize in interface ExtendedBaseStageMBean
Parameters:
statusBatchSize - Size of batches processed by this stage (used to adjust throughput statistics)
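One plausible reading of "used to adjust throughput statistics" is that each received object is a batch, so records per second is objects per second multiplied by the batch size. The sketch below shows that arithmetic; the formula, the fallback to 1 for a missing or invalid batch size, and BatchThroughputSketch itself are assumptions.

```java
// Hypothetical sketch: scale object throughput by batch size to get record throughput.
final class BatchThroughputSketch {
    static double recordsPerSecond(long objectsReceived, Integer batchSize, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            return 0.0;  // no meaningful rate yet
        }
        // A missing or invalid batch size is treated as 1 record per object (assumed).
        int size = (batchSize == null || batchSize < 1) ? 1 : batchSize;
        return objectsReceived * (double) size * 1000.0 / elapsedMillis;
    }
}
```

For example, 50 objects of 20 records each over 2000 ms gives 50 × 20 × 1000 / 2000 = 500 records per second.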

getObjectsReceived

public long getObjectsReceived()
Specified by:
getObjectsReceived in interface ExtendedBaseStageMBean
Returns:
number of objects received
See Also:
org.apache.commons.pipeline.ExtendedBaseStageMBean#getObjectsReceived()

getTotalServiceTime

public long getTotalServiceTime()
Specified by:
getTotalServiceTime in interface ExtendedBaseStageMBean
Returns:
total number of milliseconds spent processing
See Also:
org.apache.commons.pipeline.ExtendedBaseStageMBean#getTotalServiceTime()

getTotalEmitTime

public long getTotalEmitTime()
Specified by:
getTotalEmitTime in interface ExtendedBaseStageMBean
Returns:
total number of milliseconds spent blocked on downstream queues
See Also:
org.apache.commons.pipeline.ExtendedBaseStageMBean#getTotalEmitTime()

getTotalEmits

public long getTotalEmits()
Specified by:
getTotalEmits in interface ExtendedBaseStageMBean
Returns:
total number of emits to downstream queues
See Also:
org.apache.commons.pipeline.ExtendedBaseStageMBean#getTotalEmits()

getCollectBranchStats

public Boolean getCollectBranchStats()
Specified by:
getCollectBranchStats in interface ExtendedBaseStageMBean
Returns:
true if this stage is collecting branch stats, false otherwise.
See Also:
org.apache.commons.pipeline.ExtendedBaseStageMBean#getCollectBranchStats()

setCollectBranchStats

public void setCollectBranchStats(Boolean collectBranchStats)
Description copied from interface: ExtendedBaseStageMBean
Branch stats are disabled by default because collecting them is slow. Enabling them can noticeably reduce stage throughput.

Specified by:
setCollectBranchStats in interface ExtendedBaseStageMBean
Parameters:
collectBranchStats - true if this stage should start collecting branch stats, false otherwise.
See Also:
org.apache.commons.pipeline.ExtendedBaseStageMBean#setCollectBranchStats(Boolean)
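A minimal sketch of guarding per-branch timing behind a volatile flag, so the disabled path does no clock reads or map lookups. Those per-emit costs are a plausible source of the slowdown described above, not a confirmed account of the library's implementation. BranchStatsToggleSketch and the Runnable "send" parameter are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: per-branch timing that is skipped entirely unless enabled.
final class BranchStatsToggleSketch {
    private volatile boolean collectBranchStats = false;  // off by default
    private final Map<String, AtomicLong> branchNanos = new ConcurrentHashMap<>();

    void setCollectBranchStats(boolean on) {
        collectBranchStats = on;
    }

    void emit(String branch, Runnable send) {
        if (!collectBranchStats) {
            send.run();  // fast path: no clock reads, no map lookups
            return;
        }
        long start = System.nanoTime();
        send.run();
        branchNanos.computeIfAbsent(branch, k -> new AtomicLong())
                   .addAndGet(System.nanoTime() - start);
    }

    boolean hasStatsFor(String branch) {
        return branchNanos.containsKey(branch);
    }
}
```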

getCurrentStatWindowSize

public Integer getCurrentStatWindowSize()
Description copied from interface: ExtendedBaseStageMBean
Get the size of the service time collection window.

Specified by:
getCurrentStatWindowSize in interface ExtendedBaseStageMBean

setCurrentStatWindowSize

public void setCurrentStatWindowSize(Integer newStatWindowSize)
Description copied from interface: ExtendedBaseStageMBean
Set the size of the service time collection window.

Specified by:
setCurrentStatWindowSize in interface ExtendedBaseStageMBean

getStageName

public String getStageName()

setStageName

public void setStageName(String name)

isJmxEnabled

public boolean isJmxEnabled()

setJmxEnabled

public void setJmxEnabled(boolean jmxEnabled)
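The class description says JMX exposure is optional. A minimal sketch of conditional registration with the platform MBeanServer using the standard MBean naming convention (interface name = implementation class name + "MBean") follows. The ObjectName domain and key properties, DemoStage, and registerIfEnabled are all hypothetical; how ExtendedBaseStage actually registers itself is not documented here.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical sketch of optional JMX exposure via a standard MBean.
final class JmxSketch {
    // Standard MBean convention: interface name = implementation class name + "MBean".
    public interface DemoStageMBean {
        long getObjectsReceived();  // exposed as attribute "ObjectsReceived"
    }

    public static class DemoStage implements DemoStageMBean {
        final AtomicLong received = new AtomicLong();

        @Override
        public long getObjectsReceived() { return received.get(); }
    }

    // Registers the stage only when JMX is enabled; returns the name used, or null.
    static ObjectName registerIfEnabled(DemoStage stage, boolean jmxEnabled, String stageName) throws Exception {
        if (!jmxEnabled) {
            return null;
        }
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Domain and key properties are illustrative assumptions.
        ObjectName name = new ObjectName("example.pipeline:type=Stage,name=" + ObjectName.quote(stageName));
        server.registerMBean(stage, name);
        return name;
    }
}
```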

getCurrentServiceTimeAverage

public double getCurrentServiceTimeAverage()
Returns a moving average of the service time. This does not yet take into account time spent calling emit, nor does it return minimum, maximum or other statistical information at this time.

Specified by:
getCurrentServiceTimeAverage in interface ExtendedBaseStageMBean
Returns:
Average time to process the last currentStatWindowSize objects.
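A minimal sketch of a simple moving average over the last N service times, matching the description of getCurrentServiceTimeAverage and the adjustable window size. Keeping a running sum makes each update O(1), and shrinking the window drops the oldest samples immediately; both are design assumptions, as is ServiceTimeWindowSketch itself.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: simple moving average over the last windowSize service times.
final class ServiceTimeWindowSketch {
    private final Deque<Long> window = new ArrayDeque<>();
    private int windowSize;
    private long sum;  // running sum of the samples currently in the window

    ServiceTimeWindowSketch(int windowSize) {
        if (windowSize < 1) throw new IllegalArgumentException("windowSize must be >= 1");
        this.windowSize = windowSize;
    }

    synchronized void add(long serviceTime) {
        window.addLast(serviceTime);
        sum += serviceTime;
        trim();
    }

    // Shrinking the window discards the oldest samples right away.
    synchronized void setWindowSize(int newSize) {
        if (newSize < 1) throw new IllegalArgumentException("windowSize must be >= 1");
        windowSize = newSize;
        trim();
    }

    synchronized double average() {
        return window.isEmpty() ? 0.0 : (double) sum / window.size();
    }

    private void trim() {
        while (window.size() > windowSize) {
            sum -= window.removeFirst();
        }
    }
}
```

Synchronizing every call is the simplest thread-safe choice; a fixed-size array of primitive longs used as a ring buffer would avoid boxing if contention or allocation became a concern.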


Copyright © 2004-2009 The Apache Software Foundation. All Rights Reserved.