|
||||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | |||||||||
SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |
java.lang.Object org.apache.commons.pipeline.stage.ExtendedBaseStage
public abstract class ExtendedBaseStage
Base class for pipeline stages. Keeps track of performance statistics and allows for collection and adjustment via JMX (optional) Cannot extend BaseStage because it marks the emit methods as final.
Field Summary | |
---|---|
protected static ThreadLocal<AtomicInteger> |
emitCount
ThreadLocal count of emit calls during the current process call. |
protected static ThreadLocal<AtomicLong> |
emitTotal
ThreadLocal sum of time spent waiting on blocked queues during the current process call. |
protected static ThreadLocal<NumberFormat> |
floatFormatter
ThreadLocal formatter since they are not thread safe. |
protected org.apache.commons.logging.Log |
log
|
protected StageContext |
stageContext
|
protected static ThreadLocal<Map<String,AtomicLong>> |
threadLocalEmitBranchTime
ThreadLocal sum of time spent waiting on blocked queues during the current process call by queue name. |
Constructor Summary | |
---|---|
ExtendedBaseStage()
|
Method Summary | |
---|---|
void |
emit(Object obj)
Convenience method to feed the specified object to the next stage downstream. |
void |
emit(String branch,
Object obj)
Convenience method to feed the specified object to the first stage of the specified branch. |
protected String |
formatCounterStat(String name,
AtomicInteger count)
|
protected String |
formatCounterStat(String name,
AtomicLong count)
|
protected String |
formatCounterStat(String name,
long count)
|
protected String |
formatTotalTimeStat(String name,
AtomicLong totalTime)
|
protected String |
formatTotalTimeStat(String name,
long totalTime)
|
Boolean |
getCollectBranchStats()
|
double |
getCurrentServiceTimeAverage()
Returns a moving average of the service time. |
Integer |
getCurrentStatWindowSize()
Get the size of the service time collection window |
long |
getObjectsReceived()
|
String |
getStageName()
|
Integer |
getStatusBatchSize()
|
Long |
getStatusInterval()
|
String |
getStatusMessage()
|
long |
getTotalEmits()
|
long |
getTotalEmitTime()
|
long |
getTotalServiceTime()
|
void |
init(StageContext context)
Initialization takes place when the stage is added to a pipeline. |
void |
innerPostprocess()
|
void |
innerPreprocess()
|
abstract void |
innerProcess(Object obj)
|
boolean |
isJmxEnabled()
|
void |
logStatus()
|
void |
postprocess()
Called when a stage has completed all processing. |
void |
preprocess()
Called when a stage has been created but before the first object is sent to the stage for processing. |
void |
process(Object obj)
Implementations of this method should atomically process a single data object and transfer any feed objects resulting from this processing to the downstream Feeder . |
void |
release()
Implementations of this method should clean up any lingering resources that might otherwise be left allocated if an exception is thrown during processing (or pre/postprocessing). |
void |
setCollectBranchStats(Boolean collectBranchStats)
Branch stats are disabled by default because they are slow. |
void |
setCurrentStatWindowSize(Integer newStatWindowSize)
Set the size of the service time collection window |
void |
setJmxEnabled(boolean jmxEnabled)
|
void |
setStageName(String name)
|
void |
setStatusBatchSize(Integer statusBatchSize)
|
void |
setStatusInterval(Long statusInterval)
|
abstract String |
status()
Class-specific status message. |
Methods inherited from class java.lang.Object |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
Field Detail |
---|
protected final org.apache.commons.logging.Log log
protected StageContext stageContext
protected static ThreadLocal<AtomicLong> emitTotal
protected static ThreadLocal<Map<String,AtomicLong>> threadLocalEmitBranchTime
protected static ThreadLocal<AtomicInteger> emitCount
protected static ThreadLocal<NumberFormat> floatFormatter
Constructor Detail |
---|
public ExtendedBaseStage()
Method Detail |
---|
public void init(StageContext context)
Stage
Initialization takes place when the stage is added to a pipeline. Implementations of this method should perform any necessary setup that is required for the driver to be able to correctly run the stage.
NOTE: Since this method is run when the stage is added to the pipeline, certain information (such as the downstream feeder for the stage) may not yet be available until the pipeline is fully constructoed.
init
in interface Stage
context
- the StageContext
within which the stage sill be runpublic final void preprocess() throws StageException
preprocess
in interface Stage
StageException
- any checked Exception thrown by the implementation should
be wrapped in a StageException
Stage.preprocess()
public final void process(Object obj) throws StageException
Stage
Feeder
. This Feeder
can be obtained from
the stage context made available during initialization
.
NOTE: Implementations of this method must be thread-safe!
process
in interface Stage
obj
- an object to be processed
StageException
- any checked Exception thrown by the implementation should
be wrapped in a StageException
public final void emit(Object obj)
public final void emit(String branch, Object obj)
public final void postprocess() throws StageException
postprocess
in interface Stage
StageException
- any checked Exception thrown by the implementation should
be wrapped in a StageException
Stage.postprocess()
public void release()
Stage
release
in interface Stage
public abstract void innerProcess(Object obj) throws StageException
StageException
public void innerPreprocess() throws StageException
StageException
public void innerPostprocess() throws StageException
StageException
public abstract String status()
public void logStatus()
public String getStatusMessage()
getStatusMessage
in interface ExtendedBaseStageMBean
protected String formatTotalTimeStat(String name, AtomicLong totalTime)
protected String formatTotalTimeStat(String name, long totalTime)
protected String formatCounterStat(String name, AtomicInteger count)
protected String formatCounterStat(String name, AtomicLong count)
protected String formatCounterStat(String name, long count)
public Long getStatusInterval()
getStatusInterval
in interface ExtendedBaseStageMBean
org.apache.commons.pipeline.ExtendedBaseStageMBean#getStatusInterval()
public void setStatusInterval(Long statusInterval)
setStatusInterval
in interface ExtendedBaseStageMBean
statusInterval
- new status intervalorg.apache.commons.pipeline.ExtendedBaseStageMBean#setStatusInterval(long)
public Integer getStatusBatchSize()
getStatusBatchSize
in interface ExtendedBaseStageMBean
public void setStatusBatchSize(Integer statusBatchSize)
setStatusBatchSize
in interface ExtendedBaseStageMBean
statusBatchSize
- Size of batches processes by this stage (used to adjust throughput statistics)public long getObjectsReceived()
getObjectsReceived
in interface ExtendedBaseStageMBean
org.apache.commons.pipeline.ExtendedBaseStageMBean#getObjectsReceived()
public long getTotalServiceTime()
getTotalServiceTime
in interface ExtendedBaseStageMBean
org.apache.commons.pipeline.ExtendedBaseStageMBean#getTotalServiceTime()
public long getTotalEmitTime()
getTotalEmitTime
in interface ExtendedBaseStageMBean
org.apache.commons.pipeline.ExtendedBaseStageMBean#getTotalEmitTime()
public long getTotalEmits()
getTotalEmits
in interface ExtendedBaseStageMBean
org.apache.commons.pipeline.ExtendedBaseStageMBean#getTotalEmits()
public Boolean getCollectBranchStats()
getCollectBranchStats
in interface ExtendedBaseStageMBean
org.apache.commons.pipeline.ExtendedBaseStageMBean#getCollectBranchStats()
public void setCollectBranchStats(Boolean collectBranchStats)
ExtendedBaseStageMBean
setCollectBranchStats
in interface ExtendedBaseStageMBean
collectBranchStats
- true if this stage should start collecting branch stats,
false otherwise.org.apache.commons.pipeline.ExtendedBaseStageMBean#setCollectBranchStats(Boolean)
public Integer getCurrentStatWindowSize()
ExtendedBaseStageMBean
getCurrentStatWindowSize
in interface ExtendedBaseStageMBean
public void setCurrentStatWindowSize(Integer newStatWindowSize)
ExtendedBaseStageMBean
setCurrentStatWindowSize
in interface ExtendedBaseStageMBean
public String getStageName()
public void setStageName(String name)
public boolean isJmxEnabled()
public void setJmxEnabled(boolean jmxEnabled)
public double getCurrentServiceTimeAverage()
getCurrentServiceTimeAverage
in interface ExtendedBaseStageMBean
currentStatWindowSize
objects.
|
||||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | |||||||||
SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |