001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.commons.pipeline.stage;
018    
import java.lang.management.ManagementFactory;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;

import org.apache.commons.lang.time.StopWatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics;
import org.apache.commons.pipeline.Feeder;
import org.apache.commons.pipeline.Stage;
import org.apache.commons.pipeline.StageContext;
import org.apache.commons.pipeline.StageException;
039    
040    /**
041     * Base class for pipeline stages. Keeps track of performance statistics and allows
042     * for collection and adjustment via JMX (optional)
043     *
044     * Cannot extend BaseStage because it marks the emit methods as final.
045     *
046     * @author mzsanford
047     */
048    public abstract class ExtendedBaseStage implements Stage, ExtendedBaseStageMBean {
049        /**  Minimum percentage of blocking before we report per-branch stats. */
050        private static final float BRANCH_BLOCK_THRESHOLD = 1.0f;
051        /** Default size of the moving-window average statistics */
052        private static final int DEFAULT_DESCRIPTIVE_STATS_WINDOW_SIZE = 100;
053        /** Default queue name when reporting statistics */
054        private static final String DEFAULT_QUEUE_NAME = "[DefaultQueue]";
055        /** Default number of objects after which a status message is logged */
056        private static final int DEFAULT_STATUS_INTERVAL = 1000;
057        protected final Log log = LogFactory.getLog( getClass() );
058    
059        protected StageContext stageContext;
060        private Feeder downstreamFeeder;
061        private String stageName;
062        private final AtomicLong objectsReceived = new AtomicLong(0);
063        private final AtomicLong unhandledExceptions = new AtomicLong(0);
064        private final AtomicLong totalServiceTime = new AtomicLong(0);
065        private final AtomicLong totalEmitTime = new AtomicLong(0);
066        private final AtomicLong totalEmits = new AtomicLong(0);
067        private final Map<String, AtomicLong> emitTimeByBranch = new HashMap<String, AtomicLong>();
068        private int currentStatWindowSize = DEFAULT_DESCRIPTIVE_STATS_WINDOW_SIZE;
069        private SynchronizedDescriptiveStatistics serviceTimeStatistics;
070        private long statusInterval = DEFAULT_STATUS_INTERVAL;
071        private Integer statusBatchSize = 1;
072        private boolean collectBranchStats = false;
073        private boolean preProcessed = false; // prevent recursion.
074        private boolean postProcessed = false; // prevent recursion.
075        private boolean jmxEnabled = true;
076    
077        /**
078         * Class name for status message. Needed because java.util.logging only
079         * reports the base class name.
080         */
081        private final String className = getClass().getSimpleName();
082    
083        /**
084         * ThreadLocal sum of time spent waiting on blocked queues during the current process call.
085         */
086        protected static ThreadLocal<AtomicLong> emitTotal = new ThreadLocal<AtomicLong>() {
087            protected synchronized AtomicLong initialValue() {
088                return new AtomicLong();
089            }
090        };
091    
092        /**
093         * ThreadLocal sum of time spent waiting on blocked queues during the current process call by queue name.
094         */
095        protected static ThreadLocal<Map<String, AtomicLong>> threadLocalEmitBranchTime = new ThreadLocal<Map<String, AtomicLong>>() {
096            protected synchronized Map<String, AtomicLong> initialValue() {
097                return new HashMap<String, AtomicLong>();
098            }
099        };
100    
101        /**
102         * ThreadLocal count of emit calls during the current process call.
103         */
104        protected static ThreadLocal<AtomicInteger> emitCount = new ThreadLocal<AtomicInteger>() {
105            protected synchronized AtomicInteger initialValue() {
106                return new AtomicInteger();
107            }
108        };
109    
110        /**
111         * ThreadLocal formatter since they are not thread safe.
112         */
113        protected static ThreadLocal<NumberFormat> floatFormatter = new ThreadLocal<NumberFormat>() {
114            protected synchronized NumberFormat initialValue() {
115                return new DecimalFormat("##0.000");
116            }
117        };
118    
119        public ExtendedBaseStage() {
120            // Empty constructor provided for future use.
121        }
122    
123        public void init( StageContext context ) {
124            this.stageContext = context;
125            if (jmxEnabled) {
126                enableJMX(context);
127            }
128        }
129    
130        @SuppressWarnings("unchecked")
131        private final void enableJMX(StageContext context) {
132            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
133            if (mbs != null) {
134                // Try to build a unique JMX name.
135                StringBuilder sb = new StringBuilder("org.apache.commons.pipeline:");
136                sb.append("class=").append(className);
137                if (stageName != null) {
138                    sb.append(",name=").append(stageName);
139                }
140    
141                try {
142                    ObjectName name = new ObjectName(sb.toString());
143                    if (mbs.isRegistered(name)) {
144                        log.info("JMX Overlap. Multiple instances of '" + name + "'. Only one will be registered.");
145                    } else {
146                        Class mbeanInterface = ExtendedBaseStageMBean.class;
147                        try {
148                            // Find out if the stage has a more specific MBean. Reflection can be slow
149                            // but this operation is pretty fast. Not to mention it's only done at startup.
150                            Class[] interfaces = getClass().getInterfaces();
151                            for (int i=0 ; i < interfaces.length; i++) {
152                                Class current = interfaces[i];
153                                // Only use interfaces that extend ExtendedBaseStageMBean to maintain a minimum
154                                // amount of functionality.
155                                if (current != ExtendedBaseStageMBean.class
156                                    && ExtendedBaseStageMBean.class.isAssignableFrom(current)) {
157                                    mbeanInterface = current;
158                                    break;
159                                }
160                            }
161                        } catch (Exception e) {
162                            // In the event of security or cast exceptions, default back to base.
163                            log.info("Reflection error while checking for JMX interfaces.");
164                            // Reset in the off chance it got hosed.
165                            mbeanInterface = ExtendedBaseStageMBean.class;
166                        }
167    
168                        StandardMBean mbean = new StandardMBean(this,
169                                                                mbeanInterface);
170                        mbs.registerMBean(mbean, name);
171                        log.info("JMX MBean registered: " + name.toString() + " (" + mbeanInterface.getSimpleName() + ")");
172                    }
173                } catch (Exception e) {
174                    log.warn("Failed to register with JMX server",e);
175                }
176            }
177        }
178    
179        /**
180         * Called when a stage has been created but before the first object is
181         * sent to the stage for processing. Subclasses
182         * should use the innerPreprocess method, which is called by this method.
183         *
184         * @see org.apache.commons.pipeline.Stage#preprocess()
185         */
186        public final void preprocess() throws StageException {
187            if ( !preProcessed ) {
188                serviceTimeStatistics = new SynchronizedDescriptiveStatistics();
189                serviceTimeStatistics.setWindowSize(currentStatWindowSize);
190                innerPreprocess();
191            }
192            preProcessed = true;
193        }
194    
195        public final void process( Object obj ) throws StageException {
196            objectsReceived.incrementAndGet();
197            StopWatch stopWatch = new StopWatch();
198            stopWatch.start();
199            try {
200                this.innerProcess( obj );
201            } catch (Exception e) {
202                // Hate to catch Exception, but don't want anything killing off the thread
203                // and hanging the pipeline.
204                log.error("Uncaught exception in " + className + ": " + e.getMessage(), e);
205                unhandledExceptions.incrementAndGet();
206            }
207            stopWatch.stop();
208    
209            long totalTime = stopWatch.getTime();
210            totalServiceTime.addAndGet(totalTime);
211    
212            // I hate to synchronize anything in the base class, but this
213            // call should be very fast and is not thread safe.
214            serviceTimeStatistics.addValue(totalTime);
215    
216            // Use ThreadLocals so that the stats only reflect process
217            // calls that have completed. Otherwise the emit times can
218            // exceed the process times.
219            totalEmits.addAndGet(emitCount.get().intValue());
220            totalEmitTime.addAndGet(emitTotal.get().longValue());
221            emitCount.remove();
222            emitTotal.remove();
223    
224            if (collectBranchStats) {
225                for (Map.Entry<String, AtomicLong> entry : threadLocalEmitBranchTime.get().entrySet()) {
226                    if (emitTimeByBranch.containsKey(entry.getKey())) {
227                        emitTimeByBranch.get(entry.getKey()).addAndGet(entry.getValue().longValue());
228                    } else {
229                        // Race condition. containsKey could return false and another thread could insert
230                        // here. We can synchronize here and we will very rarely hit this block. Only the first
231                        // time each queue is accessed.
232                        synchronized (emitTimeByBranch) {
233                            // Double check for the race condition.
234                            if (emitTimeByBranch.containsKey(entry.getKey())) {
235                                emitTimeByBranch.get(entry.getKey()).addAndGet(entry.getValue().longValue());
236                            } else {
237                                emitTimeByBranch.put(entry.getKey(), new AtomicLong(entry.getValue().longValue()));
238                            }
239                        }
240                    }
241                }
242                threadLocalEmitBranchTime.remove();
243            }
244    
245            if ( objectsReceived.longValue() % statusInterval == 0 ) {
246                logStatus();
247            }
248        }
249    
250        /**
251         * Convenience method to feed the specified object to the next stage downstream.
252         */
253        public final void emit( Object obj ) {
254            if ( log.isDebugEnabled() ) {
255                log.debug( this.getClass() + " is emitting object " + obj );
256            }
257            if ( this.downstreamFeeder == null ) {
258                synchronized (this) {
259                    // Lazy init the default feeder.
260                    this.downstreamFeeder = stageContext.getDownstreamFeeder( this );
261                }
262            }
263            feed( DEFAULT_QUEUE_NAME, downstreamFeeder, obj );
264        }
265    
266        /**
267         * Convenience method to feed the specified object to the first stage of the specified branch.
268         */
269        public final void emit( String branch, Object obj ) {
270            Feeder feeder = this.stageContext.getBranchFeeder( branch );
271            feed( branch, feeder, obj );
272        }
273    
274        private void feed(String name, Feeder feeder, Object obj ) {
275            if ( feeder == null ) {
276                // The pipeline code should never allow this to happen.
277                String objectType = ( obj != null ? obj.getClass().getSimpleName() : "null" );
278                log.error( "Ignoring attempt to emit " + objectType +
279                           " object to invalid feeder" );
280                return;
281            }
282            StopWatch emitWatch = new StopWatch();
283            emitWatch.start();
284    
285            // Pass the emitted object to the next stage (default or branch)
286            feeder.feed( obj );
287    
288            emitWatch.stop();
289    
290            // Use ThreadLocal variables so the emit totals do not
291            // go up until the process call completes.
292            emitTotal.get().addAndGet( emitWatch.getTime() );
293            emitCount.get().incrementAndGet();
294    
295            if (collectBranchStats) {
296                if (! threadLocalEmitBranchTime.get().containsKey(name)) {
297                    AtomicLong currentTotal = new AtomicLong(emitWatch.getTime());
298                    threadLocalEmitBranchTime.get().put(name, currentTotal);
299                } else {
300                    threadLocalEmitBranchTime.get().get(name).addAndGet(emitWatch.getTime());
301                }
302            }
303        }
304    
305        /**
306         * Called when a stage has completed all processing. Subclasses
307         * should use the innerPostprocess method, which is called by this method.
308         *
309         * @see org.apache.commons.pipeline.Stage#postprocess()
310         */
311        public final void postprocess() throws StageException {
312            if ( !postProcessed ) {
313                logStatus();
314                innerPostprocess();
315            }
316            postProcessed = true;
317        }
318    
319        public void release() {
320            // No-op implementation to fulfill interface
321        }
322    
323        public abstract void innerProcess( Object obj )
324          throws StageException;
325    
326        public void innerPreprocess() throws StageException {
327            // No-op default implementation.
328        }
329    
330        public void innerPostprocess() throws StageException {
331            // No-op default implementation.
332        }
333    
334        /**
335         * Class-specific status message. Null or empty status' will be ignored.
336         */
337        public abstract String status();
338    
339        public void logStatus() {
340            String logMessage = getStatusMessage();
341            log.info(logMessage);
342        }
343    
344        /**
345         * @return Log message including both base stage and class specific stats.
346         */
347        public String getStatusMessage() {
348            StringBuilder sb = new StringBuilder( 512 );
349            NumberFormat formatter = floatFormatter.get();
350    
351            float serviceTime = ( totalServiceTime.floatValue() / 1000.0f );
352            float emitTime = ( totalEmitTime.floatValue() / 1000.0f );
353            float netServiceTime = ( serviceTime - emitTime );
354    
355            float emitPercentage = 0.0f;
356            float emitFloat = totalEmits.floatValue();
357            float recvFloat = objectsReceived.floatValue();
358            if (recvFloat > 0) {
359                emitPercentage = (emitFloat / recvFloat)*100.0f;
360            }
361    
362            sb.append( "\n\t === " ).append( className ).append( " Generic Stats === " );
363    
364            if (statusBatchSize > 1) {
365                sb.append("\n\tStatus Batch Size (all /obj and /sec include this): ").append(statusBatchSize);
366            }
367    
368            sb.append( "\n\tTotal objects received:" ).append( objectsReceived );
369            sb.append( "\n\tTotal unhandled exceptions:" ).append( unhandledExceptions );
370            sb.append( "\n\tTotal objects emitted:" ).append( totalEmits );
371            if (emitFloat > 0) {
372                sb.append(" (").append(formatter.format(emitPercentage)).append("%)");
373            }
374            sb.append( "\n\tTotal gross processing time (sec):" )
375              .append( formatter.format( serviceTime ) );
376            sb.append( "\n\tTotal emit blocked time (sec):" )
377              .append( formatter.format( emitTime ) );
378            sb.append( "\n\tTotal net processing time (sec):" )
379              .append( formatter.format( netServiceTime ) );
380    
381            float avgServiceTime = 0;
382            float avgEmitTime = 0;
383            float avgNetServiceTime = 0;
384            if ( objectsReceived.longValue() > 0 ) {
385                avgServiceTime = ( serviceTime / objectsReceived.floatValue()/statusBatchSize );
386                avgEmitTime = ( emitTime / objectsReceived.floatValue()/statusBatchSize );
387                avgNetServiceTime = ( netServiceTime / objectsReceived.floatValue()/statusBatchSize );
388            }
389    
390            sb.append( "\n\tAverage gross processing time (sec/obj):" )
391              .append( formatter.format( avgServiceTime ) );
392            sb.append( "\n\tAverage emit blocked time (sec/obj):" )
393              .append( formatter.format( avgEmitTime ) );
394            sb.append( "\n\tAverage net processing time (sec/obj):" )
395              .append( formatter.format( avgNetServiceTime ) );
396    
397            if (serviceTimeStatistics != null) {
398                long count = serviceTimeStatistics.getN();
399                if (count > 0) {
400                    double avgMillis = getCurrentServiceTimeAverage()/(float)statusBatchSize;
401                    sb.append( "\n\tAverage gross processing time in last ")
402                      .append(count)
403                      .append(" (sec/obj):" )
404                      .append( formatter.format( avgMillis/1000 ) );
405                }
406            }
407    
408            float grossThroughput = 0;
409            if ( avgServiceTime > 0 ) {
410                grossThroughput = ( 1.0f / avgServiceTime );
411            }
412            float netThroughput = 0;
413            if ( avgNetServiceTime > 0 ) {
414                netThroughput = ( 1.0f / avgNetServiceTime );
415            }
416    
417            sb.append( "\n\tGross throughput (obj/sec):" )
418              .append( formatter.format( grossThroughput ) );
419            sb.append( "\n\tNet throughput (obj/sec):" )
420              .append( formatter.format( netThroughput ) );
421    
422            float percWorking = 0;
423            float percBlocking = 0;
424            if ( serviceTime > 0 ) {
425                percWorking = ( netServiceTime / serviceTime ) * 100;
426                percBlocking = ( emitTime / serviceTime ) * 100;
427            }
428    
429            sb.append( "\n\t% time working:" ).append( formatter.format( percWorking ) );
430            sb.append( "\n\t% time blocking:" ).append( formatter.format( percBlocking ) );
431    
432            // No need to output for a non-branching stage or if there was very little
433            // blocking (as defined in the constant)
434            if (collectBranchStats && emitTimeByBranch.size() > 1 && percBlocking >= BRANCH_BLOCK_THRESHOLD) {
435                try {
436                    for (Map.Entry<String, AtomicLong> entry : emitTimeByBranch.entrySet()) {
437                        float branchBlockSec = (entry.getValue().floatValue()/1000.0f);
438                        float branchBlockPerc = (branchBlockSec/emitTime) * 100;
439                        sb.append("\n\t\t% branch ").append(entry.getKey()).append(":").append(formatter.format(branchBlockPerc));
440                    }
441                } catch (RuntimeException e) {
442                    // Synchronizing would be slow, ConcurrentMod is possible but unlikely since the map is
443                    // only modified the first time a stage is emitted to so just catch and
444                    // log it. No need to stop all processing over a reporting failure.
445                    sb.append("\n\t\tproblem getting per-branch stats: ").append(e.getMessage());
446                }
447            }
448    
449            String stageSpecificStatus = this.status();
450            if ( stageSpecificStatus != null && stageSpecificStatus.length() > 0 ) {
451                sb.append( "\n\t === " )
452                  .append( className )
453                  .append( " Specific Stats === " );
454                sb.append( stageSpecificStatus );
455            }
456    
457            return sb.toString();
458        }
459    
460        protected String formatTotalTimeStat( String name, AtomicLong totalTime ) {
461            return formatTotalTimeStat( name, totalTime.longValue() );
462        }
463    
464        protected String formatTotalTimeStat( String name, long totalTime ) {
465            if ( name == null || totalTime < 0 ) {
466                return "";
467            }
468            NumberFormat formatter = floatFormatter.get();
469            StringBuilder sb = new StringBuilder();
470            // Total processing time minus calls to emit.
471            float totalSec = totalTime / 1000.0f;
472            float average = 0;
473            if ( getObjectsReceived() > 0 ) {
474                average = totalSec / getObjectsReceived() / (float)statusBatchSize;
475            }
476    
477            if ( log.isDebugEnabled() ) {
478                sb.append( "\n\tTotal " )
479                  .append( name )
480                  .append( " processing time (sec):" )
481                  .append( formatter.format( totalSec ) );
482            }
483    
484            sb.append( "\n\tAverage " )
485              .append( name )
486              .append( " processing time (sec/obj):" )
487              .append( formatter.format( average ) );
488    
489            if ( log.isDebugEnabled() && average > 0 ) {
490                float throughput = ( 1.0f ) / average * (float)statusBatchSize;
491                sb.append( "\n\tThroughput for " )
492                  .append( name )
493                  .append( " (obj/sec):" )
494                  .append( formatter.format( throughput ) );
495            }
496    
497            return sb.toString();
498        }
499    
500        protected String formatCounterStat( String name, AtomicInteger count ) {
501            return formatCounterStat(name, count.get());
502        }
503    
504        protected String formatCounterStat( String name, AtomicLong count ) {
505            return formatCounterStat(name, count.get());
506        }
507    
508        protected String formatCounterStat( String name, long count ) {
509            if ( name == null || count < 0 || getObjectsReceived() <= 0) {
510                return "";
511            }
512            NumberFormat formatter = floatFormatter.get();
513            StringBuilder sb = new StringBuilder();
514    
515            float perc = ((float)count*(float)statusBatchSize/(float)getObjectsReceived())*100.0f;
516    
517            sb.append( "\n\tNumber of " )
518              .append( name )
519              .append( " (" )
520              .append( formatter.format(perc) )
521              .append( "%) :")
522              .append( count );
523    
524            return sb.toString();
525        }
526    
527        /**
528         * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getStatusInterval()
529         */
530        public Long getStatusInterval() {
531            return Long.valueOf(statusInterval);
532        }
533    
534        /**
535         * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#setStatusInterval(long)
536         */
537        public void setStatusInterval( Long statusInterval ) {
538            this.statusInterval = statusInterval;
539        }
540    
541        public Integer getStatusBatchSize() {
542            return statusBatchSize;
543        }
544    
545        public void setStatusBatchSize(Integer statusBatchSize) {
546            this.statusBatchSize = statusBatchSize;
547        }
548    
549        /**
550         * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getObjectsReceived()
551         */
552        public long getObjectsReceived() {
553            return objectsReceived.longValue();
554        }
555    
556        /**
557         * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getTotalServiceTime()
558         */
559        public long getTotalServiceTime() {
560            return totalServiceTime.longValue();
561        }
562    
563        /**
564         * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getTotalEmitTime()
565         */
566        public long getTotalEmitTime() {
567            return totalEmitTime.longValue();
568        }
569    
570        /**
571         * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getTotalEmits()
572         */
573        public long getTotalEmits() {
574            return totalEmits.longValue();
575        }
576    
577        /**
578         * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getCollectBranchStats()
579         */
580        public Boolean getCollectBranchStats() {
581            return collectBranchStats;
582        }
583    
584        /**
585         * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#setCollectBranchStats(Boolean)
586         */
587        public void setCollectBranchStats(Boolean collectBranchStats) {
588            this.collectBranchStats = collectBranchStats;
589        }
590    
591        public Integer getCurrentStatWindowSize() {
592            return Integer.valueOf(currentStatWindowSize);
593        }
594    
595        public void setCurrentStatWindowSize(Integer newStatWindowSize) {
596            if (serviceTimeStatistics != null
597                    && newStatWindowSize != this.currentStatWindowSize) {
598                synchronized (serviceTimeStatistics) {
599                    serviceTimeStatistics.setWindowSize(newStatWindowSize);
600                }
601            }
602            this.currentStatWindowSize = newStatWindowSize;
603        }
604    
605        public String getStageName() {
606            return stageName;
607        }
608    
609        public void setStageName(String name) {
610            this.stageName = name;
611        }
612    
613        public boolean isJmxEnabled() {
614            return jmxEnabled;
615        }
616    
617        public void setJmxEnabled(boolean jmxEnabled) {
618            this.jmxEnabled = jmxEnabled;
619        }
620    
621        /**
622         * Returns a moving average of the service time. This does not yet take into account time spent
623         * calling emit, nor does it return minimum, maximum or other statistical information at this time.
624         *
625         * @return Average time to process the last <code>currentStatWindowSize</code> objects.
626         */
627        public double getCurrentServiceTimeAverage() {
628            double avg = -1.0d;
629    
630            // Hate to synchronize in the base class, but this should be very quick.
631            avg = serviceTimeStatistics.getMean();
632    
633            return avg;
634        }
635    }