View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.pipeline.stage;
18  
19  import java.lang.management.ManagementFactory;
20  import java.text.DecimalFormat;
21  import java.text.NumberFormat;
22  import java.util.HashMap;
23  import java.util.Map;
24  import java.util.concurrent.atomic.AtomicInteger;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  import javax.management.MBeanServer;
28  import javax.management.ObjectName;
29  import javax.management.StandardMBean;
30  
31  import org.apache.commons.lang.time.StopWatch;
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics;
35  import org.apache.commons.pipeline.Feeder;
36  import org.apache.commons.pipeline.Stage;
37  import org.apache.commons.pipeline.StageContext;
38  import org.apache.commons.pipeline.StageException;
39  
40  /**
41   * Base class for pipeline stages. Keeps track of performance statistics and allows
42   * for collection and adjustment via JMX (optional)
43   *
44   * Cannot extend BaseStage because it marks the emit methods as final.
45   *
46   * @author mzsanford
47   */
48  public abstract class ExtendedBaseStage implements Stage, ExtendedBaseStageMBean {
49      /**  Minimum percentage of blocking before we report per-branch stats. */
50      private static final float BRANCH_BLOCK_THRESHOLD = 1.0f;
51      /** Default size of the moving-window average statistics */
52      private static final int DEFAULT_DESCRIPTIVE_STATS_WINDOW_SIZE = 100;
53      /** Default queue name when reporting statistics */
54      private static final String DEFAULT_QUEUE_NAME = "[DefaultQueue]";
55      /** Default number of objects after which a status message is logged */
56      private static final int DEFAULT_STATUS_INTERVAL = 1000;
57      protected final Log log = LogFactory.getLog( getClass() );
58  
59      protected StageContext stageContext;
60      private Feeder downstreamFeeder;
61      private String stageName;
62      private final AtomicLong objectsReceived = new AtomicLong(0);
63      private final AtomicLong unhandledExceptions = new AtomicLong(0);
64      private final AtomicLong totalServiceTime = new AtomicLong(0);
65      private final AtomicLong totalEmitTime = new AtomicLong(0);
66      private final AtomicLong totalEmits = new AtomicLong(0);
67      private final Map<String, AtomicLong> emitTimeByBranch = new HashMap<String, AtomicLong>();
68      private int currentStatWindowSize = DEFAULT_DESCRIPTIVE_STATS_WINDOW_SIZE;
69      private SynchronizedDescriptiveStatistics serviceTimeStatistics;
70      private long statusInterval = DEFAULT_STATUS_INTERVAL;
71      private Integer statusBatchSize = 1;
72      private boolean collectBranchStats = false;
73      private boolean preProcessed = false; // prevent recursion.
74      private boolean postProcessed = false; // prevent recursion.
75      private boolean jmxEnabled = true;
76  
77      /**
78       * Class name for status message. Needed because java.util.logging only
79       * reports the base class name.
80       */
81      private final String className = getClass().getSimpleName();
82  
83      /**
84       * ThreadLocal sum of time spent waiting on blocked queues during the current process call.
85       */
86      protected static ThreadLocal<AtomicLong> emitTotal = new ThreadLocal<AtomicLong>() {
87          protected synchronized AtomicLong initialValue() {
88              return new AtomicLong();
89          }
90      };
91  
92      /**
93       * ThreadLocal sum of time spent waiting on blocked queues during the current process call by queue name.
94       */
95      protected static ThreadLocal<Map<String, AtomicLong>> threadLocalEmitBranchTime = new ThreadLocal<Map<String, AtomicLong>>() {
96          protected synchronized Map<String, AtomicLong> initialValue() {
97              return new HashMap<String, AtomicLong>();
98          }
99      };
100 
101     /**
102      * ThreadLocal count of emit calls during the current process call.
103      */
104     protected static ThreadLocal<AtomicInteger> emitCount = new ThreadLocal<AtomicInteger>() {
105         protected synchronized AtomicInteger initialValue() {
106             return new AtomicInteger();
107         }
108     };
109 
110     /**
111      * ThreadLocal formatter since they are not thread safe.
112      */
113     protected static ThreadLocal<NumberFormat> floatFormatter = new ThreadLocal<NumberFormat>() {
114         protected synchronized NumberFormat initialValue() {
115             return new DecimalFormat("##0.000");
116         }
117     };
118 
119     public ExtendedBaseStage() {
120         // Empty constructor provided for future use.
121     }
122 
123     public void init( StageContext context ) {
124         this.stageContext = context;
125         if (jmxEnabled) {
126             enableJMX(context);
127         }
128     }
129 
130     @SuppressWarnings("unchecked")
131     private final void enableJMX(StageContext context) {
132         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
133         if (mbs != null) {
134             // Try to build a unique JMX name.
135             StringBuilder sb = new StringBuilder("org.apache.commons.pipeline:");
136             sb.append("class=").append(className);
137             if (stageName != null) {
138                 sb.append(",name=").append(stageName);
139             }
140 
141             try {
142                 ObjectName name = new ObjectName(sb.toString());
143                 if (mbs.isRegistered(name)) {
144                     log.info("JMX Overlap. Multiple instances of '" + name + "'. Only one will be registered.");
145                 } else {
146                     Class mbeanInterface = ExtendedBaseStageMBean.class;
147                     try {
148                         // Find out if the stage has a more specific MBean. Reflection can be slow
149                         // but this operation is pretty fast. Not to mention it's only done at startup.
150                         Class[] interfaces = getClass().getInterfaces();
151                         for (int i=0 ; i < interfaces.length; i++) {
152                             Class current = interfaces[i];
153                             // Only use interfaces that extend ExtendedBaseStageMBean to maintain a minimum
154                             // amount of functionality.
155                             if (current != ExtendedBaseStageMBean.class
156                                 && ExtendedBaseStageMBean.class.isAssignableFrom(current)) {
157                                 mbeanInterface = current;
158                                 break;
159                             }
160                         }
161                     } catch (Exception e) {
162                         // In the event of security or cast exceptions, default back to base.
163                         log.info("Reflection error while checking for JMX interfaces.");
164                         // Reset in the off chance it got hosed.
165                         mbeanInterface = ExtendedBaseStageMBean.class;
166                     }
167 
168                     StandardMBean mbean = new StandardMBean(this,
169                                                             mbeanInterface);
170                     mbs.registerMBean(mbean, name);
171                     log.info("JMX MBean registered: " + name.toString() + " (" + mbeanInterface.getSimpleName() + ")");
172                 }
173             } catch (Exception e) {
174                 log.warn("Failed to register with JMX server",e);
175             }
176         }
177     }
178 
179     /**
180      * Called when a stage has been created but before the first object is
181      * sent to the stage for processing. Subclasses
182      * should use the innerPreprocess method, which is called by this method.
183      *
184      * @see org.apache.commons.pipeline.Stage#preprocess()
185      */
186     public final void preprocess() throws StageException {
187         if ( !preProcessed ) {
188             serviceTimeStatistics = new SynchronizedDescriptiveStatistics();
189             serviceTimeStatistics.setWindowSize(currentStatWindowSize);
190             innerPreprocess();
191         }
192         preProcessed = true;
193     }
194 
195     public final void process( Object obj ) throws StageException {
196         objectsReceived.incrementAndGet();
197         StopWatch stopWatch = new StopWatch();
198         stopWatch.start();
199         try {
200             this.innerProcess( obj );
201         } catch (Exception e) {
202             // Hate to catch Exception, but don't want anything killing off the thread
203             // and hanging the pipeline.
204             log.error("Uncaught exception in " + className + ": " + e.getMessage(), e);
205             unhandledExceptions.incrementAndGet();
206         }
207         stopWatch.stop();
208 
209         long totalTime = stopWatch.getTime();
210         totalServiceTime.addAndGet(totalTime);
211 
212         // I hate to synchronize anything in the base class, but this
213         // call should be very fast and is not thread safe.
214         serviceTimeStatistics.addValue(totalTime);
215 
216         // Use ThreadLocals so that the stats only reflect process
217         // calls that have completed. Otherwise the emit times can
218         // exceed the process times.
219         totalEmits.addAndGet(emitCount.get().intValue());
220         totalEmitTime.addAndGet(emitTotal.get().longValue());
221         emitCount.remove();
222         emitTotal.remove();
223 
224         if (collectBranchStats) {
225             for (Map.Entry<String, AtomicLong> entry : threadLocalEmitBranchTime.get().entrySet()) {
226                 if (emitTimeByBranch.containsKey(entry.getKey())) {
227                     emitTimeByBranch.get(entry.getKey()).addAndGet(entry.getValue().longValue());
228                 } else {
229                     // Race condition. containsKey could return false and another thread could insert
230                     // here. We can synchronize here and we will very rarely hit this block. Only the first
231                     // time each queue is accessed.
232                     synchronized (emitTimeByBranch) {
233                         // Double check for the race condition.
234                         if (emitTimeByBranch.containsKey(entry.getKey())) {
235                             emitTimeByBranch.get(entry.getKey()).addAndGet(entry.getValue().longValue());
236                         } else {
237                             emitTimeByBranch.put(entry.getKey(), new AtomicLong(entry.getValue().longValue()));
238                         }
239                     }
240                 }
241             }
242             threadLocalEmitBranchTime.remove();
243         }
244 
245         if ( objectsReceived.longValue() % statusInterval == 0 ) {
246             logStatus();
247         }
248     }
249 
250     /**
251      * Convenience method to feed the specified object to the next stage downstream.
252      */
253     public final void emit( Object obj ) {
254         if ( log.isDebugEnabled() ) {
255             log.debug( this.getClass() + " is emitting object " + obj );
256         }
257         if ( this.downstreamFeeder == null ) {
258             synchronized (this) {
259                 // Lazy init the default feeder.
260                 this.downstreamFeeder = stageContext.getDownstreamFeeder( this );
261             }
262         }
263         feed( DEFAULT_QUEUE_NAME, downstreamFeeder, obj );
264     }
265 
266     /**
267      * Convenience method to feed the specified object to the first stage of the specified branch.
268      */
269     public final void emit( String branch, Object obj ) {
270         Feeder feeder = this.stageContext.getBranchFeeder( branch );
271         feed( branch, feeder, obj );
272     }
273 
274     private void feed(String name, Feeder feeder, Object obj ) {
275         if ( feeder == null ) {
276             // The pipeline code should never allow this to happen.
277             String objectType = ( obj != null ? obj.getClass().getSimpleName() : "null" );
278             log.error( "Ignoring attempt to emit " + objectType +
279                        " object to invalid feeder" );
280             return;
281         }
282         StopWatch emitWatch = new StopWatch();
283         emitWatch.start();
284 
285         // Pass the emitted object to the next stage (default or branch)
286         feeder.feed( obj );
287 
288         emitWatch.stop();
289 
290         // Use ThreadLocal variables so the emit totals do not
291         // go up until the process call completes.
292         emitTotal.get().addAndGet( emitWatch.getTime() );
293         emitCount.get().incrementAndGet();
294 
295         if (collectBranchStats) {
296             if (! threadLocalEmitBranchTime.get().containsKey(name)) {
297                 AtomicLong currentTotal = new AtomicLong(emitWatch.getTime());
298                 threadLocalEmitBranchTime.get().put(name, currentTotal);
299             } else {
300                 threadLocalEmitBranchTime.get().get(name).addAndGet(emitWatch.getTime());
301             }
302         }
303     }
304 
305     /**
306      * Called when a stage has completed all processing. Subclasses
307      * should use the innerPostprocess method, which is called by this method.
308      *
309      * @see org.apache.commons.pipeline.Stage#postprocess()
310      */
311     public final void postprocess() throws StageException {
312         if ( !postProcessed ) {
313             logStatus();
314             innerPostprocess();
315         }
316         postProcessed = true;
317     }
318 
319     public void release() {
320         // No-op implementation to fulfill interface
321     }
322 
323     public abstract void innerProcess( Object obj )
324       throws StageException;
325 
326     public void innerPreprocess() throws StageException {
327         // No-op default implementation.
328     }
329 
330     public void innerPostprocess() throws StageException {
331         // No-op default implementation.
332     }
333 
334     /**
335      * Class-specific status message. Null or empty status' will be ignored.
336      */
337     public abstract String status();
338 
339     public void logStatus() {
340         String logMessage = getStatusMessage();
341         log.info(logMessage);
342     }
343 
344     /**
345      * @return Log message including both base stage and class specific stats.
346      */
347     public String getStatusMessage() {
348         StringBuilder sb = new StringBuilder( 512 );
349         NumberFormat formatter = floatFormatter.get();
350 
351         float serviceTime = ( totalServiceTime.floatValue() / 1000.0f );
352         float emitTime = ( totalEmitTime.floatValue() / 1000.0f );
353         float netServiceTime = ( serviceTime - emitTime );
354 
355         float emitPercentage = 0.0f;
356         float emitFloat = totalEmits.floatValue();
357         float recvFloat = objectsReceived.floatValue();
358         if (recvFloat > 0) {
359             emitPercentage = (emitFloat / recvFloat)*100.0f;
360         }
361 
362         sb.append( "\n\t === " ).append( className ).append( " Generic Stats === " );
363 
364         if (statusBatchSize > 1) {
365             sb.append("\n\tStatus Batch Size (all /obj and /sec include this): ").append(statusBatchSize);
366         }
367 
368         sb.append( "\n\tTotal objects received:" ).append( objectsReceived );
369         sb.append( "\n\tTotal unhandled exceptions:" ).append( unhandledExceptions );
370         sb.append( "\n\tTotal objects emitted:" ).append( totalEmits );
371         if (emitFloat > 0) {
372             sb.append(" (").append(formatter.format(emitPercentage)).append("%)");
373         }
374         sb.append( "\n\tTotal gross processing time (sec):" )
375           .append( formatter.format( serviceTime ) );
376         sb.append( "\n\tTotal emit blocked time (sec):" )
377           .append( formatter.format( emitTime ) );
378         sb.append( "\n\tTotal net processing time (sec):" )
379           .append( formatter.format( netServiceTime ) );
380 
381         float avgServiceTime = 0;
382         float avgEmitTime = 0;
383         float avgNetServiceTime = 0;
384         if ( objectsReceived.longValue() > 0 ) {
385             avgServiceTime = ( serviceTime / objectsReceived.floatValue()/statusBatchSize );
386             avgEmitTime = ( emitTime / objectsReceived.floatValue()/statusBatchSize );
387             avgNetServiceTime = ( netServiceTime / objectsReceived.floatValue()/statusBatchSize );
388         }
389 
390         sb.append( "\n\tAverage gross processing time (sec/obj):" )
391           .append( formatter.format( avgServiceTime ) );
392         sb.append( "\n\tAverage emit blocked time (sec/obj):" )
393           .append( formatter.format( avgEmitTime ) );
394         sb.append( "\n\tAverage net processing time (sec/obj):" )
395           .append( formatter.format( avgNetServiceTime ) );
396 
397         if (serviceTimeStatistics != null) {
398             long count = serviceTimeStatistics.getN();
399             if (count > 0) {
400                 double avgMillis = getCurrentServiceTimeAverage()/(float)statusBatchSize;
401                 sb.append( "\n\tAverage gross processing time in last ")
402                   .append(count)
403                   .append(" (sec/obj):" )
404                   .append( formatter.format( avgMillis/1000 ) );
405             }
406         }
407 
408         float grossThroughput = 0;
409         if ( avgServiceTime > 0 ) {
410             grossThroughput = ( 1.0f / avgServiceTime );
411         }
412         float netThroughput = 0;
413         if ( avgNetServiceTime > 0 ) {
414             netThroughput = ( 1.0f / avgNetServiceTime );
415         }
416 
417         sb.append( "\n\tGross throughput (obj/sec):" )
418           .append( formatter.format( grossThroughput ) );
419         sb.append( "\n\tNet throughput (obj/sec):" )
420           .append( formatter.format( netThroughput ) );
421 
422         float percWorking = 0;
423         float percBlocking = 0;
424         if ( serviceTime > 0 ) {
425             percWorking = ( netServiceTime / serviceTime ) * 100;
426             percBlocking = ( emitTime / serviceTime ) * 100;
427         }
428 
429         sb.append( "\n\t% time working:" ).append( formatter.format( percWorking ) );
430         sb.append( "\n\t% time blocking:" ).append( formatter.format( percBlocking ) );
431 
432         // No need to output for a non-branching stage or if there was very little
433         // blocking (as defined in the constant)
434         if (collectBranchStats && emitTimeByBranch.size() > 1 && percBlocking >= BRANCH_BLOCK_THRESHOLD) {
435             try {
436                 for (Map.Entry<String, AtomicLong> entry : emitTimeByBranch.entrySet()) {
437                     float branchBlockSec = (entry.getValue().floatValue()/1000.0f);
438                     float branchBlockPerc = (branchBlockSec/emitTime) * 100;
439                     sb.append("\n\t\t% branch ").append(entry.getKey()).append(":").append(formatter.format(branchBlockPerc));
440                 }
441             } catch (RuntimeException e) {
442                 // Synchronizing would be slow, ConcurrentMod is possible but unlikely since the map is
443                 // only modified the first time a stage is emitted to so just catch and
444                 // log it. No need to stop all processing over a reporting failure.
445                 sb.append("\n\t\tproblem getting per-branch stats: ").append(e.getMessage());
446             }
447         }
448 
449         String stageSpecificStatus = this.status();
450         if ( stageSpecificStatus != null && stageSpecificStatus.length() > 0 ) {
451             sb.append( "\n\t === " )
452               .append( className )
453               .append( " Specific Stats === " );
454             sb.append( stageSpecificStatus );
455         }
456 
457         return sb.toString();
458     }
459 
460     protected String formatTotalTimeStat( String name, AtomicLong totalTime ) {
461         return formatTotalTimeStat( name, totalTime.longValue() );
462     }
463 
464     protected String formatTotalTimeStat( String name, long totalTime ) {
465         if ( name == null || totalTime < 0 ) {
466             return "";
467         }
468         NumberFormat formatter = floatFormatter.get();
469         StringBuilder sb = new StringBuilder();
470         // Total processing time minus calls to emit.
471         float totalSec = totalTime / 1000.0f;
472         float average = 0;
473         if ( getObjectsReceived() > 0 ) {
474             average = totalSec / getObjectsReceived() / (float)statusBatchSize;
475         }
476 
477         if ( log.isDebugEnabled() ) {
478             sb.append( "\n\tTotal " )
479               .append( name )
480               .append( " processing time (sec):" )
481               .append( formatter.format( totalSec ) );
482         }
483 
484         sb.append( "\n\tAverage " )
485           .append( name )
486           .append( " processing time (sec/obj):" )
487           .append( formatter.format( average ) );
488 
489         if ( log.isDebugEnabled() && average > 0 ) {
490             float throughput = ( 1.0f ) / average * (float)statusBatchSize;
491             sb.append( "\n\tThroughput for " )
492               .append( name )
493               .append( " (obj/sec):" )
494               .append( formatter.format( throughput ) );
495         }
496 
497         return sb.toString();
498     }
499 
500     protected String formatCounterStat( String name, AtomicInteger count ) {
501         return formatCounterStat(name, count.get());
502     }
503 
504     protected String formatCounterStat( String name, AtomicLong count ) {
505         return formatCounterStat(name, count.get());
506     }
507 
508     protected String formatCounterStat( String name, long count ) {
509         if ( name == null || count < 0 || getObjectsReceived() <= 0) {
510             return "";
511         }
512         NumberFormat formatter = floatFormatter.get();
513         StringBuilder sb = new StringBuilder();
514 
515         float perc = ((float)count*(float)statusBatchSize/(float)getObjectsReceived())*100.0f;
516 
517         sb.append( "\n\tNumber of " )
518           .append( name )
519           .append( " (" )
520           .append( formatter.format(perc) )
521           .append( "%) :")
522           .append( count );
523 
524         return sb.toString();
525     }
526 
527     /**
528      * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getStatusInterval()
529      */
530     public Long getStatusInterval() {
531         return Long.valueOf(statusInterval);
532     }
533 
534     /**
535      * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#setStatusInterval(long)
536      */
537     public void setStatusInterval( Long statusInterval ) {
538         this.statusInterval = statusInterval;
539     }
540 
541     public Integer getStatusBatchSize() {
542         return statusBatchSize;
543     }
544 
545     public void setStatusBatchSize(Integer statusBatchSize) {
546         this.statusBatchSize = statusBatchSize;
547     }
548 
549     /**
550      * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getObjectsReceived()
551      */
552     public long getObjectsReceived() {
553         return objectsReceived.longValue();
554     }
555 
556     /**
557      * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getTotalServiceTime()
558      */
559     public long getTotalServiceTime() {
560         return totalServiceTime.longValue();
561     }
562 
563     /**
564      * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getTotalEmitTime()
565      */
566     public long getTotalEmitTime() {
567         return totalEmitTime.longValue();
568     }
569 
570     /**
571      * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getTotalEmits()
572      */
573     public long getTotalEmits() {
574         return totalEmits.longValue();
575     }
576 
577     /**
578      * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getCollectBranchStats()
579      */
580     public Boolean getCollectBranchStats() {
581         return collectBranchStats;
582     }
583 
584     /**
585      * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#setCollectBranchStats(Boolean)
586      */
587     public void setCollectBranchStats(Boolean collectBranchStats) {
588         this.collectBranchStats = collectBranchStats;
589     }
590 
591     public Integer getCurrentStatWindowSize() {
592         return Integer.valueOf(currentStatWindowSize);
593     }
594 
595     public void setCurrentStatWindowSize(Integer newStatWindowSize) {
596         if (serviceTimeStatistics != null
597                 && newStatWindowSize != this.currentStatWindowSize) {
598             synchronized (serviceTimeStatistics) {
599                 serviceTimeStatistics.setWindowSize(newStatWindowSize);
600             }
601         }
602         this.currentStatWindowSize = newStatWindowSize;
603     }
604 
605     public String getStageName() {
606         return stageName;
607     }
608 
609     public void setStageName(String name) {
610         this.stageName = name;
611     }
612 
613     public boolean isJmxEnabled() {
614         return jmxEnabled;
615     }
616 
617     public void setJmxEnabled(boolean jmxEnabled) {
618         this.jmxEnabled = jmxEnabled;
619     }
620 
621     /**
622      * Returns a moving average of the service time. This does not yet take into account time spent
623      * calling emit, nor does it return minimum, maximum or other statistical information at this time.
624      *
625      * @return Average time to process the last <code>currentStatWindowSize</code> objects.
626      */
627     public double getCurrentServiceTimeAverage() {
628         double avg = -1.0d;
629 
630         // Hate to synchronize in the base class, but this should be very quick.
631         avg = serviceTimeStatistics.getMean();
632 
633         return avg;
634     }
635 }