1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.commons.rng.examples.stress;
18
19 import org.apache.commons.rng.UniformRandomProvider;
20 import org.apache.commons.rng.core.source64.RandomLongSource;
21 import org.apache.commons.rng.simple.RandomSource;
22
23 import picocli.CommandLine.Command;
24 import picocli.CommandLine.Mixin;
25 import picocli.CommandLine.Option;
26 import picocli.CommandLine.Parameters;
27
28 import java.io.BufferedReader;
29 import java.io.BufferedWriter;
30 import java.io.File;
31 import java.io.IOException;
32 import java.nio.ByteOrder;
33 import java.nio.file.Files;
34 import java.nio.file.StandardOpenOption;
35 import java.text.SimpleDateFormat;
36 import java.time.Instant;
37 import java.time.LocalDateTime;
38 import java.time.ZoneId;
39 import java.util.ArrayList;
40 import java.util.Arrays;
41 import java.util.Date;
42 import java.util.Formatter;
43 import java.util.List;
44 import java.util.Locale;
45 import java.util.concurrent.Callable;
46 import java.util.concurrent.ExecutionException;
47 import java.util.concurrent.ExecutorService;
48 import java.util.concurrent.Executors;
49 import java.util.concurrent.Future;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.atomic.AtomicLong;
52 import java.util.concurrent.locks.ReentrantLock;
53
54 /**
55 * Specification for the "stress" command.
56 *
57 * <p>This command loads a list of random generators and tests each generator by
58 * piping the values returned by its {@link UniformRandomProvider#nextInt()}
59 * method to a program that reads {@code int} values from its standard input and
60 * writes an analysis report to standard output.</p>
61 */
62 @Command(name = "stress",
63 description = {"Run repeat trials of random data generators using a provided test application.",
64 "Data is transferred to the application sub-process via standard input."})
65 class StressTestCommand implements Callable<Void> {
66 /** 1000. Any value below this can be exactly represented to 3 significant figures. */
67 private static final int ONE_THOUSAND = 1000;
68
69 /** The standard options. */
70 @Mixin
71 private StandardOptions reusableOptions;
72
73 /** The executable. */
74 @Parameters(index = "0",
75 description = "The stress test executable.")
76 private File executable;
77
78 /** The executable arguments. */
79 @Parameters(index = "1..*",
80 description = "The arguments to pass to the executable.",
81 paramLabel = "<argument>")
82 private List<String> executableArguments = new ArrayList<>();
83
84 /** The file output prefix. */
85 @Option(names = {"--prefix"},
86 description = "Results file prefix (default: ${DEFAULT-VALUE}).")
87 private File fileOutputPrefix = new File("test_");
88
89 /** The stop file. */
90 @Option(names = {"--stop-file"},
91 description = {"Stop file (default: <Results file prefix>.stop).",
92 "When created it will prevent new tasks from starting " +
93 "but running tasks will complete."})
94 private File stopFile;
95
96 /** The output mode for existing files. */
97 @Option(names = {"-o", "--output-mode"},
98 description = {"Output mode for existing files (default: ${DEFAULT-VALUE}).",
99 "Valid values: ${COMPLETION-CANDIDATES}."})
100 private StressTestCommand.OutputMode outputMode = OutputMode.ERROR;
101
102 /** The list of random generators. */
103 @Option(names = {"-l", "--list"},
104 description = {"List of random generators.",
105 "The default list is all known generators."},
106 paramLabel = "<genList>")
107 private File generatorsListFile;
108
109 /** The number of trials to put in the template list of random generators. */
110 @Option(names = {"-t", "--trials"},
111 description = {"The number of trials for each random generator.",
112 "Used only for the default list (default: ${DEFAULT-VALUE})."})
113 private int trials = 1;
114
115 /** The trial offset. */
116 @Option(names = {"--trial-offset"},
117 description = {"Offset to add to the trial number for output files (default: ${DEFAULT-VALUE}).",
118 "Use for parallel tests with the same output prefix."})
119 private int trialOffset;
120
121 /** The number of available processors. */
122 @Option(names = {"-p", "--processors"},
123 description = {"Number of available processors (default: ${DEFAULT-VALUE}).",
124 "Number of concurrent tasks = ceil(processors / threadsPerTask)",
125 "threadsPerTask = applicationThreads + (ignoreJavaThread ? 0 : 1)"})
126 private int processors = Math.max(1, Runtime.getRuntime().availableProcessors());
127
/** Flag to ignore the java RNG thread when computing the number of concurrent tasks. */
129 @Option(names = {"--ignore-java-thread"},
130 description = {"Ignore the java RNG thread when computing concurrent tasks."})
131 private boolean ignoreJavaThread;
132
133 /** The number of threads to use for each testing application. */
134 @Option(names = {"--threads"},
135 description = {"Number of threads to use for each application (default: ${DEFAULT-VALUE}).",
136 "Total threads per task includes an optional java thread."})
137 private int applicationThreads = 1;
138
139 /** The size of the byte buffer for the binary data. */
140 @Option(names = {"--buffer-size"},
141 description = {"Byte-buffer size for the transferred data (default: ${DEFAULT-VALUE})."})
142 private int bufferSize = 8192;
143
144 /** The output byte order of the binary data. */
145 @Option(names = {"-b", "--byte-order"},
146 description = {"Byte-order of the transferred data (default: ${DEFAULT-VALUE}).",
147 "Valid values: BIG_ENDIAN, LITTLE_ENDIAN."})
148 private ByteOrder byteOrder = ByteOrder.nativeOrder();
149
150 /** Flag to indicate the output should be bit-reversed. */
151 @Option(names = {"-r", "--reverse-bits"},
152 description = {"Reverse the bits in the data (default: ${DEFAULT-VALUE}).",
153 "Note: Generators may fail tests for a reverse sequence " +
154 "when passing using the standard sequence."})
155 private boolean reverseBits;
156
157 /** Flag to use 64-bit long output. */
158 @Option(names = {"--raw64"},
159 description = {"Use 64-bit output (default is 32-bit).",
160 "This is ignored if not a native 64-bit generator.",
161 "Set to true sets the source64 mode to LONG."})
162 private boolean raw64;
163
164 /** Output mode for 64-bit long output.
165 *
166 * <p>Note: The default is set as the default caching implementation.
167 * It passes the full output of the RNG to the stress test application.
168 * Any combination random sources are performed on the full 64-bit output.
169 *
170 * <p>If using INT this will use the RNG's nextInt method.
171 * Any combination random sources are performed on the 32-bit output. Without
172 * a combination random source the output should be the same as the default if
173 * the generator uses the default caching implementation.
174 *
175 * <p>LONG and LO_HI should match binary output when LITTLE_ENDIAN. LONG and HI_LO
176 * should match binary output when BIG_ENDIAN.
177 *
* <p>Changing from HI_LO to LO_HI should not affect the stress test as many values are consumed
179 * per test. Using HI or LO may have a different outcome as parts of the generator output
180 * may be weak, e.g. the lower bits of linear congruential generators.
181 */
182 @Option(names = {"--source64"},
183 description = {"Output mode for 64-bit generators (default: ${DEFAULT-VALUE}).",
184 "This is ignored if not a native 64-bit generator.",
185 "In 32-bit mode the output uses a combination of upper and " +
186 "lower bits of the 64-bit value.",
187 "Valid values: ${COMPLETION-CANDIDATES}."})
188 private Source64Mode source64 = RNGUtils.getSource64Default();
189
190 /** The random seed as a byte[]. */
191 @Option(names = {"-x", "--hex-seed"},
192 description = {"The hex-encoded random seed.",
193 "Seed conversion for multi-byte primitives use little-endian format.",
194 "Use to repeat tests. Not recommended for batch testing."})
195 private String byteSeed;
196
197 /**
198 * Flag to indicate the output should be combined with a hash code from a new object.
199 * This is a method previously used in the
200 * {@link org.apache.commons.rng.simple.internal.SeedFactory SeedFactory}.
201 *
202 * @see System#identityHashCode(Object)
203 */
204 @Option(names = {"--hashcode"},
205 description = {"Combine the bits with a hash code (default: ${DEFAULT-VALUE}).",
206 "System.identityHashCode(new Object()) ^ rng.nextInt()."})
207 private boolean xorHashCode;
208
209 /**
210 * Flag to indicate the output should be combined with output from ThreadLocalRandom.
211 */
212 @Option(names = {"--local-random"},
213 description = {"Combine the bits with ThreadLocalRandom (default: ${DEFAULT-VALUE}).",
214 "ThreadLocalRandom.current().nextInt() ^ rng.nextInt()."})
215 private boolean xorThreadLocalRandom;
216
217 /**
218 * Optional second generator to be combined with the primary generator.
219 */
220 @Option(names = {"--xor-rng"},
221 description = {"Combine the bits with a second generator.",
222 "xorRng.nextInt() ^ rng.nextInt().",
223 "Valid values: Any known RandomSource enum value."})
224 private RandomSource xorRandomSource;
225
226 /** The flag to indicate a dry run. */
227 @Option(names = {"--dry-run"},
228 description = "Perform a dry run where the generators and output files are created " +
229 "but the stress test is not executed.")
230 private boolean dryRun;
231
/** The lock to hold when checking the stop file. */
233 private ReentrantLock stopFileLock = new ReentrantLock(false);
234 /** The stop file exists flag. This should be read/updated when holding the lock. */
235 private boolean stopFileExists;
236 /**
237 * The timestamp when the stop file was last checked.
238 * This should be read/updated when holding the lock.
239 */
240 private long stopFileTimestamp;
241
/**
 * The action to take when an output results file already exists.
 */
enum OutputMode {
    /** Raise an error if the file exists. */
    ERROR,
    /** Skip the file if it exists. */
    SKIP,
    /** Append to the file if it exists. */
    APPEND,
    /** Overwrite the file if it exists. */
    OVERWRITE
}
255
256 /**
257 * Validates the run command arguments, creates the list of generators and runs the
258 * stress test tasks.
259 */
260 @Override
261 public Void call() {
262 LogUtils.setLogLevel(reusableOptions.logLevel);
263 ProcessUtils.checkExecutable(executable);
264 ProcessUtils.checkOutputDirectory(fileOutputPrefix);
265 checkStopFileDoesNotExist();
266 final Iterable<StressTestData> stressTestData = createStressTestData();
267 printStressTestData(stressTestData);
268 runStressTest(stressTestData);
269 return null;
270 }
271
272 /**
273 * Initialise the stop file to a default unless specified by the user, then check it
274 * does not currently exist.
275 *
276 * @throws ApplicationException If the stop file exists
277 */
278 private void checkStopFileDoesNotExist() {
279 if (stopFile == null) {
280 stopFile = new File(fileOutputPrefix + ".stop");
281 }
282 if (stopFile.exists()) {
283 throw new ApplicationException("Stop file exists: " + stopFile);
284 }
285 }
286
287 /**
288 * Check if the stop file exists.
289 *
290 * <p>This method is thread-safe. It will log a message if the file exists one time only.
291 *
292 * @return true if the stop file exists
293 */
294 private boolean isStopFileExists() {
295 stopFileLock.lock();
296 try {
297 if (!stopFileExists) {
298 // This should hit the filesystem each time it is called.
299 // To prevent this happening a lot when all the first set of tasks run use
300 // a timestamp to limit the check to 1 time each interval.
301 final long timestamp = System.currentTimeMillis();
302 if (timestamp > stopFileTimestamp) {
303 checkStopFile(timestamp);
304 }
305 }
306 return stopFileExists;
307 } finally {
308 stopFileLock.unlock();
309 }
310 }
311
312 /**
313 * Check if the stop file exists. Update the timestamp for the next check. If the stop file
314 * does exists then log a message.
315 *
316 * @param timestamp Timestamp of the last check.
317 */
318 private void checkStopFile(final long timestamp) {
319 stopFileTimestamp = timestamp + TimeUnit.SECONDS.toMillis(2);
320 stopFileExists = stopFile.exists();
321 if (stopFileExists) {
322 LogUtils.info("Stop file detected: %s", stopFile);
323 LogUtils.info("No further tasks will start");
324 }
325 }
326
327 /**
328 * Creates the test data.
329 *
330 * <p>If the input file is null then a default list is created.
331 *
332 * @return the stress test data
333 * @throws ApplicationException if an error occurred during the file read.
334 */
335 private Iterable<StressTestData> createStressTestData() {
336 if (generatorsListFile == null) {
337 return new StressTestDataList("", trials);
338 }
339 // Read data into a list
340 try (BufferedReader reader = Files.newBufferedReader(generatorsListFile.toPath())) {
341 return ListCommand.readStressTestData(reader);
342 } catch (final IOException ex) {
343 throw new ApplicationException("Failed to read generators list: " + generatorsListFile, ex);
344 }
345 }
346
347 /**
348 * Prints the stress test data if the verbosity allows. This is used to debug the list
349 * of generators to be tested.
350 *
351 * @param stressTestData List of generators to be tested.
352 */
353 private static void printStressTestData(Iterable<StressTestData> stressTestData) {
354 if (!LogUtils.isLoggable(LogUtils.LogLevel.DEBUG)) {
355 return;
356 }
357 try {
358 final StringBuilder sb = new StringBuilder("Testing generators").append(System.lineSeparator());
359 ListCommand.writeStressTestData(sb, stressTestData);
360 LogUtils.debug(sb.toString());
361 } catch (final IOException ex) {
362 throw new ApplicationException("Failed to show list of generators", ex);
363 }
364 }
365
366 /**
367 * Creates the tasks and starts the processes.
368 *
369 * @param stressTestData List of generators to be tested.
370 */
371 private void runStressTest(Iterable<StressTestData> stressTestData) {
372 final List<String> command = ProcessUtils.buildSubProcessCommand(executable, executableArguments);
373
374 LogUtils.info("Set-up stress test ...");
375
376 // Check existing output files before starting the tasks.
377 final String basePath = fileOutputPrefix.getAbsolutePath();
378 checkExistingOutputFiles(basePath, stressTestData);
379
380 final int parallelTasks = getParallelTasks();
381
382 final ProgressTracker progressTracker = new ProgressTracker(parallelTasks);
383 final List<Runnable> tasks = createTasks(command, basePath, stressTestData, progressTracker);
384
385 // Run tasks with parallel execution.
386 final ExecutorService service = Executors.newFixedThreadPool(parallelTasks);
387
388 LogUtils.info("Running stress test ...");
389 LogUtils.info("Shutdown by creating stop file: %s", stopFile);
390 progressTracker.setTotal(tasks.size());
391 final List<Future<?>> taskList = submitTasks(service, tasks);
392
393 // Wait for completion (ignoring return value).
394 try {
395 for (final Future<?> f : taskList) {
396 try {
397 f.get();
398 } catch (final ExecutionException ex) {
399 // Log the error. Do not re-throw as other tasks may be processing that
400 // can still complete successfully.
401 LogUtils.error(ex.getCause(), ex.getMessage());
402 }
403 }
404 } catch (final InterruptedException ex) {
405 // Restore interrupted state...
406 Thread.currentThread().interrupt();
407 throw new ApplicationException("Unexpected interruption: " + ex.getMessage(), ex);
408 } finally {
409 // Terminate all threads.
410 service.shutdown();
411 }
412
413 LogUtils.info("Finished stress test");
414 }
415
416 /**
417 * Check for existing output files.
418 *
419 * @param basePath The base path to the output results files.
420 * @param stressTestData List of generators to be tested.
421 * @throws ApplicationException If an output file exists and the output mode is error
422 */
423 private void checkExistingOutputFiles(String basePath,
424 Iterable<StressTestData> stressTestData) {
425 if (outputMode == StressTestCommand.OutputMode.ERROR) {
426 for (final StressTestData testData : stressTestData) {
427 for (int trial = 1; trial <= testData.getTrials(); trial++) {
428 // Create the output file
429 final File output = createOutputFile(basePath, testData, trial);
430 if (output.exists()) {
431 throw new ApplicationException(createExistingFileMessage(output));
432 }
433 }
434 }
435 }
436 }
437
438 /**
439 * Creates the named output file.
440 *
441 * <p>Note: The trial will be combined with the trial offset to create the file name.
442 *
443 * @param basePath The base path to the output results files.
444 * @param testData The test data.
445 * @param trial The trial.
446 * @return the file
447 */
448 private File createOutputFile(String basePath,
449 StressTestData testData,
450 int trial) {
451 return new File(String.format("%s%s_%d", basePath, testData.getId(), trial + trialOffset));
452 }
453
454 /**
455 * Creates the existing file message.
456 *
457 * @param output The output file.
458 * @return the message
459 */
460 private static String createExistingFileMessage(File output) {
461 return "Existing output file: " + output;
462 }
463
464 /**
465 * Gets the number of parallel tasks. This uses the number of available processors and
466 * the number of threads to use per task.
467 *
468 * <pre>
469 * threadsPerTask = applicationThreads + (ignoreJavaThread ? 0 : 1)
470 * parallelTasks = ceil(processors / threadsPerTask)
471 * </pre>
472 *
473 * @return the parallel tasks
474 */
475 private int getParallelTasks() {
476 // Avoid zeros in the fraction numberator and denominator
477 final int availableProcessors = Math.max(1, processors);
478 final int threadsPerTask = Math.max(1, applicationThreads + (ignoreJavaThread ? 0 : 1));
479 final int parallelTasks = (int) Math.ceil((double) availableProcessors / threadsPerTask);
480 LogUtils.debug("Parallel tasks = %d (%d / %d)",
481 parallelTasks, availableProcessors, threadsPerTask);
482 return parallelTasks;
483 }
484
485 /**
486 * Create the tasks for the test data. The output file for the sub-process will be
487 * constructed using the base path, the test identifier and the trial number.
488 *
489 * @param command The command for the test application.
490 * @param basePath The base path to the output results files.
491 * @param stressTestData List of generators to be tested.
492 * @param progressTracker Progress tracker.
493 * @return the list of tasks
494 */
495 private List<Runnable> createTasks(List<String> command,
496 String basePath,
497 Iterable<StressTestData> stressTestData,
498 ProgressTracker progressTracker) {
499 // raw64 flag overrides the source64 mode
500 if (raw64) {
501 source64 = Source64Mode.LONG;
502 }
503
504 final List<Runnable> tasks = new ArrayList<>();
505 for (final StressTestData testData : stressTestData) {
506 for (int trial = 1; trial <= testData.getTrials(); trial++) {
507 // Create the output file
508 final File output = createOutputFile(basePath, testData, trial);
509 if (output.exists()) {
510 // In case the file was created since the last check
511 if (outputMode == StressTestCommand.OutputMode.ERROR) {
512 throw new ApplicationException(createExistingFileMessage(output));
513 }
514 // Log the decision
515 LogUtils.info("%s existing output file: %s", outputMode, output);
516 if (outputMode == StressTestCommand.OutputMode.SKIP) {
517 continue;
518 }
519 }
520 // Create the generator. Explicitly create a seed so it can be recorded.
521 final byte[] seed = createSeed(testData.getRandomSource());
522 UniformRandomProvider rng = testData.createRNG(seed);
523
524 if (source64 == Source64Mode.LONG && !(rng instanceof RandomLongSource)) {
525 throw new ApplicationException("Not a 64-bit RNG: " + rng);
526 }
527
528 // Upper or lower bits from 64-bit generators must be created first before
529 // any further combination operators.
530 // Note this does not test source64 != Source64Mode.LONG as the full long
531 // output split into hi-lo or lo-hi is supported by the RngDataOutput.
532 if (rng instanceof RandomLongSource &&
533 (source64 == Source64Mode.HI || source64 == Source64Mode.LO || source64 == Source64Mode.INT)) {
534 rng = RNGUtils.createIntProvider((UniformRandomProvider & RandomLongSource) rng, source64);
535 }
536
537 // Combination generators. Mainly used for testing.
538 // These operations maintain the native output type (int/long).
539 if (xorHashCode) {
540 rng = RNGUtils.createHashCodeProvider(rng);
541 }
542 if (xorThreadLocalRandom) {
543 rng = RNGUtils.createThreadLocalRandomProvider(rng);
544 }
545 if (xorRandomSource != null) {
546 rng = RNGUtils.createXorProvider(
547 xorRandomSource.create(),
548 rng);
549 }
550 if (reverseBits) {
551 rng = RNGUtils.createReverseBitsProvider(rng);
552 }
553
554 // -------
555 // Note: Manipulation of the byte order for the platform is done during output.
556 // -------
557
558 // Run the test
559 final Runnable r = new StressTestTask(testData.getRandomSource(), rng, seed,
560 output, command, this, progressTracker);
561 tasks.add(r);
562 }
563 }
564 return tasks;
565 }
566
567 /**
568 * Creates the seed. This will call {@link RandomSource#createSeed()} unless a hex seed has
569 * been explicitly specified on the command line.
570 *
571 * @param randomSource Random source.
572 * @return the seed
573 */
574 private byte[] createSeed(RandomSource randomSource) {
575 if (byteSeed != null) {
576 try {
577 return Hex.decodeHex(byteSeed);
578 } catch (IllegalArgumentException ex) {
579 throw new ApplicationException("Invalid hex seed: " + ex.getMessage(), ex);
580 }
581 }
582 return randomSource.createSeed();
583 }
584
585 /**
586 * Submit the tasks to the executor service.
587 *
588 * @param service The executor service.
589 * @param tasks The list of tasks.
590 * @return the list of submitted tasks
591 */
592 private static List<Future<?>> submitTasks(ExecutorService service,
593 List<Runnable> tasks) {
594 final List<Future<?>> taskList = new ArrayList<>(tasks.size());
595 tasks.forEach(r -> taskList.add(service.submit(r)));
596 return taskList;
597 }
598
599 /**
600 * Class for reporting total progress of tasks to the console.
601 *
602 * <p>This stores the start and end time of tasks to allow it to estimate the time remaining
603 * for all the tests.
604 */
605 static class ProgressTracker {
606 /** The interval at which to report progress (in milliseconds). */
607 private static final long PROGRESS_INTERVAL = 500;
608
609 /** The total. */
610 private int total;
611 /** The level of parallelisation. */
612 private final int parallelTasks;
613 /** The task id. */
614 private int taskId;
615 /** The start time of tasks (in milliseconds from the epoch). */
616 private long[] startTimes;
617 /** The durations of all completed tasks (in milliseconds). This is sorted. */
618 private long[] sortedDurations;
619 /** The number of completed tasks. */
620 private int completed;
621 /** The timestamp of the next progress report. */
622 private long nextReportTimestamp;
623
/**
 * Create a new instance for the given level of parallelisation.
 *
 * <p>The total number of tasks must be set using {@link #setTotal(int)} before
 * any task is submitted.
 *
 * @param parallelTasks The number of parallel tasks.
 */
ProgressTracker(int parallelTasks) {
    this.parallelTasks = parallelTasks;
}
632
633 /**
634 * Sets the total number of tasks to track.
635 *
636 * @param total The total tasks.
637 */
638 synchronized void setTotal(int total) {
639 this.total = total;
640 startTimes = new long[total];
641 sortedDurations = new long[total];
642 }
643
644 /**
645 * Submit a task for progress tracking. The task start time is recorded and the
646 * task is allocated an identifier.
647 *
648 * @return the task Id
649 */
650 int submitTask() {
651 int id;
652 synchronized (this) {
653 final long current = System.currentTimeMillis();
654 id = taskId++;
655 startTimes[id] = current;
656 reportProgress(current);
657 }
658 return id;
659 }
660
661 /**
662 * Signal that a task has completed. The task duration will be returned.
663 *
664 * @param id Task Id.
665 * @return the task time in milliseconds
666 */
667 long endTask(int id) {
668 long duration;
669 synchronized (this) {
670 final long current = System.currentTimeMillis();
671 duration = current - startTimes[id];
672 sortedDurations[completed++] = duration;
673 reportProgress(current);
674 }
675 return duration;
676 }
677
678 /**
679 * Report the progress. This uses the current state and should be done within a
680 * synchronized block.
681 *
682 * @param current Current time (in milliseconds).
683 */
684 private void reportProgress(long current) {
685 // Determine the current state of tasks
686 final int pending = total - taskId;
687 final int running = taskId - completed;
688
689 // Report progress in the following conditions:
690 // - All tasks have completed (i.e. the end); or
691 // - The current timestamp is above the next reporting time and either:
692 // -- The number of running tasks is equal to the level of parallel tasks
693 // (i.e. the system is running at capacity, so not the end of a task but the start
694 // of a new one)
695 // -- There are no pending tasks (i.e. the final submission or the end of a final task)
696 if (completed >= total ||
697 current >= nextReportTimestamp && (running == parallelTasks || pending == 0)) {
698 // Report
699 nextReportTimestamp = current + PROGRESS_INTERVAL;
700 final StringBuilder sb = createStringBuilderWithTimestamp(current, pending, running, completed);
701 try (Formatter formatter = new Formatter(sb)) {
702 formatter.format(" (%.2f%%)", 100.0 * completed / total);
703 appendRemaining(sb, current, pending, running);
704 LogUtils.info(sb.toString());
705 }
706 }
707 }
708
709 /**
710 * Creates the string builder for the progress message with a timestamp prefix.
711 *
712 * <pre>
713 * [HH:mm:ss] Pending [pending]. Running [running]. Completed [completed]
714 * </pre>
715 *
716 * @param current Current time (in milliseconds)
717 * @param pending Pending tasks.
718 * @param running Running tasks.
719 * @param completed Completed tasks.
720 * @return the string builder
721 */
722 private static StringBuilder createStringBuilderWithTimestamp(long current,
723 int pending, int running, int completed) {
724 final StringBuilder sb = new StringBuilder(80);
725 // Use local time to adjust for timezone
726 final LocalDateTime time = LocalDateTime.ofInstant(
727 Instant.ofEpochMilli(current), ZoneId.systemDefault());
728 sb.append('[');
729 append00(sb, time.getHour()).append(':');
730 append00(sb, time.getMinute()).append(':');
731 append00(sb, time.getSecond());
732 return sb.append("] Pending ").append(pending)
733 .append(". Running ").append(running)
734 .append(". Completed ").append(completed);
735 }
736
737 /**
738 * Compute an estimate of the time remaining and append to the progress. Updates
739 * the estimated time of arrival (ETA).
740 *
741 * @param sb String Builder.
742 * @param current Current time (in milliseconds)
743 * @param pending Pending tasks.
744 * @param running Running tasks.
745 * @return the string builder
746 */
747 private StringBuilder appendRemaining(StringBuilder sb, long current, int pending, int running) {
748 final long millis = getRemainingTime(current, pending, running);
749 if (millis == 0) {
750 // Unknown.
751 return sb;
752 }
753
754 // HH:mm:ss format
755 sb.append(". Remaining = ");
756 hms(sb, millis);
757 return sb;
758 }
759
760 /**
761 * Gets the remaining time (in milliseconds).
762 *
763 * @param current Current time (in milliseconds)
764 * @param pending Pending tasks.
765 * @param running Running tasks.
766 * @return the remaining time
767 */
768 private long getRemainingTime(long current, int pending, int running) {
769 final long taskTime = getEstimatedTaskTime();
770 if (taskTime == 0) {
771 // No estimate possible
772 return 0;
773 }
774
775 // The start times are sorted. This method assumes the most recent start times
776 // are still running tasks.
777 // If this is wrong (more recently submitted tasks finished early) the result
778 // is the estimate is too high. This could be corrected by storing the tasks
779 // that have finished and finding the times of only running tasks.
780
781 // The remaining time is:
782 // The time for all running tasks to finish
783 // + The time for pending tasks to run
784
785 // The id of the most recently submitted task.
786 // Guard with a minimum index of zero to get a valid index.
787 final int id = Math.max(0, taskId - 1);
788
789 // If there is a running task assume the youngest task is still running
790 // and estimate the time left.
791 long millis = (running == 0) ? 0 : getTimeRemaining(taskTime, current, startTimes[id]);
792
793 // If additional tasks must also be submitted then the time must include
794 // the estimated time for running tasks to finish before new submissions
795 // in the batch can be made.
796 // now
797 // s1 --------------->|
798 // s2 -----------|-------->
799 // s3 -------|------------>
800 // s4 -------------->
801 //
802
803 // Assume parallel batch execution.
804 // E.g. 3 additional tasks with parallelisation 4 is 0 batches
805 final int batches = pending / parallelTasks;
806 millis += batches * taskTime;
807
808 // Compute the expected end time of the final batch based on it starting when
809 // a currently running task ends.
810 // E.g. 3 remaining tasks requires the end time of the 3rd oldest running task.
811 final int remainder = pending % parallelTasks;
812 if (remainder != 0) {
813 // Guard with a minimum index of zero to get a valid index.
814 final int nthOldest = Math.max(0, id - parallelTasks + remainder);
815 millis += getTimeRemaining(taskTime, current, startTimes[nthOldest]);
816 }
817
818 return millis;
819 }
820
821 /**
822 * Gets the estimated task time.
823 *
824 * @return the estimated task time
825 */
826 private long getEstimatedTaskTime() {
827 Arrays.sort(sortedDurations, 0, completed);
828
829 // Return median of small lists. If no tasks have finished this returns zero.
830 // as the durations is zero initialized.
831 if (completed < 4) {
832 return sortedDurations[completed / 2];
833 }
834
835 // Dieharder and BigCrush run in approximately constant time.
836 // Speed varies with the speed of the RNG by about 2-fold, and
837 // for Dieharder it may repeat suspicious tests.
838 // PractRand may fail very fast for bad generators which skews
839 // using the mean or even the median. So look at the longest
840 // running tests.
841
842 // Find long running tests (>50% of the max run-time)
843 int upper = completed - 1;
844 final long halfMax = sortedDurations[upper] / 2;
845 // Binary search for the approximate cut-off
846 int lower = 0;
847 while (lower + 1 < upper) {
848 final int mid = (lower + upper) >>> 1;
849 if (sortedDurations[mid] < halfMax) {
850 lower = mid;
851 } else {
852 upper = mid;
853 }
854 }
855 // Use the median of all tasks within approximately 50% of the max.
856 return sortedDurations[(upper + completed - 1) / 2];
857 }
858
859 /**
860 * Gets the time remaining for the task.
861 *
862 * @param taskTime Estimated task time.
863 * @param current Current time.
864 * @param startTime Start time.
865 * @return the time remaining
866 */
867 private static long getTimeRemaining(long taskTime, long current, long startTime) {
868 final long endTime = startTime + taskTime;
869 // Ensure the time is positive in the case where the estimate is too low.
870 return Math.max(0, endTime - current);
871 }
872
873 /**
874 * Append the milliseconds using {@code HH::mm:ss} format.
875 *
876 * @param sb String Builder.
877 * @param millis Milliseconds.
878 * @return the string builder
879 */
880 static StringBuilder hms(StringBuilder sb, final long millis) {
881 final long hours = TimeUnit.MILLISECONDS.toHours(millis);
882 long minutes = TimeUnit.MILLISECONDS.toMinutes(millis);
883 long seconds = TimeUnit.MILLISECONDS.toSeconds(millis);
884 // Truncate to interval [0,59]
885 seconds -= TimeUnit.MINUTES.toSeconds(minutes);
886 minutes -= TimeUnit.HOURS.toMinutes(hours);
887
888 append00(sb, hours).append(':');
889 append00(sb, minutes).append(':');
890 return append00(sb, seconds);
891 }
892
893 /**
894 * Append the ticks to the string builder in the format {@code %02d}.
895 *
896 * @param sb String Builder.
897 * @param ticks Ticks.
898 * @return the string builder
899 */
900 static StringBuilder append00(StringBuilder sb, long ticks) {
901 if (ticks == 0) {
902 sb.append("00");
903 } else {
904 if (ticks < 10) {
905 sb.append('0');
906 }
907 sb.append(ticks);
908 }
909 return sb;
910 }
911 }
912
913 /**
914 * Pipes random numbers to the standard input of an analyzer executable.
915 */
916 private static class StressTestTask implements Runnable {
917 /** Comment prefix. */
918 private static final String C = "# ";
919 /** New line. */
920 private static final String N = System.lineSeparator();
921 /** The date format. */
922 private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
923 /** The SI units for bytes in increments of 10^3. */
924 private static final String[] SI_UNITS = {"B", "kB", "MB", "GB", "TB", "PB", "EB"};
925 /** The SI unit base for bytes (10^3). */
926 private static final long SI_UNIT_BASE = 1000;
927
928 /** The random source. */
929 private final RandomSource randomSource;
930 /** RNG to be tested. */
931 private final UniformRandomProvider rng;
932 /** The seed used to create the RNG. */
933 private final byte[] seed;
934 /** Output report file of the sub-process. */
935 private final File output;
936 /** The sub-process command to run. */
937 private final List<String> command;
938 /** The stress test command. */
939 private final StressTestCommand cmd;
940 /** The progress tracker. */
941 private final ProgressTracker progressTracker;
942
943 /** The count of bytes used by the sub-process. */
944 private AtomicLong bytesUsed = new AtomicLong();
945
946 /**
947 * Creates the task.
948 *
949 * @param randomSource The random source.
950 * @param rng RNG to be tested.
951 * @param seed The seed used to create the RNG.
952 * @param output Output report file.
953 * @param command The sub-process command to run.
954 * @param cmd The run command.
955 * @param progressTracker The progress tracker.
956 */
957 StressTestTask(RandomSource randomSource,
958 UniformRandomProvider rng,
959 byte[] seed,
960 File output,
961 List<String> command,
962 StressTestCommand cmd,
963 ProgressTracker progressTracker) {
964 this.randomSource = randomSource;
965 this.rng = rng;
966 this.seed = seed;
967 this.output = output;
968 this.command = command;
969 this.cmd = cmd;
970 this.progressTracker = progressTracker;
971 }
972
973 /** {@inheritDoc} */
974 @Override
975 public void run() {
976 if (cmd.isStopFileExists()) {
977 // Do nothing
978 return;
979 }
980
981 try {
982 printHeader();
983
984 Object exitValue;
985 long millis;
986 final int taskId = progressTracker.submitTask();
987 if (cmd.dryRun) {
988 // Do not do anything. Ignore the runtime.
989 exitValue = "N/A";
990 progressTracker.endTask(taskId);
991 millis = 0;
992 } else {
993 // Run the sub-process
994 exitValue = runSubProcess();
995 millis = progressTracker.endTask(taskId);
996 }
997
998 printFooter(millis, exitValue);
999
1000 } catch (final IOException ex) {
1001 throw new ApplicationException("Failed to run task: " + ex.getMessage(), ex);
1002 }
1003 }
1004
1005 /**
1006 * Run the analyzer sub-process command.
1007 *
1008 * @return The exit value.
1009 * @throws IOException Signals that an I/O exception has occurred.
1010 */
1011 private Integer runSubProcess() throws IOException {
1012 // Start test suite.
1013 final ProcessBuilder builder = new ProcessBuilder(command);
1014 builder.redirectOutput(ProcessBuilder.Redirect.appendTo(output));
1015 builder.redirectErrorStream(true);
1016 final Process testingProcess = builder.start();
1017
1018 // Use a custom data output to write the RNG.
1019 long writeCount = 0;
1020 try (RngDataOutput sink = RNGUtils.createDataOutput(rng, cmd.source64,
1021 testingProcess.getOutputStream(), cmd.bufferSize, cmd.byteOrder)) {
1022 for (;;) {
1023 sink.write(rng);
1024 writeCount++;
1025 }
1026 } catch (final IOException ignored) {
1027 // Hopefully getting here when the analyzing software terminates.
1028 }
1029
1030 bytesUsed.set(writeCount * cmd.bufferSize);
1031
1032 // Get the exit value.
1033 // Wait for up to 60 seconds.
1034 // If an application does not exit after this time then something is wrong.
1035 // Dieharder and TestU01 BigCrush exit within 1 second.
1036 // PractRand has been observed to take longer than 1 second. It calls std::exit(0)
1037 // when failing a test so the length of time may be related to freeing memory.
1038 return ProcessUtils.getExitValue(testingProcess, TimeUnit.SECONDS.toMillis(60));
1039 }
1040
1041 /**
1042 * Prints the header.
1043 *
1044 * @throws IOException if there was a problem opening or writing to the
1045 * {@code output} file.
1046 */
1047 private void printHeader() throws IOException {
1048 final StringBuilder sb = new StringBuilder(200);
1049 sb.append(C).append(N)
1050 .append(C).append("RandomSource: ").append(randomSource.name()).append(N)
1051 .append(C).append("RNG: ").append(rng.toString()).append(N)
1052 .append(C).append("Seed: ").append(Hex.encodeHex(seed)).append(N)
1053 .append(C).append(N)
1054
1055 // Match the output of 'java -version', e.g.
1056 // java version "1.8.0_131"
1057 // Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
1058 // Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)
1059 .append(C).append("Java: ").append(System.getProperty("java.version")).append(N);
1060 appendNameAndVersion(sb, "Runtime", "java.runtime.name", "java.runtime.version");
1061 appendNameAndVersion(sb, "JVM", "java.vm.name", "java.vm.version", "java.vm.info");
1062
1063 sb.append(C).append("OS: ").append(System.getProperty("os.name"))
1064 .append(' ').append(System.getProperty("os.version"))
1065 .append(' ').append(System.getProperty("os.arch")).append(N)
1066 .append(C).append("Native byte-order: ").append(ByteOrder.nativeOrder()).append(N)
1067 .append(C).append("Output byte-order: ").append(cmd.byteOrder).append(N);
1068 if (rng instanceof RandomLongSource) {
1069 sb.append(C).append("64-bit output: ").append(cmd.source64).append(N);
1070 }
1071 sb.append(C).append(N)
1072 .append(C).append("Analyzer: ");
1073 for (final String s : command) {
1074 sb.append(s).append(' ');
1075 }
1076 sb.append(N)
1077 .append(C).append(N);
1078
1079 appendDate(sb, "Start").append(C).append(N);
1080
1081 write(sb, output, cmd.outputMode == StressTestCommand.OutputMode.APPEND);
1082 }
1083
1084 /**
1085 * Prints the footer.
1086 *
1087 * @param millis Duration of the run (in milliseconds).
1088 * @param exitValue The process exit value.
1089 * @throws IOException if there was a problem opening or writing to the
1090 * {@code output} file.
1091 */
1092 private void printFooter(long millis,
1093 Object exitValue) throws IOException {
1094 final StringBuilder sb = new StringBuilder(200);
1095 sb.append(C).append(N);
1096
1097 appendDate(sb, "End").append(C).append(N);
1098
1099 final long bytes = bytesUsed.get();
1100 sb.append(C).append("Exit value: ").append(exitValue).append(N)
1101 .append(C).append("Bytes used: ").append(bytes)
1102 .append(" >= 2^").append(log2(bytes))
1103 .append(" (").append(bytesToString(bytes)).append(')').append(N)
1104 .append(C).append(N);
1105
1106 final double duration = millis * 1e-3 / 60;
1107 sb.append(C).append("Test duration: ").append(duration).append(" minutes").append(N)
1108 .append(C).append(N);
1109
1110 write(sb, output, true);
1111 }
1112
1113 /**
1114 * Write the string builder to the output file.
1115 *
1116 * @param sb The string builder.
1117 * @param output The output file.
1118 * @param append Set to {@code true} to append to the file.
1119 * @throws IOException Signals that an I/O exception has occurred.
1120 */
1121 private static void write(StringBuilder sb,
1122 File output,
1123 boolean append) throws IOException {
1124 try (BufferedWriter w = append ?
1125 Files.newBufferedWriter(output.toPath(), StandardOpenOption.APPEND) :
1126 Files.newBufferedWriter(output.toPath())) {
1127 w.write(sb.toString());
1128 }
1129 }
1130
1131 /**
1132 * Append prefix and then name and version from System properties, finished with
1133 * a new line. The format is:
1134 *
1135 * <pre>{@code # <prefix>: <name> (build <version>[, <info>, ...])}</pre>
1136 *
1137 * @param sb The string builder.
1138 * @param prefix The prefix.
1139 * @param nameKey The name key.
1140 * @param versionKey The version key.
1141 * @param infoKeys The additional information keys.
1142 * @return the StringBuilder.
1143 */
1144 private static StringBuilder appendNameAndVersion(StringBuilder sb,
1145 String prefix,
1146 String nameKey,
1147 String versionKey,
1148 String... infoKeys) {
1149 appendPrefix(sb, prefix)
1150 .append(System.getProperty(nameKey, "?"))
1151 .append(" (build ")
1152 .append(System.getProperty(versionKey, "?"));
1153 for (final String key : infoKeys) {
1154 final String value = System.getProperty(key, "");
1155 if (!value.isEmpty()) {
1156 sb.append(", ").append(value);
1157 }
1158 }
1159 return sb.append(')').append(N);
1160 }
1161
1162 /**
1163 * Append a comment with the current date to the {@link StringBuilder}, finished with
1164 * a new line. The format is:
1165 *
1166 * <pre>{@code # <prefix>: yyyy-MM-dd HH:mm:ss}</pre>
1167 *
1168 * @param sb The StringBuilder.
1169 * @param prefix The prefix used before the formatted date, e.g. "Start".
1170 * @return the StringBuilder.
1171 */
1172 private static StringBuilder appendDate(StringBuilder sb,
1173 String prefix) {
1174 // Use local date format. It is not thread safe.
1175 final SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT, Locale.US);
1176 return appendPrefix(sb, prefix).append(dateFormat.format(new Date())).append(N);
1177 }
1178
1179 /**
1180 * Append a comment with the current date to the {@link StringBuilder}.
1181 *
1182 * <pre>
1183 * {@code # <prefix>: yyyy-MM-dd HH:mm:ss}
1184 * </pre>
1185 *
1186 * @param sb The StringBuilder.
1187 * @param prefix The prefix used before the formatted date, e.g. "Start".
1188 * @return the StringBuilder.
1189 */
1190 private static StringBuilder appendPrefix(StringBuilder sb,
1191 String prefix) {
1192 return sb.append(C).append(prefix).append(": ");
1193 }
1194
1195 /**
1196 * Convert bytes to a human readable string. Example output:
1197 *
1198 * <pre>
1199 * SI
1200 * 0: 0 B
1201 * 27: 27 B
1202 * 999: 999 B
1203 * 1000: 1.0 kB
1204 * 1023: 1.0 kB
1205 * 1024: 1.0 kB
1206 * 1728: 1.7 kB
1207 * 110592: 110.6 kB
1208 * 7077888: 7.1 MB
1209 * 452984832: 453.0 MB
1210 * 28991029248: 29.0 GB
1211 * 1855425871872: 1.9 TB
1212 * 9223372036854775807: 9.2 EB (Long.MAX_VALUE)
1213 * </pre>
1214 *
1215 * @param bytes the bytes
1216 * @return the string
1217 * @see <a
1218 * href="https://stackoverflow.com/questions/3758606/how-to-convert-byte-size-into-human-readable-format-in-java">How
1219 * to convert byte size into human readable format in java?</a>
1220 */
1221 static String bytesToString(long bytes) {
1222 // When using the smallest unit no decimal point is needed, because it's the exact number.
1223 if (bytes < ONE_THOUSAND) {
1224 return bytes + " " + SI_UNITS[0];
1225 }
1226
1227 final int exponent = (int) (Math.log(bytes) / Math.log(SI_UNIT_BASE));
1228 final String unit = SI_UNITS[exponent];
1229 return String.format(Locale.US, "%.1f %s", bytes / Math.pow(SI_UNIT_BASE, exponent), unit);
1230 }
1231
1232 /**
1233 * Return the log2 of a {@code long} value rounded down to a power of 2.
1234 *
1235 * @param x the value
1236 * @return {@code floor(log2(x))}
1237 */
1238 static int log2(long x) {
1239 return 63 - Long.numberOfLeadingZeros(x);
1240 }
1241 }
1242 }