View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.commons.io.input;
19  
20  import java.io.IOException;
21  import java.io.InputStream;
22  import java.io.InterruptedIOException;
23  import java.time.Duration;
24  import java.time.temporal.ChronoUnit;
25  import java.util.concurrent.TimeUnit;
26  
27  import org.apache.commons.io.build.AbstractStreamBuilder;
28  
29  /**
30   * Provides bandwidth throttling on a specified InputStream. It is implemented as a wrapper on top of another InputStream instance. The throttling works by
31   * examining the number of bytes read from the underlying InputStream from the beginning, and sleep()ing for a time interval if the byte-transfer is found
32   * exceed the specified tolerable maximum. (Thus, while the read-rate might exceed the maximum for a short interval, the average tends towards the
33   * specified maximum, overall.)
34   * <p>
35   * To build an instance, see {@link Builder}
36   * </p>
37   * <p>
38   * Inspired by Apache HBase's class of the same name.
39   * </p>
40   *
41   * @see Builder
42   * @since 2.16.0
43   */
44  public final class ThrottledInputStream extends CountingInputStream {
45  
46      // @formatter:off
47      /**
48       * Builds a new {@link ThrottledInputStream}.
49       *
50       * <h2>Using NIO</h2>
51       * <pre>{@code
52       * ThrottledInputStream in = ThrottledInputStream.builder()
53       *   .setPath(Paths.get("MyFile.xml"))
54       *   .setMaxBytesPerSecond(100_000)
55       *   .get();
56       * }
57       * </pre>
58       * <h2>Using IO</h2>
59       * <pre>{@code
60       * ThrottledInputStream in = ThrottledInputStream.builder()
61       *   .setFile(new File("MyFile.xml"))
62       *   .setMaxBytesPerSecond(100_000)
63       *   .get();
64       * }
65       * </pre>
66       * <pre>{@code
67       * ThrottledInputStream in = ThrottledInputStream.builder()
68       *   .setInputStream(inputStream)
69       *   .setMaxBytesPerSecond(100_000)
70       *   .get();
71       * }
72       * </pre>
73       *
74       * @see #get()
75       */
76      // @formatter:on
77      public static class Builder extends AbstractStreamBuilder<ThrottledInputStream, Builder> {
78  
79          /**
80           * Effectively not throttled.
81           */
82          private long maxBytesPerSecond = Long.MAX_VALUE;
83  
84          /**
85           * Builds a new {@link ThrottledInputStream}.
86           * <p>
87           * You must set input that supports {@link #getInputStream()}, otherwise, this method throws an exception.
88           * </p>
89           * <p>
90           * This builder use the following aspects:
91           * </p>
92           * <ul>
93           * <li>{@link #getInputStream()}</li>
94           * <li>maxBytesPerSecond</li>
95           * </ul>
96           *
97           * @return a new instance.
98           * @throws IllegalStateException         if the {@code origin} is {@code null}.
99           * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
100          * @throws IOException                   if an I/O error occurs.
101          * @see #getInputStream()
102          */
103         @SuppressWarnings("resource")
104         @Override
105         public ThrottledInputStream get() throws IOException {
106             return new ThrottledInputStream(getInputStream(), maxBytesPerSecond);
107         }
108 
109         /**
110          * Sets the maximum bytes per second.
111          *
112          * @param maxBytesPerSecond the maximum bytes per second.
113          */
114         public void setMaxBytesPerSecond(final long maxBytesPerSecond) {
115             this.maxBytesPerSecond = maxBytesPerSecond;
116         }
117 
118     }
119 
120     /**
121      * Constructs a new {@link Builder}.
122      *
123      * @return a new {@link Builder}.
124      */
125     public static Builder builder() {
126         return new Builder();
127     }
128 
129     static long toSleepMillis(final long bytesRead, final long maxBytesPerSec, final long elapsedMillis) {
130         assert elapsedMillis >= 0 : "The elapsed time should be greater or equal to zero";
131         if (bytesRead <= 0 || maxBytesPerSec <= 0 || elapsedMillis == 0) {
132             return 0;
133         }
134         // We use this class to load the single source file, so the bytesRead
135         // and maxBytesPerSec aren't greater than Double.MAX_VALUE.
136         // We can get the precise sleep time by using the double value.
137         final long millis = (long) ((double) bytesRead / (double) maxBytesPerSec * 1000 - elapsedMillis);
138         if (millis <= 0) {
139             return 0;
140         }
141         return millis;
142     }
143 
144     private final long maxBytesPerSecond;
145     private final long startTime = System.currentTimeMillis();
146     private Duration totalSleepDuration = Duration.ZERO;
147 
148     private ThrottledInputStream(final InputStream proxy, final long maxBytesPerSecond) {
149         super(proxy);
150         assert maxBytesPerSecond > 0 : "Bandwidth " + maxBytesPerSecond + " is invalid.";
151         this.maxBytesPerSecond = maxBytesPerSecond;
152     }
153 
154     @Override
155     protected void beforeRead(final int n) throws IOException {
156         throttle();
157     }
158 
159     /**
160      * Gets the read-rate from this stream, since creation. Calculated as bytesRead/elapsedTimeSinceStart.
161      *
162      * @return Read rate, in bytes/sec.
163      */
164     private long getBytesPerSecond() {
165         final long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000;
166         if (elapsedSeconds == 0) {
167             return getByteCount();
168         }
169         return getByteCount() / elapsedSeconds;
170     }
171 
172     private long getSleepMillis() {
173         return toSleepMillis(getByteCount(), maxBytesPerSecond, System.currentTimeMillis() - startTime);
174     }
175 
176     /**
177      * Gets the total duration spent in sleep.
178      *
179      * @return Duration spent in sleep.
180      */
181     Duration getTotalSleepDuration() {
182         return totalSleepDuration;
183     }
184 
185     private void throttle() throws InterruptedIOException {
186         final long sleepMillis = getSleepMillis();
187         if (sleepMillis > 0) {
188             totalSleepDuration = totalSleepDuration.plus(sleepMillis, ChronoUnit.MILLIS);
189             try {
190                 TimeUnit.MILLISECONDS.sleep(sleepMillis);
191             } catch (final InterruptedException e) {
192                 throw new InterruptedIOException("Thread aborted");
193             }
194         }
195     }
196 
197     /** {@inheritDoc} */
198     @Override
199     public String toString() {
200         return "ThrottledInputStream[bytesRead=" + getByteCount() + ", maxBytesPerSec=" + maxBytesPerSecond + ", bytesPerSec=" + getBytesPerSecond()
201                 + ", totalSleepDuration=" + totalSleepDuration + ']';
202     }
203 }