View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.commons.io.input;
19  
20  import java.io.IOException;
21  import java.io.InputStream;
22  import java.io.InterruptedIOException;
23  import java.time.Duration;
24  import java.time.temporal.ChronoUnit;
25  import java.util.Objects;
26  import java.util.concurrent.TimeUnit;
27  
28  /**
29   * Provides bandwidth throttling on an InputStream as a filter input stream. The throttling examines the number of bytes read from the underlying InputStream,
30   * and sleeps for a time interval if the byte-transfer is found to exceed the specified maximum rate. Thus, while the read-rate might exceed the maximum for a
31   * short interval, the average tends towards the specified maximum, overall.
32   * <p>
33   * To build an instance, call {@link #builder()}.
34   * </p>
35   * <p>
36   * Inspired by Apache HBase's class of the same name.
37   * </p>
38   *
39   * @see Builder
40   * @since 2.16.0
41   */
42  public final class ThrottledInputStream extends CountingInputStream {
43  
44      // @formatter:off
45      /**
46       * Builds a new {@link ThrottledInputStream}.
47       *
48       * <h2>Using NIO</h2>
49       * <pre>{@code
50       * ThrottledInputStream in = ThrottledInputStream.builder()
51       *   .setPath(Paths.get("MyFile.xml"))
52       *   .setMaxBytes(100_000, ChronoUnit.SECONDS)
53       *   .get();
54       * }
55       * </pre>
56       * <h2>Using IO</h2>
57       * <pre>{@code
58       * ThrottledInputStream in = ThrottledInputStream.builder()
59       *   .setFile(new File("MyFile.xml"))
60       *   .setMaxBytes(100_000, ChronoUnit.SECONDS)
61       *   .get();
62       * }
63       * </pre>
64       * <pre>{@code
65       * ThrottledInputStream in = ThrottledInputStream.builder()
66       *   .setInputStream(inputStream)
67       *   .setMaxBytes(100_000, ChronoUnit.SECONDS)
68       *   .get();
69       * }
70       * </pre>
71       *
72       * @see #get()
73       */
74      // @formatter:on
75      public static class Builder extends AbstractBuilder<ThrottledInputStream, Builder> {
76  
77          /**
78           * Effectively not throttled.
79           */
80          private double maxBytesPerSecond = Double.MAX_VALUE;
81  
82          /**
83           * Constructs a new builder of {@link ThrottledInputStream}.
84           */
85          public Builder() {
86              // empty
87          }
88  
89          /**
90           * Builds a new {@link ThrottledInputStream}.
91           * <p>
92           * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
93           * </p>
94           * <p>
95           * This builder uses the following aspects:
96           * </p>
97           * <ul>
98           * <li>{@link #getInputStream()} gets the target aspect.</li>
99           * <li>maxBytesPerSecond</li>
100          * </ul>
101          *
102          * @return a new instance.
103          * @throws IllegalStateException         if the {@code origin} is {@code null}.
104          * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
105          * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
106          * @see #getInputStream()
107          * @see #getUnchecked()
108          */
109         @Override
110         public ThrottledInputStream get() throws IOException {
111             return new ThrottledInputStream(this);
112         }
113 
114         // package private for testing.
115         double getMaxBytesPerSecond() {
116             return maxBytesPerSecond;
117         }
118 
119         /**
120          * Sets the maximum bytes per time period unit.
121          * <p>
122          * For example, to throttle reading to 100K per second, use:
123          * </p>
124          * <pre>
125          * builder.setMaxBytes(100_000, ChronoUnit.SECONDS)
126          * </pre>
127          * <p>
128          * To test idle timeouts for example, use 1 byte per minute, 1 byte per 30 seconds, and so on.
129          * </p>
130          *
131          * @param value the maximum bytes
132          * @param chronoUnit a duration scale goal.
133          * @return this instance.
134          * @throws IllegalArgumentException Thrown if maxBytesPerSecond &lt;= 0.
135          * @since 2.19.0
136          */
137         public Builder setMaxBytes(final long value, final ChronoUnit chronoUnit) {
138             setMaxBytes(value, chronoUnit.getDuration());
139             return asThis();
140         }
141 
142         /**
143          * Sets the maximum bytes per duration.
144          * <p>
145          * For example, to throttle reading to 100K per second, use:
146          * </p>
147          * <pre>
148          * builder.setMaxBytes(100_000, Duration.ofSeconds(1))
149          * </pre>
150          * <p>
151          * To test idle timeouts for example, use 1 byte per minute, 1 byte per 30 seconds, and so on.
152          * </p>
153          *
154          * @param value the maximum bytes
155          * @param duration a duration goal.
156          * @return this instance.
157          * @throws IllegalArgumentException Thrown if maxBytesPerSecond &lt;= 0.
158          */
159         // Consider making public in the future
160         Builder setMaxBytes(final long value, final Duration duration) {
161             setMaxBytesPerSecond((double) Objects.requireNonNull(duration, "duration").toMillis() / 1_000 * value);
162             return asThis();
163         }
164 
165         /**
166          * Sets the maximum bytes per second.
167          *
168          * @param maxBytesPerSecond the maximum bytes per second.
169          * @return this instance.
170          * @throws IllegalArgumentException Thrown if maxBytesPerSecond &lt;= 0.
171          */
172         private Builder setMaxBytesPerSecond(final double maxBytesPerSecond) {
173             if (maxBytesPerSecond <= 0) {
174                 throw new IllegalArgumentException("Bandwidth " + maxBytesPerSecond + " must be > 0.");
175             }
176             this.maxBytesPerSecond = maxBytesPerSecond;
177             return asThis();
178         }
179 
180         /**
181          * Sets the maximum bytes per second.
182          *
183          * @param maxBytesPerSecond the maximum bytes per second.
184          * @throws IllegalArgumentException Thrown if maxBytesPerSecond &lt;= 0.
185          */
186         public void setMaxBytesPerSecond(final long maxBytesPerSecond) {
187             setMaxBytesPerSecond((double) maxBytesPerSecond);
188             // TODO 3.0
189             // return asThis();
190         }
191 
192     }
193 
194     /**
195      * Constructs a new {@link Builder}.
196      *
197      * @return a new {@link Builder}.
198      */
199     public static Builder builder() {
200         return new Builder();
201     }
202 
203     // package private for testing
204     static long toSleepMillis(final long bytesRead, final long elapsedMillis, final double maxBytesPerSec) {
205         if (bytesRead <= 0 || maxBytesPerSec <= 0 || elapsedMillis == 0) {
206             return 0;
207         }
208         // We use this class to load the single source file, so the bytesRead
209         // and maxBytesPerSec aren't greater than Double.MAX_VALUE.
210         // We can get the precise sleep time by using the double value.
211         final long millis = (long) (bytesRead / maxBytesPerSec * 1000 - elapsedMillis);
212         if (millis <= 0) {
213             return 0;
214         }
215         return millis;
216     }
217 
218     private final double maxBytesPerSecond;
219     private final long startTime = System.currentTimeMillis();
220     private Duration totalSleepDuration = Duration.ZERO;
221 
222     private ThrottledInputStream(final Builder builder) throws IOException {
223         super(builder);
224         if (builder.maxBytesPerSecond <= 0) {
225             throw new IllegalArgumentException("Bandwidth " + builder.maxBytesPerSecond + " is invalid.");
226         }
227         this.maxBytesPerSecond = builder.maxBytesPerSecond;
228     }
229 
230     @Override
231     protected void beforeRead(final int n) throws IOException {
232         throttle();
233     }
234 
235     /**
236      * Gets the read-rate from this stream, since creation. Calculated as bytesRead/elapsedTimeSinceStart.
237      *
238      * @return Read rate, in bytes/sec.
239      */
240     private long getBytesPerSecond() {
241         final long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000;
242         if (elapsedSeconds == 0) {
243             return getByteCount();
244         }
245         return getByteCount() / elapsedSeconds;
246     }
247 
248     // package private for testing.
249     double getMaxBytesPerSecond() {
250         return maxBytesPerSecond;
251     }
252 
253     private long getSleepMillis() {
254         return toSleepMillis(getByteCount(), System.currentTimeMillis() - startTime, maxBytesPerSecond);
255     }
256 
257     /**
258      * Gets the total duration spent in sleep.
259      *
260      * @return Duration spent in sleep.
261      */
262     // package private for testing
263     Duration getTotalSleepDuration() {
264         return totalSleepDuration;
265     }
266 
267     private void throttle() throws InterruptedIOException {
268         final long sleepMillis = getSleepMillis();
269         if (sleepMillis > 0) {
270             totalSleepDuration = totalSleepDuration.plus(sleepMillis, ChronoUnit.MILLIS);
271             try {
272                 TimeUnit.MILLISECONDS.sleep(sleepMillis);
273             } catch (final InterruptedException e) {
274                 throw new InterruptedIOException("Thread aborted");
275             }
276         }
277     }
278 
279     /** {@inheritDoc} */
280     @Override
281     public String toString() {
282         return "ThrottledInputStream[bytesRead=" + getByteCount() + ", maxBytesPerSec=" + maxBytesPerSecond + ", bytesPerSec=" + getBytesPerSecond()
283                 + ", totalSleepDuration=" + totalSleepDuration + ']';
284     }
285 }