java.lang.Object
  org.apache.commons.pipeline.stage.ExtendedBaseStage
public abstract class ExtendedBaseStage
Base class for pipeline stages. Keeps track of performance statistics and optionally allows for their collection and adjustment via JMX. Cannot extend BaseStage because BaseStage marks the emit methods as final.
| Field Summary | |
|---|---|
| protected static ThreadLocal<AtomicInteger> | emitCount: ThreadLocal count of emit calls during the current process call. |
| protected static ThreadLocal<AtomicLong> | emitTotal: ThreadLocal sum of time spent waiting on blocked queues during the current process call. |
| protected static ThreadLocal<NumberFormat> | floatFormatter: ThreadLocal formatter, since NumberFormat instances are not thread safe. |
| protected org.apache.commons.logging.Log | log |
| protected StageContext | stageContext |
| protected static ThreadLocal<Map<String,AtomicLong>> | threadLocalEmitBranchTime: ThreadLocal sum of time spent waiting on blocked queues during the current process call, by queue name. |
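The per-branch timing field pairs naturally with the collectBranchStats switch listed in the method summary. The following is a minimal sketch of how such per-thread, per-branch accumulation might look, using only JDK classes. The class name BranchTimeSketch and the methods recordBranchWait and waitedOn are hypothetical and are not part of commons-pipeline; the library's actual bookkeeping may differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; not the library's implementation.
class BranchTimeSketch {
    // Each thread gets its own map from branch name to accumulated wait time.
    static final ThreadLocal<Map<String, AtomicLong>> branchTime =
            ThreadLocal.withInitial(HashMap::new);

    // Assumed to start disabled, matching "disabled by default" in the docs.
    static volatile boolean collectBranchStats = false;

    // Adds waitedMillis to the calling thread's total for the branch,
    // but only while branch statistics are enabled.
    static void recordBranchWait(String branch, long waitedMillis) {
        if (!collectBranchStats) {
            return;
        }
        branchTime.get()
                .computeIfAbsent(branch, key -> new AtomicLong())
                .addAndGet(waitedMillis);
    }

    // Returns the calling thread's accumulated wait for the branch, or 0.
    static long waitedOn(String branch) {
        AtomicLong total = branchTime.get().get(branch);
        return total == null ? 0L : total.get();
    }
}
```

Skipping the map lookup entirely while the flag is off is one plausible reason to keep branch statistics optional, since the extra work would otherwise be paid on every emit.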
| Constructor Summary | |
|---|---|
| ExtendedBaseStage() | |
| Method Summary | |
|---|---|
| void | emit(Object obj): Convenience method to feed the specified object to the next stage downstream. |
| void | emit(String branch, Object obj): Convenience method to feed the specified object to the first stage of the specified branch. |
| protected String | formatCounterStat(String name, AtomicInteger count) |
| protected String | formatCounterStat(String name, AtomicLong count) |
| protected String | formatCounterStat(String name, long count) |
| protected String | formatTotalTimeStat(String name, AtomicLong totalTime) |
| protected String | formatTotalTimeStat(String name, long totalTime) |
| Boolean | getCollectBranchStats() |
| double | getCurrentServiceTimeAverage(): Returns a moving average of the service time. |
| Integer | getCurrentStatWindowSize(): Get the size of the service time collection window. |
| long | getObjectsReceived() |
| String | getStageName() |
| Integer | getStatusBatchSize() |
| Long | getStatusInterval() |
| String | getStatusMessage() |
| long | getTotalEmits() |
| long | getTotalEmitTime() |
| long | getTotalServiceTime() |
| void | init(StageContext context): Initialization takes place when the stage is added to a pipeline. |
| void | innerPostprocess() |
| void | innerPreprocess() |
| abstract void | innerProcess(Object obj) |
| boolean | isJmxEnabled() |
| void | logStatus() |
| void | postprocess(): Called when a stage has completed all processing. |
| void | preprocess(): Called when a stage has been created but before the first object is sent to the stage for processing. |
| void | process(Object obj): Implementations of this method should atomically process a single data object and transfer any feed objects resulting from this processing to the downstream Feeder. |
| void | release(): Implementations of this method should clean up any lingering resources that might otherwise be left allocated if an exception is thrown during processing (or pre/postprocessing). |
| void | setCollectBranchStats(Boolean collectBranchStats): Branch stats are disabled by default because they are slow. |
| void | setCurrentStatWindowSize(Integer newStatWindowSize): Set the size of the service time collection window. |
| void | setJmxEnabled(boolean jmxEnabled) |
| void | setStageName(String name) |
| void | setStatusBatchSize(Integer statusBatchSize) |
| void | setStatusInterval(Long statusInterval) |
| abstract String | status(): Class-specific status message. |
| Methods inherited from class java.lang.Object |
|---|
| clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
| Field Detail |
|---|
protected final org.apache.commons.logging.Log log
protected StageContext stageContext
protected static ThreadLocal<AtomicLong> emitTotal
protected static ThreadLocal<Map<String,AtomicLong>> threadLocalEmitBranchTime
protected static ThreadLocal<AtomicInteger> emitCount
protected static ThreadLocal<NumberFormat> floatFormatter
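The static ThreadLocal fields above give every worker thread its own counters and its own formatter. As a rough sketch of that pattern with plain JDK classes (the class ThreadLocalStatsSketch and its helper methods are hypothetical, and the three-digit fraction limit is an assumption, not something the library documents):

```java
import java.text.NumberFormat;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; not the library's implementation.
class ThreadLocalStatsSketch {
    // One emit counter per thread, mirroring the shape of emitCount.
    static final ThreadLocal<AtomicInteger> emitCount =
            ThreadLocal.withInitial(AtomicInteger::new);

    // One accumulated wait total per thread, mirroring the shape of emitTotal.
    static final ThreadLocal<AtomicLong> emitTotal =
            ThreadLocal.withInitial(AtomicLong::new);

    // NumberFormat is not thread safe, so each thread builds its own instance.
    static final ThreadLocal<NumberFormat> floatFormatter =
            ThreadLocal.withInitial(() -> {
                NumberFormat format = NumberFormat.getNumberInstance(Locale.ROOT);
                format.setMaximumFractionDigits(3); // assumed precision
                return format;
            });

    // Records one emit call and the time it spent blocked.
    static void recordEmit(long waitedMillis) {
        emitCount.get().incrementAndGet();
        emitTotal.get().addAndGet(waitedMillis);
    }

    // Clears the calling thread's counters before the next process call.
    static void resetForNextProcessCall() {
        emitCount.get().set(0);
        emitTotal.get().set(0L);
    }

    // Formats a millisecond value as seconds with the thread's own formatter.
    static String formatSeconds(long millis) {
        return floatFormatter.get().format(millis / 1000.0);
    }

    // Shows isolation: a fresh thread sees its own, untouched counter.
    static int emitCountSeenByNewThread() {
        AtomicInteger seen = new AtomicInteger(-1);
        Thread thread = new Thread(() -> seen.set(emitCount.get().get()));
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }
}
```

Because the counters live in thread-local storage, no locking is needed while a thread updates its own values; totals across threads would have to be merged separately.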
| Constructor Detail |
|---|
public ExtendedBaseStage()
| Method Detail |
|---|
public void init(StageContext context)
Description copied from interface: Stage
Initialization takes place when the stage is added to a pipeline. Implementations of this method should perform any necessary setup that is required for the driver to be able to correctly run the stage.
NOTE: Since this method is run when the stage is added to the pipeline, certain information (such as the downstream feeder for the stage) may not be available until the pipeline is fully constructed.
Specified by: init in interface Stage
Parameters: context - the StageContext within which the stage will be run
public final void preprocess()
throws StageException
Specified by: preprocess in interface Stage
Throws: StageException - any checked Exception thrown by the implementation should be wrapped in a StageException
See Also: Stage.preprocess()
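public final void process(Object obj)
throws StageException
Description copied from interface: Stage
Implementations of this method should atomically process a single data object and transfer any feed objects resulting from this processing to the downstream Feeder. This Feeder can be obtained from the stage context made available during initialization.
NOTE: Implementations of this method must be thread-safe!
Specified by: process in interface Stage
Parameters: obj - an object to be processed
Throws: StageException - any checked Exception thrown by the implementation should be wrapped in a StageException
public final void emit(Object obj)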
public final void emit(String branch,
Object obj)
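Since process and emit are final here while innerProcess is abstract, subclasses supply only the per-object work and the base class wraps it with bookkeeping. The sketch below illustrates that template-method arrangement with stand-in types so that it compiles without the library. SketchStage, UpperCaseStage, the in-memory downstream queue, and the nanosecond timing are all assumptions made for illustration; the real class also declares StageException and routes objects through a Feeder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; not the library's implementation.
class TimedStageSketch {

    // Stand-in for a base stage whose process/emit are final wrappers.
    abstract static class SketchStage {
        // Stand-in for the downstream feeder: just collects emitted objects.
        private final Queue<Object> downstream = new ConcurrentLinkedQueue<>();
        private final AtomicLong objectsReceived = new AtomicLong();
        private final AtomicLong totalServiceTimeNanos = new AtomicLong();
        private final AtomicLong totalEmits = new AtomicLong();

        // Final wrapper: counts the object, times the subclass hook.
        public final void process(Object obj) {
            objectsReceived.incrementAndGet();
            long start = System.nanoTime();
            try {
                innerProcess(obj);
            } finally {
                totalServiceTimeNanos.addAndGet(System.nanoTime() - start);
            }
        }

        // Final wrapper: counts the emit, then hands the object downstream.
        public final void emit(Object obj) {
            totalEmits.incrementAndGet();
            downstream.add(obj);
        }

        // The only method a concrete stage has to implement.
        public abstract void innerProcess(Object obj);

        public long getObjectsReceived() { return objectsReceived.get(); }
        public long getTotalEmits() { return totalEmits.get(); }
        public long getTotalServiceTimeNanos() { return totalServiceTimeNanos.get(); }

        // Snapshot of everything emitted so far, in emit order.
        public List<Object> emitted() { return new ArrayList<>(downstream); }
    }

    // Example subclass: upper-cases each object's string form and emits it.
    static final class UpperCaseStage extends SketchStage {
        @Override
        public void innerProcess(Object obj) {
            emit(String.valueOf(obj).toUpperCase(Locale.ROOT));
        }
    }
}
```

Keeping the wrappers final means a subclass cannot accidentally bypass the statistics, which is consistent with the class comment about BaseStage marking its emit methods final.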
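public final void postprocess()
throws StageException
Specified by: postprocess in interface Stage
Throws: StageException - any checked Exception thrown by the implementation should be wrapped in a StageException
See Also: Stage.postprocess()
public void release()
Description copied from interface: Stage
Implementations of this method should clean up any lingering resources that might otherwise be left allocated if an exception is thrown during processing (or pre/postprocessing).
Specified by: release in interface Stage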
public abstract void innerProcess(Object obj)
throws StageException
Throws: StageException
public void innerPreprocess()
throws StageException
Throws: StageException
public void innerPostprocess()
throws StageException
Throws: StageException
public abstract String status()
public void logStatus()
public String getStatusMessage()
Specified by: getStatusMessage in interface ExtendedBaseStageMBean
protected String formatTotalTimeStat(String name,
AtomicLong totalTime)
protected String formatTotalTimeStat(String name,
long totalTime)
protected String formatCounterStat(String name,
AtomicInteger count)
protected String formatCounterStat(String name,
AtomicLong count)
protected String formatCounterStat(String name,
long count)
public Long getStatusInterval()
Specified by: getStatusInterval in interface ExtendedBaseStageMBean
See Also: org.apache.commons.pipeline.ExtendedBaseStageMBean#getStatusInterval()
public void setStatusInterval(Long statusInterval)
Specified by: setStatusInterval in interface ExtendedBaseStageMBean
Parameters: statusInterval - new status interval
See Also: org.apache.commons.pipeline.ExtendedBaseStageMBean#setStatusInterval(long)
public Integer getStatusBatchSize()
Specified by: getStatusBatchSize in interface ExtendedBaseStageMBean
public void setStatusBatchSize(Integer statusBatchSize)
Specified by: setStatusBatchSize in interface ExtendedBaseStageMBean
Parameters: statusBatchSize - size of batches processed by this stage (used to adjust throughput statistics)
public long getObjectsReceived()
Specified by: getObjectsReceived in interface ExtendedBaseStageMBean
See Also: org.apache.commons.pipeline.ExtendedBaseStageMBean#getObjectsReceived()
public long getTotalServiceTime()
Specified by: getTotalServiceTime in interface ExtendedBaseStageMBean
See Also: org.apache.commons.pipeline.ExtendedBaseStageMBean#getTotalServiceTime()
public long getTotalEmitTime()
Specified by: getTotalEmitTime in interface ExtendedBaseStageMBean
See Also: org.apache.commons.pipeline.ExtendedBaseStageMBean#getTotalEmitTime()
public long getTotalEmits()
Specified by: getTotalEmits in interface ExtendedBaseStageMBean
See Also: org.apache.commons.pipeline.ExtendedBaseStageMBean#getTotalEmits()
public Boolean getCollectBranchStats()
Specified by: getCollectBranchStats in interface ExtendedBaseStageMBean
See Also: org.apache.commons.pipeline.ExtendedBaseStageMBean#getCollectBranchStats()
public void setCollectBranchStats(Boolean collectBranchStats)
Description copied from interface: ExtendedBaseStageMBean
Branch stats are disabled by default because they are slow.
Specified by: setCollectBranchStats in interface ExtendedBaseStageMBean
Parameters: collectBranchStats - true if this stage should start collecting branch stats, false otherwise.
See Also: org.apache.commons.pipeline.ExtendedBaseStageMBean#setCollectBranchStats(Boolean)
public Integer getCurrentStatWindowSize()
Description copied from interface: ExtendedBaseStageMBean
Get the size of the service time collection window.
Specified by: getCurrentStatWindowSize in interface ExtendedBaseStageMBean
public void setCurrentStatWindowSize(Integer newStatWindowSize)
Description copied from interface: ExtendedBaseStageMBean
Set the size of the service time collection window.
Specified by: setCurrentStatWindowSize in interface ExtendedBaseStageMBean
public String getStageName()
public void setStageName(String name)
public boolean isJmxEnabled()
public void setJmxEnabled(boolean jmxEnabled)
public double getCurrentServiceTimeAverage()
Specified by: getCurrentServiceTimeAverage in interface ExtendedBaseStageMBean
... currentStatWindowSize objects.
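The surviving text suggests that the service time average is taken over the most recent currentStatWindowSize objects. One way such a windowed moving average could be kept is sketched below with JDK collections only. The class ServiceTimeWindowSketch, its method names, and the choice of milliseconds are hypothetical; the library may compute the figure differently.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration only; not the library's implementation.
class ServiceTimeWindowSketch {
    private final Deque<Long> window = new ArrayDeque<>();
    private int windowSize;
    private long sum; // running sum of the values currently in the window

    ServiceTimeWindowSketch(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.windowSize = windowSize;
    }

    // Adds one service time and evicts the oldest entries beyond the window.
    synchronized void record(long serviceTimeMillis) {
        window.addLast(serviceTimeMillis);
        sum += serviceTimeMillis;
        trim();
    }

    // Changing the window size at runtime drops any entries that no longer fit.
    synchronized void setWindowSize(int newWindowSize) {
        if (newWindowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.windowSize = newWindowSize;
        trim();
    }

    // Average over the entries currently in the window; 0.0 when empty.
    synchronized double average() {
        return window.isEmpty() ? 0.0 : (double) sum / window.size();
    }

    private void trim() {
        while (window.size() > windowSize) {
            sum -= window.removeFirst();
        }
    }
}
```

Maintaining a running sum keeps both record and average at constant cost per call, which matters if the value is polled frequently through JMX.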