/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.pipeline.stage;

import java.lang.management.ManagementFactory;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;

import org.apache.commons.lang.time.StopWatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics;
import org.apache.commons.pipeline.Feeder;
import org.apache.commons.pipeline.Stage;
import org.apache.commons.pipeline.StageContext;
import org.apache.commons.pipeline.StageException;

/**
 * Base class for pipeline stages. Keeps track of performance statistics and allows
 * for collection and adjustment via JMX (optional).
 *
 * <p>Subclasses implement {@link #innerProcess(Object)} and {@link #status()}, and may
 * override {@link #innerPreprocess()} / {@link #innerPostprocess()}. The final
 * {@link #preprocess()}, {@link #process(Object)} and {@link #postprocess()} methods wrap
 * those hooks with timing and counting.</p>
 *
 * <p>Cannot extend BaseStage because it marks the emit methods as final.</p>
 *
 * <p>All times are accumulated in milliseconds (as reported by {@code StopWatch.getTime()})
 * and converted to seconds only when a status message is formatted.</p>
 *
 * @author mzsanford
 */
public abstract class ExtendedBaseStage implements Stage, ExtendedBaseStageMBean {
    /** Minimum percentage of blocking before we report per-branch stats. */
    private static final float BRANCH_BLOCK_THRESHOLD = 1.0f;
    /** Default size of the moving-window average statistics */
    private static final int DEFAULT_DESCRIPTIVE_STATS_WINDOW_SIZE = 100;
    /** Default queue name when reporting statistics */
    private static final String DEFAULT_QUEUE_NAME = "[DefaultQueue]";
    /** Default number of objects after which a status message is logged */
    private static final int DEFAULT_STATUS_INTERVAL = 1000;

    protected final Log log = LogFactory.getLog( getClass() );

    protected StageContext stageContext;
    /** Lazily resolved in {@link #emit(Object)}; volatile so the double-checked init is safe. */
    private volatile Feeder downstreamFeeder;
    private String stageName;
    private final AtomicLong objectsReceived = new AtomicLong(0);
    private final AtomicLong unhandledExceptions = new AtomicLong(0);
    private final AtomicLong totalServiceTime = new AtomicLong(0);
    private final AtomicLong totalEmitTime = new AtomicLong(0);
    private final AtomicLong totalEmits = new AtomicLong(0);
    /**
     * Accumulated emit time (ms) per branch/queue name. A concurrent map is used because
     * every thread calling {@link #process(Object)} merges into it while status reporting
     * iterates over it.
     */
    private final ConcurrentMap<String, AtomicLong> emitTimeByBranch =
            new ConcurrentHashMap<String, AtomicLong>();
    private int currentStatWindowSize = DEFAULT_DESCRIPTIVE_STATS_WINDOW_SIZE;
    /** Created in {@link #preprocess()}; {@code null} until then. */
    private volatile SynchronizedDescriptiveStatistics serviceTimeStatistics;
    // The following settings can be changed through the public setters (e.g. from a JMX
    // thread) while worker threads read them, hence volatile.
    private volatile long statusInterval = DEFAULT_STATUS_INTERVAL;
    private volatile int statusBatchSize = 1;
    private volatile boolean collectBranchStats = false;
    private boolean preProcessed = false;  // set once innerPreprocess() has completed.
    private boolean postProcessed = false; // set once innerPostprocess() has completed.
    private boolean jmxEnabled = true;

    /**
     * Class name for status message. Needed because java.util.logging only
     * reports the base class name.
     */
    private final String className = getClass().getSimpleName();

    // NOTE(review): the thread-locals below are static, so they are shared by every stage
    // instance that runs on the same thread. process() clears them when it finishes; if a
    // downstream stage's process() were ever executed on the caller's thread from inside
    // feed(), it would clear the caller's in-flight totals. Confirm the stage drivers in use
    // never process synchronously on the emitting thread.

    /**
     * ThreadLocal sum of time spent waiting on blocked queues during the current process call.
     */
    protected static ThreadLocal<AtomicLong> emitTotal = new ThreadLocal<AtomicLong>() {
        @Override
        protected AtomicLong initialValue() {
            return new AtomicLong();
        }
    };

    /**
     * ThreadLocal sum of time spent waiting on blocked queues during the current process call by queue name.
     */
    protected static ThreadLocal<Map<String, AtomicLong>> threadLocalEmitBranchTime = new ThreadLocal<Map<String, AtomicLong>>() {
        @Override
        protected Map<String, AtomicLong> initialValue() {
            return new HashMap<String, AtomicLong>();
        }
    };

    /**
     * ThreadLocal count of emit calls during the current process call.
     */
    protected static ThreadLocal<AtomicInteger> emitCount = new ThreadLocal<AtomicInteger>() {
        @Override
        protected AtomicInteger initialValue() {
            return new AtomicInteger();
        }
    };

    /**
     * ThreadLocal formatter since they are not thread safe.
     */
    protected static ThreadLocal<NumberFormat> floatFormatter = new ThreadLocal<NumberFormat>() {
        @Override
        protected NumberFormat initialValue() {
            return new DecimalFormat("##0.000");
        }
    };

    public ExtendedBaseStage() {
        // Empty constructor provided for future use.
    }

    /**
     * Stores the stage context and, when JMX is enabled, registers this stage as an MBean.
     *
     * @param context the context this stage runs in; used later to look up feeders
     */
    public void init( StageContext context ) {
        this.stageContext = context;
        if (jmxEnabled) {
            enableJMX(context);
        }
    }

    /**
     * Registers this stage with the platform MBean server under
     * {@code org.apache.commons.pipeline:class=<simple class name>[,name=<stage name>]}.
     * Registration is best-effort: an already-registered name is logged and skipped, and any
     * failure is logged as a warning without being rethrown.
     *
     * @param context the stage context (currently unused by the registration itself)
     */
    @SuppressWarnings("unchecked")
    private void enableJMX(StageContext context) {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        if (mbs == null) {
            return;
        }

        // Try to build a unique JMX name.
        StringBuilder sb = new StringBuilder("org.apache.commons.pipeline:");
        sb.append("class=").append(className);
        if (stageName != null) {
            sb.append(",name=").append(stageName);
        }

        try {
            ObjectName name = new ObjectName(sb.toString());
            if (mbs.isRegistered(name)) {
                log.info("JMX Overlap. Multiple instances of '" + name + "'. Only one will be registered.");
                return;
            }

            Class mbeanInterface = ExtendedBaseStageMBean.class;
            try {
                // Find out if the stage has a more specific MBean. Reflection can be slow
                // but this operation is pretty fast. Not to mention it's only done at startup.
                // Only interfaces declared directly on the concrete class are examined.
                Class[] interfaces = getClass().getInterfaces();
                for (int i = 0; i < interfaces.length; i++) {
                    Class current = interfaces[i];
                    // Only use interfaces that extend ExtendedBaseStageMBean to maintain a minimum
                    // amount of functionality.
                    if (current != ExtendedBaseStageMBean.class
                            && ExtendedBaseStageMBean.class.isAssignableFrom(current)) {
                        mbeanInterface = current;
                        break;
                    }
                }
            } catch (Exception e) {
                // In the event of security or cast exceptions, default back to base.
                log.info("Reflection error while checking for JMX interfaces.");
                // Reset in the off chance it got hosed.
                mbeanInterface = ExtendedBaseStageMBean.class;
            }

            StandardMBean mbean = new StandardMBean(this, mbeanInterface);
            mbs.registerMBean(mbean, name);
            log.info("JMX MBean registered: " + name.toString() + " (" + mbeanInterface.getSimpleName() + ")");
        } catch (Exception e) {
            log.warn("Failed to register with JMX server", e);
        }
    }

    /**
     * Called when a stage has been created but before the first object is
     * sent to the stage for processing. Subclasses
     * should use the innerPreprocess method, which is called by this method.
     *
     * <p>The moving-window statistics are created here. Once {@link #innerPreprocess()} has
     * completed normally, further calls are no-ops; if it throws, a later call will retry.</p>
     *
     * @throws StageException if {@link #innerPreprocess()} fails
     * @see org.apache.commons.pipeline.Stage#preprocess()
     */
    public final void preprocess() throws StageException {
        if ( !preProcessed ) {
            SynchronizedDescriptiveStatistics stats = new SynchronizedDescriptiveStatistics();
            stats.setWindowSize(currentStatWindowSize);
            serviceTimeStatistics = stats;
            innerPreprocess();
        }
        preProcessed = true;
    }

    /**
     * Times a call to {@link #innerProcess(Object)}, records the statistics and periodically
     * logs a status message.
     *
     * <p>Any {@link Exception} thrown by {@code innerProcess} is logged and counted rather than
     * propagated, so a single bad object cannot kill the processing thread.</p>
     *
     * @param obj the object to process
     * @throws StageException declared by the {@code Stage} contract; exceptions raised by
     *         {@link #innerProcess(Object)} are caught and logged here
     */
    public final void process( Object obj ) throws StageException {
        // Remember this call's own sequence number so that the periodic status check below
        // fires exactly once per interval even when several threads process concurrently.
        final long received = objectsReceived.incrementAndGet();

        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        try {
            this.innerProcess( obj );
        } catch (Exception e) {
            // Hate to catch Exception, but don't want anything killing off the thread
            // and hanging the pipeline.
            log.error("Uncaught exception in " + className + ": " + e.getMessage(), e);
            unhandledExceptions.incrementAndGet();
        }
        stopWatch.stop();

        long totalTime = stopWatch.getTime();
        totalServiceTime.addAndGet(totalTime);

        // The statistics object does not exist until preprocess() has run; skip the moving
        // window rather than fail if process() is called first.
        SynchronizedDescriptiveStatistics stats = serviceTimeStatistics;
        if (stats != null) {
            stats.addValue(totalTime);
        }

        // Use ThreadLocals so that the stats only reflect process
        // calls that have completed. Otherwise the emit times can
        // exceed the process times.
        totalEmits.addAndGet(emitCount.get().intValue());
        totalEmitTime.addAndGet(emitTotal.get().longValue());
        emitCount.remove();
        emitTotal.remove();

        if (collectBranchStats) {
            mergeBranchTimes(threadLocalEmitBranchTime.get());
        }
        // Always clear, so timings gathered before branch stats were switched off do not
        // linger on this thread and get merged into a later call.
        threadLocalEmitBranchTime.remove();

        // An interval of zero disables periodic logging (and avoids a division by zero).
        final long interval = statusInterval;
        if ( interval != 0 && received % interval == 0 ) {
            logStatus();
        }
    }

    /**
     * Adds the per-branch emit times gathered during one process call to the shared totals.
     *
     * @param branchTimes emit time in milliseconds keyed by branch/queue name
     */
    private void mergeBranchTimes(Map<String, AtomicLong> branchTimes) {
        for (Map.Entry<String, AtomicLong> entry : branchTimes.entrySet()) {
            // ConcurrentHashMap rejects null keys; "null" is what the report would print anyway.
            String branch = String.valueOf(entry.getKey());
            AtomicLong total = emitTimeByBranch.get(branch);
            if (total == null) {
                // First time this branch is seen: another thread may insert concurrently,
                // in which case its counter wins and ours is discarded.
                AtomicLong fresh = new AtomicLong();
                AtomicLong raced = emitTimeByBranch.putIfAbsent(branch, fresh);
                total = (raced != null) ? raced : fresh;
            }
            total.addAndGet(entry.getValue().longValue());
        }
    }

    /**
     * Convenience method to feed the specified object to the next stage downstream.
     *
     * @param obj the object to pass downstream
     */
    public final void emit( Object obj ) {
        if ( log.isDebugEnabled() ) {
            log.debug( this.getClass() + " is emitting object " + obj );
        }
        Feeder feeder = this.downstreamFeeder;
        if ( feeder == null ) {
            synchronized (this) {
                // Lazy init the default feeder; re-check under the lock so it is looked up once.
                feeder = this.downstreamFeeder;
                if ( feeder == null ) {
                    feeder = stageContext.getDownstreamFeeder( this );
                    this.downstreamFeeder = feeder;
                }
            }
        }
        feed( DEFAULT_QUEUE_NAME, feeder, obj );
    }

    /**
     * Convenience method to feed the specified object to the first stage of the specified branch.
     *
     * @param branch name of the branch to feed
     * @param obj the object to pass to that branch
     */
    public final void emit( String branch, Object obj ) {
        Feeder feeder = this.stageContext.getBranchFeeder( branch );
        feed( branch, feeder, obj );
    }

    /**
     * Feeds {@code obj} to {@code feeder} and records how long the call took in the
     * thread-local emit counters (merged into the totals when the current process call ends).
     * A {@code null} feeder is logged as an error and the object is dropped.
     *
     * @param name queue/branch name used as the key for per-branch timing
     * @param feeder destination feeder, may be {@code null}
     * @param obj the object being emitted
     */
    private void feed(String name, Feeder feeder, Object obj ) {
        if ( feeder == null ) {
            // The pipeline code should never allow this to happen.
            String objectType = ( obj != null ? obj.getClass().getSimpleName() : "null" );
            log.error( "Ignoring attempt to emit " + objectType +
                       " object to invalid feeder" );
            return;
        }
        StopWatch emitWatch = new StopWatch();
        emitWatch.start();

        // Pass the emitted object to the next stage (default or branch)
        feeder.feed( obj );

        emitWatch.stop();
        final long elapsed = emitWatch.getTime();

        // Use ThreadLocal variables so the emit totals do not
        // go up until the process call completes.
        emitTotal.get().addAndGet( elapsed );
        emitCount.get().incrementAndGet();

        if (collectBranchStats) {
            Map<String, AtomicLong> branchTimes = threadLocalEmitBranchTime.get();
            AtomicLong branchTotal = branchTimes.get(name);
            if (branchTotal == null) {
                branchTimes.put(name, new AtomicLong(elapsed));
            } else {
                branchTotal.addAndGet(elapsed);
            }
        }
    }

    /**
     * Called when a stage has completed all processing. Subclasses
     * should use the innerPostprocess method, which is called by this method.
     *
     * <p>Logs a final status message before delegating. Once {@link #innerPostprocess()} has
     * completed normally, further calls are no-ops.</p>
     *
     * @throws StageException if {@link #innerPostprocess()} fails
     * @see org.apache.commons.pipeline.Stage#postprocess()
     */
    public final void postprocess() throws StageException {
        if ( !postProcessed ) {
            logStatus();
            innerPostprocess();
        }
        postProcessed = true;
    }

    public void release() {
        // No-op implementation to fulfill interface
    }

    /**
     * Stage-specific processing of a single object; invoked (and timed) by
     * {@link #process(Object)}.
     *
     * @param obj the object to process
     * @throws StageException on processing failure; it is caught and logged by the caller
     */
    public abstract void innerProcess( Object obj )
        throws StageException;

    /**
     * Stage-specific initialisation hook invoked by {@link #preprocess()}.
     *
     * @throws StageException on failure
     */
    public void innerPreprocess() throws StageException {
        // No-op default implementation.
    }

    /**
     * Stage-specific completion hook invoked by {@link #postprocess()}.
     *
     * @throws StageException on failure
     */
    public void innerPostprocess() throws StageException {
        // No-op default implementation.
    }

    /**
     * Class-specific status message. Null or empty statuses will be ignored.
     *
     * @return text appended to the generic statistics, or {@code null}/empty for none
     */
    public abstract String status();

    /** Logs the message built by {@link #getStatusMessage()} at INFO level. */
    public void logStatus() {
        String logMessage = getStatusMessage();
        log.info(logMessage);
    }

    /**
     * Builds the multi-line statistics report for this stage.
     *
     * @return Log message including both base stage and class specific stats.
     */
    public String getStatusMessage() {
        StringBuilder sb = new StringBuilder( 512 );
        NumberFormat formatter = floatFormatter.get();

        // Snapshot the counters once so every figure in the report is derived from the
        // same values even while other threads keep processing.
        final int batchSize = statusBatchSize;
        final long received = objectsReceived.longValue();
        final long emits = totalEmits.longValue();

        float serviceTime = ( totalServiceTime.floatValue() / 1000.0f );
        float emitTime = ( totalEmitTime.floatValue() / 1000.0f );
        float netServiceTime = ( serviceTime - emitTime );

        float emitPercentage = 0.0f;
        float emitFloat = (float) emits;
        float recvFloat = (float) received;
        if (recvFloat > 0) {
            emitPercentage = (emitFloat / recvFloat)*100.0f;
        }

        sb.append( "\n\t === " ).append( className ).append( " Generic Stats === " );

        if (batchSize > 1) {
            sb.append("\n\tStatus Batch Size (all /obj and /sec include this): ").append(batchSize);
        }

        sb.append( "\n\tTotal objects received:" ).append( received );
        sb.append( "\n\tTotal unhandled exceptions:" ).append( unhandledExceptions );
        sb.append( "\n\tTotal objects emitted:" ).append( emits );
        if (emitFloat > 0) {
            sb.append(" (").append(formatter.format(emitPercentage)).append("%)");
        }
        sb.append( "\n\tTotal gross processing time (sec):" )
          .append( formatter.format( serviceTime ) );
        sb.append( "\n\tTotal emit blocked time (sec):" )
          .append( formatter.format( emitTime ) );
        sb.append( "\n\tTotal net processing time (sec):" )
          .append( formatter.format( netServiceTime ) );

        float avgServiceTime = 0;
        float avgEmitTime = 0;
        float avgNetServiceTime = 0;
        if ( received > 0 ) {
            avgServiceTime = ( serviceTime / recvFloat / batchSize );
            avgEmitTime = ( emitTime / recvFloat / batchSize );
            avgNetServiceTime = ( netServiceTime / recvFloat / batchSize );
        }

        sb.append( "\n\tAverage gross processing time (sec/obj):" )
          .append( formatter.format( avgServiceTime ) );
        sb.append( "\n\tAverage emit blocked time (sec/obj):" )
          .append( formatter.format( avgEmitTime ) );
        sb.append( "\n\tAverage net processing time (sec/obj):" )
          .append( formatter.format( avgNetServiceTime ) );

        SynchronizedDescriptiveStatistics stats = serviceTimeStatistics;
        if (stats != null) {
            long count = stats.getN();
            if (count > 0) {
                double avgMillis = getCurrentServiceTimeAverage()/(float)batchSize;
                sb.append( "\n\tAverage gross processing time in last ")
                  .append(count)
                  .append(" (sec/obj):" )
                  .append( formatter.format( avgMillis/1000 ) );
            }
        }

        float grossThroughput = 0;
        if ( avgServiceTime > 0 ) {
            grossThroughput = ( 1.0f / avgServiceTime );
        }
        float netThroughput = 0;
        if ( avgNetServiceTime > 0 ) {
            netThroughput = ( 1.0f / avgNetServiceTime );
        }

        sb.append( "\n\tGross throughput (obj/sec):" )
          .append( formatter.format( grossThroughput ) );
        sb.append( "\n\tNet throughput (obj/sec):" )
          .append( formatter.format( netThroughput ) );

        float percWorking = 0;
        float percBlocking = 0;
        if ( serviceTime > 0 ) {
            percWorking = ( netServiceTime / serviceTime ) * 100;
            percBlocking = ( emitTime / serviceTime ) * 100;
        }

        sb.append( "\n\t% time working:" ).append( formatter.format( percWorking ) );
        sb.append( "\n\t% time blocking:" ).append( formatter.format( percBlocking ) );

        // No need to output for a non-branching stage or if there was very little
        // blocking (as defined in the constant). The map is concurrent, so iterating while
        // worker threads add branches is safe; emitTime is > 0 here because percBlocking is.
        if (collectBranchStats && emitTimeByBranch.size() > 1 && percBlocking >= BRANCH_BLOCK_THRESHOLD) {
            for (Map.Entry<String, AtomicLong> entry : emitTimeByBranch.entrySet()) {
                float branchBlockSec = (entry.getValue().floatValue()/1000.0f);
                float branchBlockPerc = (branchBlockSec/emitTime) * 100;
                sb.append("\n\t\t% branch ").append(entry.getKey()).append(":").append(formatter.format(branchBlockPerc));
            }
        }

        String stageSpecificStatus = this.status();
        if ( stageSpecificStatus != null && stageSpecificStatus.length() > 0 ) {
            sb.append( "\n\t === " )
              .append( className )
              .append( " Specific Stats === " );
            sb.append( stageSpecificStatus );
        }

        return sb.toString();
    }

    /**
     * Formats a total/average time statistic for use in {@link #status()}.
     *
     * @param name label for the statistic
     * @param totalTime accumulated time in milliseconds
     * @return formatted lines, or an empty string when the arguments are unusable
     */
    protected String formatTotalTimeStat( String name, AtomicLong totalTime ) {
        return formatTotalTimeStat( name, totalTime.longValue() );
    }

    /**
     * Formats a total/average time statistic for use in {@link #status()}. The total and the
     * throughput lines are only included when debug logging is enabled.
     *
     * @param name label for the statistic
     * @param totalTime accumulated time in milliseconds
     * @return formatted lines, or an empty string if {@code name} is null or
     *         {@code totalTime} is negative
     */
    protected String formatTotalTimeStat( String name, long totalTime ) {
        if ( name == null || totalTime < 0 ) {
            return "";
        }
        NumberFormat formatter = floatFormatter.get();
        StringBuilder sb = new StringBuilder();
        final int batchSize = statusBatchSize;
        final long received = getObjectsReceived();

        float totalSec = totalTime / 1000.0f;
        float average = 0;
        if ( received > 0 ) {
            average = totalSec / received / (float)batchSize;
        }

        if ( log.isDebugEnabled() ) {
            sb.append( "\n\tTotal " )
              .append( name )
              .append( " processing time (sec):" )
              .append( formatter.format( totalSec ) );
        }

        sb.append( "\n\tAverage " )
          .append( name )
          .append( " processing time (sec/obj):" )
          .append( formatter.format( average ) );

        if ( log.isDebugEnabled() && average > 0 ) {
            float throughput = ( 1.0f ) / average * (float)batchSize;
            sb.append( "\n\tThroughput for " )
              .append( name )
              .append( " (obj/sec):" )
              .append( formatter.format( throughput ) );
        }

        return sb.toString();
    }

    /**
     * Formats a counter statistic for use in {@link #status()}.
     *
     * @param name label for the counter
     * @param count the counter
     * @return formatted line, see {@link #formatCounterStat(String, long)}
     */
    protected String formatCounterStat( String name, AtomicInteger count ) {
        return formatCounterStat(name, count.get());
    }

    /**
     * Formats a counter statistic for use in {@link #status()}.
     *
     * @param name label for the counter
     * @param count the counter
     * @return formatted line, see {@link #formatCounterStat(String, long)}
     */
    protected String formatCounterStat( String name, AtomicLong count ) {
        return formatCounterStat(name, count.get());
    }

    /**
     * Formats a counter together with its percentage of the objects received
     * (scaled by the status batch size).
     *
     * @param name label for the counter
     * @param count the counter value
     * @return formatted line, or an empty string if {@code name} is null, {@code count} is
     *         negative or no objects have been received yet
     */
    protected String formatCounterStat( String name, long count ) {
        final long received = getObjectsReceived();
        if ( name == null || count < 0 || received <= 0) {
            return "";
        }
        NumberFormat formatter = floatFormatter.get();
        StringBuilder sb = new StringBuilder();

        float perc = ((float)count*(float)statusBatchSize/(float)received)*100.0f;

        sb.append( "\n\tNumber of " )
          .append( name )
          .append( " (" )
          .append( formatter.format(perc) )
          .append( "%) :")
          .append( count );

        return sb.toString();
    }

    /**
     * @return number of received objects between periodic status log messages
     * @see ExtendedBaseStageMBean#getStatusInterval()
     */
    public Long getStatusInterval() {
        return Long.valueOf(statusInterval);
    }

    /**
     * Sets how many received objects elapse between periodic status log messages.
     * A value of zero disables periodic logging.
     *
     * @param statusInterval the new interval; must not be {@code null}
     * @see ExtendedBaseStageMBean#setStatusInterval(Long)
     */
    public void setStatusInterval( Long statusInterval ) {
        this.statusInterval = statusInterval;
    }

    /**
     * @return the divisor applied to the per-object figures in the status report
     */
    public Integer getStatusBatchSize() {
        return Integer.valueOf(statusBatchSize);
    }

    /**
     * Sets the divisor applied to the per-object figures in the status report.
     * Values that are {@code null} or smaller than 1 would make those figures meaningless
     * (division by zero / negative averages), so they are rejected with a warning and the
     * previous value is kept.
     *
     * @param statusBatchSize the new batch size, expected to be at least 1
     */
    public void setStatusBatchSize(Integer statusBatchSize) {
        if (statusBatchSize == null || statusBatchSize.intValue() < 1) {
            log.warn("Ignoring invalid status batch size " + statusBatchSize
                    + "; keeping " + this.statusBatchSize);
            return;
        }
        this.statusBatchSize = statusBatchSize.intValue();
    }

    /**
     * @return number of objects passed to {@link #process(Object)} so far
     * @see ExtendedBaseStageMBean#getObjectsReceived()
     */
    public long getObjectsReceived() {
        return objectsReceived.longValue();
    }

    /**
     * @return total time spent in completed process calls, in milliseconds
     * @see ExtendedBaseStageMBean#getTotalServiceTime()
     */
    public long getTotalServiceTime() {
        return totalServiceTime.longValue();
    }

    /**
     * @return total time spent feeding downstream during completed process calls, in milliseconds
     * @see ExtendedBaseStageMBean#getTotalEmitTime()
     */
    public long getTotalEmitTime() {
        return totalEmitTime.longValue();
    }

    /**
     * @return number of emits performed during completed process calls
     * @see ExtendedBaseStageMBean#getTotalEmits()
     */
    public long getTotalEmits() {
        return totalEmits.longValue();
    }

    /**
     * @return whether per-branch emit timing is being collected
     * @see ExtendedBaseStageMBean#getCollectBranchStats()
     */
    public Boolean getCollectBranchStats() {
        return Boolean.valueOf(collectBranchStats);
    }

    /**
     * @param collectBranchStats whether to collect per-branch emit timing; must not be {@code null}
     * @see ExtendedBaseStageMBean#setCollectBranchStats(Boolean)
     */
    public void setCollectBranchStats(Boolean collectBranchStats) {
        this.collectBranchStats = collectBranchStats;
    }

    /**
     * @return size of the moving window used for the recent service-time average
     */
    public Integer getCurrentStatWindowSize() {
        return Integer.valueOf(currentStatWindowSize);
    }

    /**
     * Changes the size of the moving window used for the recent service-time average.
     * If the statistics object already exists it is resized immediately; otherwise the
     * value is applied when {@link #preprocess()} creates it.
     *
     * @param newStatWindowSize the new window size; must not be {@code null}
     */
    public void setCurrentStatWindowSize(Integer newStatWindowSize) {
        final int newSize = newStatWindowSize.intValue();
        SynchronizedDescriptiveStatistics stats = serviceTimeStatistics;
        if (stats != null && newSize != this.currentStatWindowSize) {
            synchronized (stats) {
                stats.setWindowSize(newSize);
            }
        }
        this.currentStatWindowSize = newSize;
    }

    /**
     * @return the stage name used as the {@code name} key of the JMX object name, may be {@code null}
     */
    public String getStageName() {
        return stageName;
    }

    /**
     * @param name stage name; only affects JMX registration if set before {@link #init(StageContext)}
     */
    public void setStageName(String name) {
        this.stageName = name;
    }

    /**
     * @return whether {@link #init(StageContext)} registers this stage with JMX
     */
    public boolean isJmxEnabled() {
        return jmxEnabled;
    }

    /**
     * @param jmxEnabled whether {@link #init(StageContext)} should register this stage with JMX
     */
    public void setJmxEnabled(boolean jmxEnabled) {
        this.jmxEnabled = jmxEnabled;
    }

    /**
     * Returns a moving average of the service time. This does not yet take into account time spent
     * calling emit, nor does it return minimum, maximum or other statistical information at this time.
     *
     * @return Average time (ms) to process the last <code>currentStatWindowSize</code> objects,
     *         or {@code -1.0} if the statistics have not been created yet (i.e. before
     *         {@link #preprocess()} has run).
     */
    public double getCurrentServiceTimeAverage() {
        SynchronizedDescriptiveStatistics stats = serviceTimeStatistics;
        if (stats == null) {
            return -1.0d;
        }
        return stats.getMean();
    }
}