001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     * 
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.commons.performance;
019    
020    import java.util.logging.Logger;
021    
022    import org.apache.commons.math.random.RandomData;
023    import org.apache.commons.math.random.RandomDataImpl;
024    import org.apache.commons.math.stat.descriptive.SummaryStatistics;
025    
026    /**
027     * <p>Base for performance / load test clients. 
028     * The run method executes init, then setup-execute-cleanup in a loop,
029     * gathering performance statistics, with time between executions based
030     * on configuration parameters. The <code>finish</code> method is executed once
031     * at the end of a run. See {@link #nextDelay()} for details on
032     * inter-arrival time computation.</p>
033     * 
034     * <p>Subclasses <strong>must</strong> implement <code>execute</code>, which
035     * is the basic client request action that is executed, and timed, 
036     * repeatedly. If per-request setup is required, and you do not want the time
037     * associated with this setup to be included in the reported timings, implement 
038     * <code>setUp</code> and put the setup code there.  Similarly for 
039     * <code>cleanUp</code>. Initialization code that needs to be executed once
040     * only, before any requests are initiated, should be put into 
041     * <code>init</code> and cleanup code that needs to be executed only once
042     * at the end of a simulation should be put into <code>finish.</code></p>
043     * 
044     * <p>By default, the only statistics accumulated are for the latency of the 
045     * <code>execute</code> method. Additional metrics can be captured and added
046     * to the {@link Statistics} for the running thread.</p>
047     * 
048     */
049    public abstract class ClientThread implements Runnable {
050    
051        // Inter-arrival time configuration parameters 
052        /** Minimum mean time between requests */
053        private long minDelay;
054        /** Maximum mean time between requests */
055        private long maxDelay;
056        /** Standard deviation of delay distribution */
057        private double sigma;
058        /** Delay type - determines how next start times are computed */
059        private String delayType;
060        /** Ramp length for cyclic mean delay */
061        private long rampPeriod;
062        /** Peak length for cyclic mean delay */
063        private long peakPeriod;
064        /** Trough length for cyclic mean delay */
065        private long troughPeriod;
066        /** Cycle type */
067        private final String cycleType;
068        /** Ramp type */
069        private String rampType;
070        
071        /** Number of iterations */
072        private final long iterations;
073        
074        // State data
075        /** Start time of run */
076        private long startTime;
077        /** Start time of current period */
078        private long periodStart;
079        /** Last mean delay */
080        private double lastMean;
081        /** Cycle state constants */
082        protected static final int RAMPING_UP = 0;
083        protected static final int RAMPING_DOWN = 1;
084        protected static final int PEAK_LOAD = 2;
085        protected static final int TROUGH_LOAD = 3;
086        /** Cycle state */
087        private int cycleState = RAMPING_UP;
088        /** Number of errors */
089        private long numErrors = 0;
090        /** Number of misses */
091        private long numMisses = 0;
092        
093        /** Random data generator */
094        protected RandomData randomData = new RandomDataImpl();
095        /** Statistics container */
096        protected Statistics stats;
097        /** Logger shared by client threads */
098        protected Logger logger;
099        
100        /**
101         * Create a client thread.
102         * 
103         * @param iterations number of iterations
104         * @param minDelay minimum mean time between client requests
105         * @param maxDelay maximum mean time between client requests
106         * @param sigma standard deviation of time between client requests
107         * @param delayType distribution of time between client requests
108         * @param rampPeriod ramp period of cycle for cyclic load
109         * @param peakPeriod peak period of cycle for cyclic load
110         * @param troughPeriod trough period of cycle for cyclic load
111         * @param cycleType type of cycle for mean delay
112         * @param rampType type of ramp (linear or random jumps)
113         * @param logger common logger shared by all clients
114         * @param stats Statistics instance to add results to
115         */
116        public ClientThread(long iterations, long minDelay, long maxDelay,
117                double sigma, String delayType, long rampPeriod, long peakPeriod,
118                long troughPeriod, String cycleType,
119                String rampType, Logger logger,
120                Statistics stats) {
121            this.iterations = iterations;
122            this.minDelay = minDelay;
123            this.maxDelay = maxDelay;
124            this.sigma = sigma;
125            this.delayType = delayType;
126            this.peakPeriod = peakPeriod;
127            this.rampPeriod = rampPeriod;
128            this.troughPeriod = troughPeriod;
129            this.cycleType = cycleType;
130            this.rampType = rampType;
131            this.logger = logger;
132            this.stats = stats;
133        }
134        
135        public void run() {
136            try {
137                init();
138            } catch (Exception ex) {
139                logger.severe("init failed.");
140                ex.printStackTrace();
141                return;
142            }
143            long start = 0;
144            startTime = System.currentTimeMillis();
145            long lastStart = startTime;
146            periodStart = System.currentTimeMillis();
147            lastMean = (double) maxDelay; // Ramp up, if any, starts here
148            SummaryStatistics responseStats = new SummaryStatistics();
149            for (int i = 0; i < iterations; i++) {
150                try {
151                    setUp();
152                    // Generate next interarrival time. If that is in the
153                    // past, go right away and log a miss; otherwise wait.
154                    long elapsed = System.currentTimeMillis() - lastStart;
155                    long nextDelay = nextDelay();
156                    if (elapsed > nextDelay) {
157                        numMisses++;
158                    } else {
159                        try {
160                            Thread.sleep(nextDelay - elapsed);
161                        } catch (InterruptedException ex) {
162                            logger.info("Sleep interrupted");
163                        }
164                    }
165                    
166                    // Fire the request and measure response time
167                    start = System.currentTimeMillis();
168                    execute();
169                } catch (Exception ex) {
170                    ex.printStackTrace();
171                    numErrors++;
172                } finally {
173                    try {
174                        responseStats.addValue(System.currentTimeMillis() - start);
175                        lastStart = start;
176                        cleanUp();
177                    } catch (Exception e) {
178                        e.printStackTrace();                    
179                    }
180                }
181            }
182            
183            try {
184                finish();
185            } catch (Exception ex) {
186                logger.severe("finalize failed.");
187                ex.printStackTrace();
188                return;
189            }
190            
191            // Use thread name as process name
192            String process = Thread.currentThread().getName();
193            
194            // Record latency statistics
195            stats.addStatistics(responseStats, process, "latency");
196            
197            // Log accumulated statistics for this thread
198            logger.info(stats.displayProcessStatistics(process) + 
199              "Number of misses: " + numMisses + "\n" +
200              "Number or errors: " + numErrors + "\n"); 
201        }
202        
203        /** Executed once at the beginning of the run */
204        protected void init() throws Exception {}
205        
206        /** Executed at the beginning of each iteration */
207        protected void setUp() throws Exception {}
208        
209        /** Executed in finally block of iteration try-catch */
210        protected void cleanUp() throws Exception {}
211        
212        /** Executed once after the run finishes */
213        protected void finish() throws Exception {}
214        
215        /** 
216         * Core iteration code.  Timings are based on this,
217         *  so keep it tight.
218         */
219        public abstract void execute() throws Exception;
220        
221        /**
222         * <p>Computes the next interarrival time (time to wait between requests)
223         * based on configured values for min/max delay, delay type, cycle type, 
224         * ramp type and period. Currently supports constant (always returning
225         * <code>minDelay</code> delay time), Poisson and Gaussian distributed
226         * random time delays, linear and random ramps, and oscillating / 
227         * non-oscillating cycle types.</p>
228         * 
229         * <p><strong>loadType</strong> determines whether returned times are
230         * deterministic or random. If <code>loadType</code> is not "constant",
231         * a random value with the specified distribution and mean determined by
232         * the other parameters is returned. For "gaussian" <code>loadType</code>,
233         * <code>sigma</code> is used as used as the standard deviation. </p>
234         * 
235         * <p><strong>cycleType</strong> determines how the returned times vary
236         * over time. "oscillating", means times ramp up and down between 
237         * <code>minDelay</code> and <code>maxDelay.</code> Ramp type is controlled
238         * by <code>rampType.</code>  Linear <code>rampType</code> means the means
239         * increase or decrease linearly over the time of the period.  Random
240         * makes random jumps up or down toward the next peak or trough. "None" for
241         * <code>rampType</code> under oscillating <code>cycleType</code> makes the
242         * means alternate between peak (<code>minDelay</code>) and trough 
243         *(<code>maxDelay</code>) with no ramp between. </p>
244         * 
245         * <p>Oscillating loads cycle through RAMPING_UP, PEAK_LOAD, RAMPING_DOWN
246         * and TROUGH_LOAD states, with the amount of time spent in each state
247         * determined by <code>rampPeriod</code> (time spent increasing on the way
248         * up and decreasing on the way down), <code>peakPeriod</code> (time spent
249         * at peak load, i.e., <code>minDelay</code> mean delay) and 
250         * <code>troughPeriod</code> (time spent at minimum load, i.e., 
251         * <code>maxDelay</code> mean delay). All times are specified in
252         * milliseconds. </p>
253         * 
254         * <p><strong>Examples:</strong><ol>
255         * 
256         * <li>Given<pre>
257         * delayType = "constant"
258         * minDelay = 250
259         * maxDelay = 500
260         * cycleType = "oscillating"
261         * rampType = "linear" 
262         * rampPeriod = 10000
263         * peakPeriod = 20000
264         * troughPeriod = 30000</pre> load will start at one request every 500 ms,
265         * which is "trough load." Load then ramps up linearly over the next 10
266         * seconds unil it reaches one request per 250 milliseconds, which is 
267         * "peak load."  Peak load is sustained for 20 seconds and then load ramps
268         * back down, again taking 10 seconds to get down to "trough load," which
269         * is sustained for 30 seconds.  The cycle then repeats.</li>
270         * 
271         * <li><pre>
272         * delayType = "gaussian"
273         * minDelay = 250
274         * maxDelay = 500
275         * cycleType = "oscillating"
276         * rampType = "linear" 
277         * rampPeriod = 10000
278         * peakPeriod = 20000
279         * troughPeriod = 30000
280         * sigma = 100 </pre> produces a load pattern similar to example 1, but in
281         * this case the computed delay value is fed into a gaussian random number
282         * generator as the mean and 100 as the standard deviation - i.e., 
283         * <code>nextDelay</code> returns random, gaussian distributed values with
284         * means moving according to the cyclic pattern in example 1.</li>
285         * 
286         * <li><pre>
287         * delayType = "constant"
288         * minDelay = 250
289         * maxDelay = 500
290         * cycleType = "none"
291         * rampType = "linear" 
292         * rampPeriod = 10000</pre> produces a load pattern that increases linearly
293         * from one request every 500ms to one request every 250ms and then stays
294         * constant at that level until the run is over.  Other parameters are
295         * ignored in this case.</li>
296         * 
297         * <li><pre>
298         * delayType = "poisson"
299         * minDelay = 250
300         * maxDelay = 500
301         * cycleType = "none"
302         * rampType = "none" 
303         * </pre> produces inter-arrival times that are poisson distributed with
304         * mean 250ms. Note that when rampType is "none," the value of 
305         * <code>minDelay</code> is used as the (constant) mean delay.</li></ol>
306         * 
307         * @return next value for delay
308         */
309        protected long nextDelay() throws ConfigurationException {
310            double targetDelay = 0; 
311            double dMinDelay = (double) minDelay;
312            double dMaxDelay = (double) maxDelay;
313            double delayDifference = dMaxDelay - dMinDelay;
314            long currentTime = System.currentTimeMillis();
315            if (cycleType.equals("none")) {
316                if (rampType.equals("none") || 
317                        (currentTime - startTime) > rampPeriod) { // ramped up
318                    targetDelay = dMinDelay;
319                } else if (rampType.equals("linear")) { // single period linear
320                    double prop = 
321                        (double) (currentTime - startTime) / (double) rampPeriod;
322                    targetDelay =  dMaxDelay - delayDifference * prop;
323                } else { // Random jumps down to delay - single period
324                    // TODO: govern size of jumps as in oscillating
325                    // Where we last were as proportion of way down to minDelay
326                    double lastProp = 
327                        (dMaxDelay - lastMean) / delayDifference;
328                    // Make a random jump toward 1 (1 = all the way down)
329                    double prop = randomData.nextUniform(lastProp, 1);
330                    targetDelay = dMaxDelay - delayDifference * prop;
331                }
332            } else if (cycleType.equals("oscillating")) {
333                // First change cycle state if we need to
334                adjustState(currentTime);
335                targetDelay = computeCyclicDelay(
336                        currentTime, dMinDelay, dMaxDelay);
337            } else {
338                throw new ConfigurationException(
339                        "Cycle type not supported: " + cycleType);
340            }
341    
342            // Remember last mean for ramp up / down
343            lastMean = targetDelay;
344    
345            if (delayType.equals("constant")) { 
346                return Math.round(targetDelay);
347            }
348    
349            // Generate and return random deviate
350            if (delayType.equals("gaussian")) {
351                return Math.round(randomData.nextGaussian(targetDelay, sigma));
352            } else { // must be Poisson
353                return randomData.nextPoisson(targetDelay);
354            } 
355        }
356        
357        /**
358         * Adjusts cycleState, periodStart and lastMean if a cycle state
359         * transition needs to happen.
360         * 
361         * @param currentTime current time
362         */
363        protected void adjustState(long currentTime) {
364            long timeInPeriod = currentTime - periodStart;
365            if ( ((cycleState == RAMPING_UP || cycleState == RAMPING_DOWN) && 
366                    timeInPeriod < rampPeriod) ||
367                 (cycleState == PEAK_LOAD && timeInPeriod < peakPeriod) ||
368                 (cycleState == TROUGH_LOAD && timeInPeriod < troughPeriod)) {
369                return; // No state change
370            }
371            switch (cycleState) {
372                case RAMPING_UP: 
373                    if (peakPeriod > 0) {
374                        cycleState = PEAK_LOAD;
375                    } else {
376                        cycleState = RAMPING_DOWN;
377                    }
378                    lastMean = (double) minDelay;
379                    periodStart = currentTime;
380                    break;
381                
382                case RAMPING_DOWN: 
383                    if (troughPeriod > 0) {
384                        cycleState = TROUGH_LOAD;
385                    } else {
386                        cycleState = RAMPING_UP;
387                    }
388                    lastMean = (double) maxDelay;
389                    periodStart = currentTime;
390                    break;
391                
392                case PEAK_LOAD: 
393                    if (rampPeriod > 0) {
394                        cycleState = RAMPING_DOWN;
395                        lastMean = (double) minDelay;
396                    } else {
397                        cycleState = TROUGH_LOAD;
398                        lastMean = (double) maxDelay;
399                    }
400                    periodStart = currentTime;
401                    break;
402                
403                case TROUGH_LOAD: 
404                    if (rampPeriod > 0) {
405                        cycleState = RAMPING_UP;
406                        lastMean = (double) maxDelay;
407                    } else {
408                        cycleState = PEAK_LOAD;
409                        lastMean = (double) minDelay;
410                    }
411                    periodStart = currentTime;
412                    break;
413                
414                default: 
415                    throw new IllegalStateException(
416                            "Illegal cycle state: " + cycleState);
417            }
418        }
419        
420        protected double computeCyclicDelay(
421                long currentTime, double min, double max) {
422            
423            // Constant load states
424            if (cycleState == PEAK_LOAD) { 
425                return min;
426            } 
427            if (cycleState == TROUGH_LOAD) {
428                return max;
429            } 
430    
431            // No ramp - stay at min or max load during ramp
432            if (rampType.equals("none")) { // min or max, no ramp
433                if (cycleState == RAMPING_UP) {
434                    return max;
435                } else {
436                    return min;
437                }
438            } 
439    
440            // Linear ramp type and ramping up or down
441            double diff = max - min;
442            if (rampType.equals("linear")) {
443                double prop = 
444                    (double)(currentTime - periodStart) / (double) rampPeriod;
445                if (cycleState == RAMPING_UP) {
446                    return max - diff * prop; 
447                } else {
448                    return min + diff * prop;
449                }
450            } else { // random jumps down, then back up
451                // Where we last were as proportion of way down to minDelay
452                double lastProp = 
453                    (max - lastMean) / diff;
454                // Where we would be if this were a linear ramp
455                double linearProp = 
456                    (double)(currentTime - periodStart) / (double) rampPeriod;
457                // Need to govern size of jumps, otherwise "convergence"
458                // can be too fast - use linear ramp as governor
459                if ((cycleState == RAMPING_UP && (lastProp > linearProp)) || 
460                        (cycleState == RAMPING_DOWN && 
461                                ((1 - lastProp) > linearProp))) 
462                    lastProp = (cycleState == RAMPING_UP) ? linearProp : 
463                        (1 - linearProp);
464                double prop = 0;
465                if (cycleState == RAMPING_UP) { // Random jump toward 1
466                    prop = randomData.nextUniform(lastProp, 1);
467                } else { // Random jump toward 0
468                    prop = randomData.nextUniform(0, lastProp);
469                }
470                // Make sure sequence is monotone
471                if (cycleState == RAMPING_UP) {
472                    return Math.min(lastMean, max - diff * prop);
473                } else {
474                    return Math.max(lastMean, min + diff * prop);
475                }
476            }
477        }
478    
479        public long getMinDelay() {
480            return minDelay;
481        }
482    
483        public long getMaxDelay() {
484            return maxDelay;
485        }
486    
487        public double getSigma() {
488            return sigma;
489        }
490    
491        public String getDelayType() {
492            return delayType;
493        }
494    
495        public long getRampPeriod() {
496            return rampPeriod;
497        }
498    
499        public long getPeakPeriod() {
500            return peakPeriod;
501        }
502    
503        public long getTroughPeriod() {
504            return troughPeriod;
505        }
506    
507        public String getCycleType() {
508            return cycleType;
509        }
510    
511        public String getRampType() {
512            return rampType;
513        }
514    
515        public long getIterations() {
516            return iterations;
517        }
518    
519        public long getStartTime() {
520            return startTime;
521        }
522    
523        public long getPeriodStart() {
524            return periodStart;
525        }
526    
527        public double getLastMean() {
528            return lastMean;
529        }
530    
531        public int getCycleState() {
532            return cycleState;
533        }
534    
535        public long getNumErrors() {
536            return numErrors;
537        }
538    
539        public long getNumMisses() {
540            return numMisses;
541        }
542    
543        public Statistics getStats() {
544            return stats;
545        }
546    
547        public void setStartTime(long startTime) {
548            this.startTime = startTime;
549        }
550    
551        public void setPeriodStart(long periodStart) {
552            this.periodStart = periodStart;
553        }
554    
555        public void setLastMean(double lastMean) {
556            this.lastMean = lastMean;
557        }
558    
559        public void setCycleState(int cycleState) {
560            this.cycleState = cycleState;
561        }
562    
563        public void setNumErrors(long numErrors) {
564            this.numErrors = numErrors;
565        }
566    
567        public void setNumMisses(long numMisses) {
568            this.numMisses = numMisses;
569        }
570    
571        public void setRampType(String rampType) {
572            this.rampType = rampType;
573        }
574    
575        public void setMinDelay(long minDelay) {
576            this.minDelay = minDelay;
577        }
578    
579        public void setMaxDelay(long maxDelay) {
580            this.maxDelay = maxDelay;
581        }
582    
583        public void setSigma(double sigma) {
584            this.sigma = sigma;
585        }
586    
587        public void setDelayType(String delayType) {
588            this.delayType = delayType;
589        }
590    
591        public void setRampPeriod(long rampPeriod) {
592            this.rampPeriod = rampPeriod;
593        }
594    
595        public void setPeakPeriod(long peakPeriod) {
596            this.peakPeriod = peakPeriod;
597        }
598    
599        public void setTroughPeriod(long troughPeriod) {
600            this.troughPeriod = troughPeriod;
601        }
602        
603    }