ThrottledInputStream.java

  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one or more
  3.  * contributor license agreements.  See the NOTICE file distributed with
  4.  * this work for additional information regarding copyright ownership.
  5.  * The ASF licenses this file to You under the Apache License, Version 2.0
  6.  * (the "License"); you may not use this file except in compliance with
  7.  * the License.  You may obtain a copy of the License at
  8.  *
  9.  *      http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */

  17. package org.apache.commons.io.input;

  18. import java.io.IOException;
  19. import java.io.InputStream;
  20. import java.io.InterruptedIOException;
  21. import java.time.Duration;
  22. import java.time.temporal.ChronoUnit;
  23. import java.util.Objects;
  24. import java.util.concurrent.TimeUnit;

  25. /**
  26.  * Provides bandwidth throttling on an InputStream as a filter input stream. The throttling examines the number of bytes read from the underlying InputStream,
  27.  * and sleeps for a time interval if the byte-transfer is found to exceed the specified maximum rate. Thus, while the read-rate might exceed the maximum for a
  28.  * short interval, the average tends towards the specified maximum, overall.
  29.  * <p>
  30.  * To build an instance, call {@link #builder()}.
  31.  * </p>
  32.  * <p>
  33.  * Inspired by Apache HBase's class of the same name.
  34.  * </p>
  35.  *
  36.  * @see Builder
  37.  * @since 2.16.0
  38.  */
  39. public final class ThrottledInputStream extends CountingInputStream {

  40.     // @formatter:off
  41.     /**
  42.      * Builds a new {@link ThrottledInputStream}.
  43.      *
  44.      * <h2>Using NIO</h2>
  45.      * <pre>{@code
  46.      * ThrottledInputStream in = ThrottledInputStream.builder()
  47.      *   .setPath(Paths.get("MyFile.xml"))
  48.      *   .setMaxBytes(100_000, ChronoUnit.SECONDS)
  49.      *   .get();
  50.      * }
  51.      * </pre>
  52.      * <h2>Using IO</h2>
  53.      * <pre>{@code
  54.      * ThrottledInputStream in = ThrottledInputStream.builder()
  55.      *   .setFile(new File("MyFile.xml"))
  56.      *   .setMaxBytes(100_000, ChronoUnit.SECONDS)
  57.      *   .get();
  58.      * }
  59.      * </pre>
  60.      * <pre>{@code
  61.      * ThrottledInputStream in = ThrottledInputStream.builder()
  62.      *   .setInputStream(inputStream)
  63.      *   .setMaxBytes(100_000, ChronoUnit.SECONDS)
  64.      *   .get();
  65.      * }
  66.      * </pre>
  67.      *
  68.      * @see #get()
  69.      */
  70.     // @formatter:on
  71.     public static class Builder extends AbstractBuilder<ThrottledInputStream, Builder> {

  72.         /**
  73.          * Effectively not throttled.
  74.          */
  75.         private double maxBytesPerSecond = Double.MAX_VALUE;

  76.         /**
  77.          * Constructs a new builder of {@link ThrottledInputStream}.
  78.          */
  79.         public Builder() {
  80.             // empty
  81.         }

  82.         /**
  83.          * Builds a new {@link ThrottledInputStream}.
  84.          * <p>
  85.          * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
  86.          * </p>
  87.          * <p>
  88.          * This builder uses the following aspects:
  89.          * </p>
  90.          * <ul>
  91.          * <li>{@link #getInputStream()} gets the target aspect.</li>
  92.          * <li>maxBytesPerSecond</li>
  93.          * </ul>
  94.          *
  95.          * @return a new instance.
  96.          * @throws IllegalStateException         if the {@code origin} is {@code null}.
  97.          * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
  98.          * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
  99.          * @see #getInputStream()
  100.          * @see #getUnchecked()
  101.          */
  102.         @Override
  103.         public ThrottledInputStream get() throws IOException {
  104.             return new ThrottledInputStream(this);
  105.         }

  106.         // package private for testing.
  107.         double getMaxBytesPerSecond() {
  108.             return maxBytesPerSecond;
  109.         }

  110.         /**
  111.          * Sets the maximum bytes per time period unit.
  112.          * <p>
  113.          * For example, to throttle reading to 100K per second, use:
  114.          * </p>
  115.          * <pre>
  116.          * builder.setMaxBytes(100_000, ChronoUnit.SECONDS)
  117.          * </pre>
  118.          * <p>
  119.          * To test idle timeouts for example, use 1 byte per minute, 1 byte per 30 seconds, and so on.
  120.          * </p>
  121.          *
  122.          * @param value the maximum bytes
  123.          * @param chronoUnit a duration scale goal.
  124.          * @return this instance.
  125.          * @throws IllegalArgumentException Thrown if maxBytesPerSecond &lt;= 0.
  126.          * @since 2.19.0
  127.          */
  128.         public Builder setMaxBytes(final long value, final ChronoUnit chronoUnit) {
  129.             setMaxBytes(value, chronoUnit.getDuration());
  130.             return asThis();
  131.         }

  132.         /**
  133.          * Sets the maximum bytes per duration.
  134.          * <p>
  135.          * For example, to throttle reading to 100K per second, use:
  136.          * </p>
  137.          * <pre>
  138.          * builder.setMaxBytes(100_000, Duration.ofSeconds(1))
  139.          * </pre>
  140.          * <p>
  141.          * To test idle timeouts for example, use 1 byte per minute, 1 byte per 30 seconds, and so on.
  142.          * </p>
  143.          *
  144.          * @param value the maximum bytes
  145.          * @param duration a duration goal.
  146.          * @return this instance.
  147.          * @throws IllegalArgumentException Thrown if maxBytesPerSecond &lt;= 0.
  148.          */
  149.         // Consider making public in the future
  150.         Builder setMaxBytes(final long value, final Duration duration) {
  151.             setMaxBytesPerSecond((double) Objects.requireNonNull(duration, "duration").toMillis() / 1_000 * value);
  152.             return asThis();
  153.         }

  154.         /**
  155.          * Sets the maximum bytes per second.
  156.          *
  157.          * @param maxBytesPerSecond the maximum bytes per second.
  158.          * @return this instance.
  159.          * @throws IllegalArgumentException Thrown if maxBytesPerSecond &lt;= 0.
  160.          */
  161.         private Builder setMaxBytesPerSecond(final double maxBytesPerSecond) {
  162.             if (maxBytesPerSecond <= 0) {
  163.                 throw new IllegalArgumentException("Bandwidth " + maxBytesPerSecond + " must be > 0.");
  164.             }
  165.             this.maxBytesPerSecond = maxBytesPerSecond;
  166.             return asThis();
  167.         }

  168.         /**
  169.          * Sets the maximum bytes per second.
  170.          *
  171.          * @param maxBytesPerSecond the maximum bytes per second.
  172.          * @throws IllegalArgumentException Thrown if maxBytesPerSecond &lt;= 0.
  173.          */
  174.         public void setMaxBytesPerSecond(final long maxBytesPerSecond) {
  175.             setMaxBytesPerSecond((double) maxBytesPerSecond);
  176.             // TODO 3.0
  177.             // return asThis();
  178.         }

  179.     }

  180.     /**
  181.      * Constructs a new {@link Builder}.
  182.      *
  183.      * @return a new {@link Builder}.
  184.      */
  185.     public static Builder builder() {
  186.         return new Builder();
  187.     }

  188.     // package private for testing
  189.     static long toSleepMillis(final long bytesRead, final long elapsedMillis, final double maxBytesPerSec) {
  190.         if (bytesRead <= 0 || maxBytesPerSec <= 0 || elapsedMillis == 0) {
  191.             return 0;
  192.         }
  193.         // We use this class to load the single source file, so the bytesRead
  194.         // and maxBytesPerSec aren't greater than Double.MAX_VALUE.
  195.         // We can get the precise sleep time by using the double value.
  196.         final long millis = (long) (bytesRead / maxBytesPerSec * 1000 - elapsedMillis);
  197.         if (millis <= 0) {
  198.             return 0;
  199.         }
  200.         return millis;
  201.     }

  202.     private final double maxBytesPerSecond;
  203.     private final long startTime = System.currentTimeMillis();
  204.     private Duration totalSleepDuration = Duration.ZERO;

  205.     private ThrottledInputStream(final Builder builder) throws IOException {
  206.         super(builder);
  207.         if (builder.maxBytesPerSecond <= 0) {
  208.             throw new IllegalArgumentException("Bandwidth " + builder.maxBytesPerSecond + " is invalid.");
  209.         }
  210.         this.maxBytesPerSecond = builder.maxBytesPerSecond;
  211.     }

  212.     @Override
  213.     protected void beforeRead(final int n) throws IOException {
  214.         throttle();
  215.     }

  216.     /**
  217.      * Gets the read-rate from this stream, since creation. Calculated as bytesRead/elapsedTimeSinceStart.
  218.      *
  219.      * @return Read rate, in bytes/sec.
  220.      */
  221.     private long getBytesPerSecond() {
  222.         final long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000;
  223.         if (elapsedSeconds == 0) {
  224.             return getByteCount();
  225.         }
  226.         return getByteCount() / elapsedSeconds;
  227.     }

  228.     // package private for testing.
  229.     double getMaxBytesPerSecond() {
  230.         return maxBytesPerSecond;
  231.     }

  232.     private long getSleepMillis() {
  233.         return toSleepMillis(getByteCount(), System.currentTimeMillis() - startTime, maxBytesPerSecond);
  234.     }

  235.     /**
  236.      * Gets the total duration spent in sleep.
  237.      *
  238.      * @return Duration spent in sleep.
  239.      */
  240.     // package private for testing
  241.     Duration getTotalSleepDuration() {
  242.         return totalSleepDuration;
  243.     }

  244.     private void throttle() throws InterruptedIOException {
  245.         final long sleepMillis = getSleepMillis();
  246.         if (sleepMillis > 0) {
  247.             totalSleepDuration = totalSleepDuration.plus(sleepMillis, ChronoUnit.MILLIS);
  248.             try {
  249.                 TimeUnit.MILLISECONDS.sleep(sleepMillis);
  250.             } catch (final InterruptedException e) {
  251.                 throw new InterruptedIOException("Thread aborted");
  252.             }
  253.         }
  254.     }

  255.     /** {@inheritDoc} */
  256.     @Override
  257.     public String toString() {
  258.         return "ThrottledInputStream[bytesRead=" + getByteCount() + ", maxBytesPerSec=" + maxBytesPerSecond + ", bytesPerSec=" + getBytesPerSecond()
  259.                 + ", totalSleepDuration=" + totalSleepDuration + ']';
  260.     }
  261. }