001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.commons.pipeline.stage;
018
import java.lang.management.ManagementFactory;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;

import org.apache.commons.lang.time.StopWatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics;
import org.apache.commons.pipeline.Feeder;
import org.apache.commons.pipeline.Stage;
import org.apache.commons.pipeline.StageContext;
import org.apache.commons.pipeline.StageException;
039
040 /**
041 * Base class for pipeline stages. Keeps track of performance statistics and allows
042 * for collection and adjustment via JMX (optional)
043 *
044 * Cannot extend BaseStage because it marks the emit methods as final.
045 *
046 * @author mzsanford
047 */
048 public abstract class ExtendedBaseStage implements Stage, ExtendedBaseStageMBean {
049 /** Minimum percentage of blocking before we report per-branch stats. */
050 private static final float BRANCH_BLOCK_THRESHOLD = 1.0f;
051 /** Default size of the moving-window average statistics */
052 private static final int DEFAULT_DESCRIPTIVE_STATS_WINDOW_SIZE = 100;
053 /** Default queue name when reporting statistics */
054 private static final String DEFAULT_QUEUE_NAME = "[DefaultQueue]";
055 /** Default number of objects after which a status message is logged */
056 private static final int DEFAULT_STATUS_INTERVAL = 1000;
057 protected final Log log = LogFactory.getLog( getClass() );
058
059 protected StageContext stageContext;
060 private Feeder downstreamFeeder;
061 private String stageName;
062 private final AtomicLong objectsReceived = new AtomicLong(0);
063 private final AtomicLong unhandledExceptions = new AtomicLong(0);
064 private final AtomicLong totalServiceTime = new AtomicLong(0);
065 private final AtomicLong totalEmitTime = new AtomicLong(0);
066 private final AtomicLong totalEmits = new AtomicLong(0);
067 private final Map<String, AtomicLong> emitTimeByBranch = new HashMap<String, AtomicLong>();
068 private int currentStatWindowSize = DEFAULT_DESCRIPTIVE_STATS_WINDOW_SIZE;
069 private SynchronizedDescriptiveStatistics serviceTimeStatistics;
070 private long statusInterval = DEFAULT_STATUS_INTERVAL;
071 private Integer statusBatchSize = 1;
072 private boolean collectBranchStats = false;
073 private boolean preProcessed = false; // prevent recursion.
074 private boolean postProcessed = false; // prevent recursion.
075 private boolean jmxEnabled = true;
076
077 /**
078 * Class name for status message. Needed because java.util.logging only
079 * reports the base class name.
080 */
081 private final String className = getClass().getSimpleName();
082
083 /**
084 * ThreadLocal sum of time spent waiting on blocked queues during the current process call.
085 */
086 protected static ThreadLocal<AtomicLong> emitTotal = new ThreadLocal<AtomicLong>() {
087 protected synchronized AtomicLong initialValue() {
088 return new AtomicLong();
089 }
090 };
091
092 /**
093 * ThreadLocal sum of time spent waiting on blocked queues during the current process call by queue name.
094 */
095 protected static ThreadLocal<Map<String, AtomicLong>> threadLocalEmitBranchTime = new ThreadLocal<Map<String, AtomicLong>>() {
096 protected synchronized Map<String, AtomicLong> initialValue() {
097 return new HashMap<String, AtomicLong>();
098 }
099 };
100
101 /**
102 * ThreadLocal count of emit calls during the current process call.
103 */
104 protected static ThreadLocal<AtomicInteger> emitCount = new ThreadLocal<AtomicInteger>() {
105 protected synchronized AtomicInteger initialValue() {
106 return new AtomicInteger();
107 }
108 };
109
110 /**
111 * ThreadLocal formatter since they are not thread safe.
112 */
113 protected static ThreadLocal<NumberFormat> floatFormatter = new ThreadLocal<NumberFormat>() {
114 protected synchronized NumberFormat initialValue() {
115 return new DecimalFormat("##0.000");
116 }
117 };
118
119 public ExtendedBaseStage() {
120 // Empty constructor provided for future use.
121 }
122
123 public void init( StageContext context ) {
124 this.stageContext = context;
125 if (jmxEnabled) {
126 enableJMX(context);
127 }
128 }
129
130 @SuppressWarnings("unchecked")
131 private final void enableJMX(StageContext context) {
132 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
133 if (mbs != null) {
134 // Try to build a unique JMX name.
135 StringBuilder sb = new StringBuilder("org.apache.commons.pipeline:");
136 sb.append("class=").append(className);
137 if (stageName != null) {
138 sb.append(",name=").append(stageName);
139 }
140
141 try {
142 ObjectName name = new ObjectName(sb.toString());
143 if (mbs.isRegistered(name)) {
144 log.info("JMX Overlap. Multiple instances of '" + name + "'. Only one will be registered.");
145 } else {
146 Class mbeanInterface = ExtendedBaseStageMBean.class;
147 try {
148 // Find out if the stage has a more specific MBean. Reflection can be slow
149 // but this operation is pretty fast. Not to mention it's only done at startup.
150 Class[] interfaces = getClass().getInterfaces();
151 for (int i=0 ; i < interfaces.length; i++) {
152 Class current = interfaces[i];
153 // Only use interfaces that extend ExtendedBaseStageMBean to maintain a minimum
154 // amount of functionality.
155 if (current != ExtendedBaseStageMBean.class
156 && ExtendedBaseStageMBean.class.isAssignableFrom(current)) {
157 mbeanInterface = current;
158 break;
159 }
160 }
161 } catch (Exception e) {
162 // In the event of security or cast exceptions, default back to base.
163 log.info("Reflection error while checking for JMX interfaces.");
164 // Reset in the off chance it got hosed.
165 mbeanInterface = ExtendedBaseStageMBean.class;
166 }
167
168 StandardMBean mbean = new StandardMBean(this,
169 mbeanInterface);
170 mbs.registerMBean(mbean, name);
171 log.info("JMX MBean registered: " + name.toString() + " (" + mbeanInterface.getSimpleName() + ")");
172 }
173 } catch (Exception e) {
174 log.warn("Failed to register with JMX server",e);
175 }
176 }
177 }
178
179 /**
180 * Called when a stage has been created but before the first object is
181 * sent to the stage for processing. Subclasses
182 * should use the innerPreprocess method, which is called by this method.
183 *
184 * @see org.apache.commons.pipeline.Stage#preprocess()
185 */
186 public final void preprocess() throws StageException {
187 if ( !preProcessed ) {
188 serviceTimeStatistics = new SynchronizedDescriptiveStatistics();
189 serviceTimeStatistics.setWindowSize(currentStatWindowSize);
190 innerPreprocess();
191 }
192 preProcessed = true;
193 }
194
195 public final void process( Object obj ) throws StageException {
196 objectsReceived.incrementAndGet();
197 StopWatch stopWatch = new StopWatch();
198 stopWatch.start();
199 try {
200 this.innerProcess( obj );
201 } catch (Exception e) {
202 // Hate to catch Exception, but don't want anything killing off the thread
203 // and hanging the pipeline.
204 log.error("Uncaught exception in " + className + ": " + e.getMessage(), e);
205 unhandledExceptions.incrementAndGet();
206 }
207 stopWatch.stop();
208
209 long totalTime = stopWatch.getTime();
210 totalServiceTime.addAndGet(totalTime);
211
212 // I hate to synchronize anything in the base class, but this
213 // call should be very fast and is not thread safe.
214 serviceTimeStatistics.addValue(totalTime);
215
216 // Use ThreadLocals so that the stats only reflect process
217 // calls that have completed. Otherwise the emit times can
218 // exceed the process times.
219 totalEmits.addAndGet(emitCount.get().intValue());
220 totalEmitTime.addAndGet(emitTotal.get().longValue());
221 emitCount.remove();
222 emitTotal.remove();
223
224 if (collectBranchStats) {
225 for (Map.Entry<String, AtomicLong> entry : threadLocalEmitBranchTime.get().entrySet()) {
226 if (emitTimeByBranch.containsKey(entry.getKey())) {
227 emitTimeByBranch.get(entry.getKey()).addAndGet(entry.getValue().longValue());
228 } else {
229 // Race condition. containsKey could return false and another thread could insert
230 // here. We can synchronize here and we will very rarely hit this block. Only the first
231 // time each queue is accessed.
232 synchronized (emitTimeByBranch) {
233 // Double check for the race condition.
234 if (emitTimeByBranch.containsKey(entry.getKey())) {
235 emitTimeByBranch.get(entry.getKey()).addAndGet(entry.getValue().longValue());
236 } else {
237 emitTimeByBranch.put(entry.getKey(), new AtomicLong(entry.getValue().longValue()));
238 }
239 }
240 }
241 }
242 threadLocalEmitBranchTime.remove();
243 }
244
245 if ( objectsReceived.longValue() % statusInterval == 0 ) {
246 logStatus();
247 }
248 }
249
250 /**
251 * Convenience method to feed the specified object to the next stage downstream.
252 */
253 public final void emit( Object obj ) {
254 if ( log.isDebugEnabled() ) {
255 log.debug( this.getClass() + " is emitting object " + obj );
256 }
257 if ( this.downstreamFeeder == null ) {
258 synchronized (this) {
259 // Lazy init the default feeder.
260 this.downstreamFeeder = stageContext.getDownstreamFeeder( this );
261 }
262 }
263 feed( DEFAULT_QUEUE_NAME, downstreamFeeder, obj );
264 }
265
266 /**
267 * Convenience method to feed the specified object to the first stage of the specified branch.
268 */
269 public final void emit( String branch, Object obj ) {
270 Feeder feeder = this.stageContext.getBranchFeeder( branch );
271 feed( branch, feeder, obj );
272 }
273
274 private void feed(String name, Feeder feeder, Object obj ) {
275 if ( feeder == null ) {
276 // The pipeline code should never allow this to happen.
277 String objectType = ( obj != null ? obj.getClass().getSimpleName() : "null" );
278 log.error( "Ignoring attempt to emit " + objectType +
279 " object to invalid feeder" );
280 return;
281 }
282 StopWatch emitWatch = new StopWatch();
283 emitWatch.start();
284
285 // Pass the emitted object to the next stage (default or branch)
286 feeder.feed( obj );
287
288 emitWatch.stop();
289
290 // Use ThreadLocal variables so the emit totals do not
291 // go up until the process call completes.
292 emitTotal.get().addAndGet( emitWatch.getTime() );
293 emitCount.get().incrementAndGet();
294
295 if (collectBranchStats) {
296 if (! threadLocalEmitBranchTime.get().containsKey(name)) {
297 AtomicLong currentTotal = new AtomicLong(emitWatch.getTime());
298 threadLocalEmitBranchTime.get().put(name, currentTotal);
299 } else {
300 threadLocalEmitBranchTime.get().get(name).addAndGet(emitWatch.getTime());
301 }
302 }
303 }
304
305 /**
306 * Called when a stage has completed all processing. Subclasses
307 * should use the innerPostprocess method, which is called by this method.
308 *
309 * @see org.apache.commons.pipeline.Stage#postprocess()
310 */
311 public final void postprocess() throws StageException {
312 if ( !postProcessed ) {
313 logStatus();
314 innerPostprocess();
315 }
316 postProcessed = true;
317 }
318
319 public void release() {
320 // No-op implementation to fulfill interface
321 }
322
323 public abstract void innerProcess( Object obj )
324 throws StageException;
325
326 public void innerPreprocess() throws StageException {
327 // No-op default implementation.
328 }
329
330 public void innerPostprocess() throws StageException {
331 // No-op default implementation.
332 }
333
334 /**
335 * Class-specific status message. Null or empty status' will be ignored.
336 */
337 public abstract String status();
338
339 public void logStatus() {
340 String logMessage = getStatusMessage();
341 log.info(logMessage);
342 }
343
344 /**
345 * @return Log message including both base stage and class specific stats.
346 */
347 public String getStatusMessage() {
348 StringBuilder sb = new StringBuilder( 512 );
349 NumberFormat formatter = floatFormatter.get();
350
351 float serviceTime = ( totalServiceTime.floatValue() / 1000.0f );
352 float emitTime = ( totalEmitTime.floatValue() / 1000.0f );
353 float netServiceTime = ( serviceTime - emitTime );
354
355 float emitPercentage = 0.0f;
356 float emitFloat = totalEmits.floatValue();
357 float recvFloat = objectsReceived.floatValue();
358 if (recvFloat > 0) {
359 emitPercentage = (emitFloat / recvFloat)*100.0f;
360 }
361
362 sb.append( "\n\t === " ).append( className ).append( " Generic Stats === " );
363
364 if (statusBatchSize > 1) {
365 sb.append("\n\tStatus Batch Size (all /obj and /sec include this): ").append(statusBatchSize);
366 }
367
368 sb.append( "\n\tTotal objects received:" ).append( objectsReceived );
369 sb.append( "\n\tTotal unhandled exceptions:" ).append( unhandledExceptions );
370 sb.append( "\n\tTotal objects emitted:" ).append( totalEmits );
371 if (emitFloat > 0) {
372 sb.append(" (").append(formatter.format(emitPercentage)).append("%)");
373 }
374 sb.append( "\n\tTotal gross processing time (sec):" )
375 .append( formatter.format( serviceTime ) );
376 sb.append( "\n\tTotal emit blocked time (sec):" )
377 .append( formatter.format( emitTime ) );
378 sb.append( "\n\tTotal net processing time (sec):" )
379 .append( formatter.format( netServiceTime ) );
380
381 float avgServiceTime = 0;
382 float avgEmitTime = 0;
383 float avgNetServiceTime = 0;
384 if ( objectsReceived.longValue() > 0 ) {
385 avgServiceTime = ( serviceTime / objectsReceived.floatValue()/statusBatchSize );
386 avgEmitTime = ( emitTime / objectsReceived.floatValue()/statusBatchSize );
387 avgNetServiceTime = ( netServiceTime / objectsReceived.floatValue()/statusBatchSize );
388 }
389
390 sb.append( "\n\tAverage gross processing time (sec/obj):" )
391 .append( formatter.format( avgServiceTime ) );
392 sb.append( "\n\tAverage emit blocked time (sec/obj):" )
393 .append( formatter.format( avgEmitTime ) );
394 sb.append( "\n\tAverage net processing time (sec/obj):" )
395 .append( formatter.format( avgNetServiceTime ) );
396
397 if (serviceTimeStatistics != null) {
398 long count = serviceTimeStatistics.getN();
399 if (count > 0) {
400 double avgMillis = getCurrentServiceTimeAverage()/(float)statusBatchSize;
401 sb.append( "\n\tAverage gross processing time in last ")
402 .append(count)
403 .append(" (sec/obj):" )
404 .append( formatter.format( avgMillis/1000 ) );
405 }
406 }
407
408 float grossThroughput = 0;
409 if ( avgServiceTime > 0 ) {
410 grossThroughput = ( 1.0f / avgServiceTime );
411 }
412 float netThroughput = 0;
413 if ( avgNetServiceTime > 0 ) {
414 netThroughput = ( 1.0f / avgNetServiceTime );
415 }
416
417 sb.append( "\n\tGross throughput (obj/sec):" )
418 .append( formatter.format( grossThroughput ) );
419 sb.append( "\n\tNet throughput (obj/sec):" )
420 .append( formatter.format( netThroughput ) );
421
422 float percWorking = 0;
423 float percBlocking = 0;
424 if ( serviceTime > 0 ) {
425 percWorking = ( netServiceTime / serviceTime ) * 100;
426 percBlocking = ( emitTime / serviceTime ) * 100;
427 }
428
429 sb.append( "\n\t% time working:" ).append( formatter.format( percWorking ) );
430 sb.append( "\n\t% time blocking:" ).append( formatter.format( percBlocking ) );
431
432 // No need to output for a non-branching stage or if there was very little
433 // blocking (as defined in the constant)
434 if (collectBranchStats && emitTimeByBranch.size() > 1 && percBlocking >= BRANCH_BLOCK_THRESHOLD) {
435 try {
436 for (Map.Entry<String, AtomicLong> entry : emitTimeByBranch.entrySet()) {
437 float branchBlockSec = (entry.getValue().floatValue()/1000.0f);
438 float branchBlockPerc = (branchBlockSec/emitTime) * 100;
439 sb.append("\n\t\t% branch ").append(entry.getKey()).append(":").append(formatter.format(branchBlockPerc));
440 }
441 } catch (RuntimeException e) {
442 // Synchronizing would be slow, ConcurrentMod is possible but unlikely since the map is
443 // only modified the first time a stage is emitted to so just catch and
444 // log it. No need to stop all processing over a reporting failure.
445 sb.append("\n\t\tproblem getting per-branch stats: ").append(e.getMessage());
446 }
447 }
448
449 String stageSpecificStatus = this.status();
450 if ( stageSpecificStatus != null && stageSpecificStatus.length() > 0 ) {
451 sb.append( "\n\t === " )
452 .append( className )
453 .append( " Specific Stats === " );
454 sb.append( stageSpecificStatus );
455 }
456
457 return sb.toString();
458 }
459
460 protected String formatTotalTimeStat( String name, AtomicLong totalTime ) {
461 return formatTotalTimeStat( name, totalTime.longValue() );
462 }
463
464 protected String formatTotalTimeStat( String name, long totalTime ) {
465 if ( name == null || totalTime < 0 ) {
466 return "";
467 }
468 NumberFormat formatter = floatFormatter.get();
469 StringBuilder sb = new StringBuilder();
470 // Total processing time minus calls to emit.
471 float totalSec = totalTime / 1000.0f;
472 float average = 0;
473 if ( getObjectsReceived() > 0 ) {
474 average = totalSec / getObjectsReceived() / (float)statusBatchSize;
475 }
476
477 if ( log.isDebugEnabled() ) {
478 sb.append( "\n\tTotal " )
479 .append( name )
480 .append( " processing time (sec):" )
481 .append( formatter.format( totalSec ) );
482 }
483
484 sb.append( "\n\tAverage " )
485 .append( name )
486 .append( " processing time (sec/obj):" )
487 .append( formatter.format( average ) );
488
489 if ( log.isDebugEnabled() && average > 0 ) {
490 float throughput = ( 1.0f ) / average * (float)statusBatchSize;
491 sb.append( "\n\tThroughput for " )
492 .append( name )
493 .append( " (obj/sec):" )
494 .append( formatter.format( throughput ) );
495 }
496
497 return sb.toString();
498 }
499
500 protected String formatCounterStat( String name, AtomicInteger count ) {
501 return formatCounterStat(name, count.get());
502 }
503
504 protected String formatCounterStat( String name, AtomicLong count ) {
505 return formatCounterStat(name, count.get());
506 }
507
508 protected String formatCounterStat( String name, long count ) {
509 if ( name == null || count < 0 || getObjectsReceived() <= 0) {
510 return "";
511 }
512 NumberFormat formatter = floatFormatter.get();
513 StringBuilder sb = new StringBuilder();
514
515 float perc = ((float)count*(float)statusBatchSize/(float)getObjectsReceived())*100.0f;
516
517 sb.append( "\n\tNumber of " )
518 .append( name )
519 .append( " (" )
520 .append( formatter.format(perc) )
521 .append( "%) :")
522 .append( count );
523
524 return sb.toString();
525 }
526
527 /**
528 * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getStatusInterval()
529 */
530 public Long getStatusInterval() {
531 return Long.valueOf(statusInterval);
532 }
533
534 /**
535 * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#setStatusInterval(long)
536 */
537 public void setStatusInterval( Long statusInterval ) {
538 this.statusInterval = statusInterval;
539 }
540
541 public Integer getStatusBatchSize() {
542 return statusBatchSize;
543 }
544
545 public void setStatusBatchSize(Integer statusBatchSize) {
546 this.statusBatchSize = statusBatchSize;
547 }
548
549 /**
550 * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getObjectsReceived()
551 */
552 public long getObjectsReceived() {
553 return objectsReceived.longValue();
554 }
555
556 /**
557 * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getTotalServiceTime()
558 */
559 public long getTotalServiceTime() {
560 return totalServiceTime.longValue();
561 }
562
563 /**
564 * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getTotalEmitTime()
565 */
566 public long getTotalEmitTime() {
567 return totalEmitTime.longValue();
568 }
569
570 /**
571 * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getTotalEmits()
572 */
573 public long getTotalEmits() {
574 return totalEmits.longValue();
575 }
576
577 /**
578 * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getCollectBranchStats()
579 */
580 public Boolean getCollectBranchStats() {
581 return collectBranchStats;
582 }
583
584 /**
585 * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#setCollectBranchStats(Boolean)
586 */
587 public void setCollectBranchStats(Boolean collectBranchStats) {
588 this.collectBranchStats = collectBranchStats;
589 }
590
591 public Integer getCurrentStatWindowSize() {
592 return Integer.valueOf(currentStatWindowSize);
593 }
594
595 public void setCurrentStatWindowSize(Integer newStatWindowSize) {
596 if (serviceTimeStatistics != null
597 && newStatWindowSize != this.currentStatWindowSize) {
598 synchronized (serviceTimeStatistics) {
599 serviceTimeStatistics.setWindowSize(newStatWindowSize);
600 }
601 }
602 this.currentStatWindowSize = newStatWindowSize;
603 }
604
605 public String getStageName() {
606 return stageName;
607 }
608
609 public void setStageName(String name) {
610 this.stageName = name;
611 }
612
613 public boolean isJmxEnabled() {
614 return jmxEnabled;
615 }
616
617 public void setJmxEnabled(boolean jmxEnabled) {
618 this.jmxEnabled = jmxEnabled;
619 }
620
621 /**
622 * Returns a moving average of the service time. This does not yet take into account time spent
623 * calling emit, nor does it return minimum, maximum or other statistical information at this time.
624 *
625 * @return Average time to process the last <code>currentStatWindowSize</code> objects.
626 */
627 public double getCurrentServiceTimeAverage() {
628 double avg = -1.0d;
629
630 // Hate to synchronize in the base class, but this should be very quick.
631 avg = serviceTimeStatistics.getMean();
632
633 return avg;
634 }
635 }