1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.commons.io.input;
19
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.io.InterruptedIOException;
23 import java.time.Duration;
24 import java.time.temporal.ChronoUnit;
25 import java.util.Objects;
26 import java.util.concurrent.TimeUnit;
27
28 /**
29 * Provides bandwidth throttling on an InputStream as a filter input stream. The throttling examines the number of bytes read from the underlying InputStream,
30 * and sleeps for a time interval if the byte-transfer is found to exceed the specified maximum rate. Thus, while the read-rate might exceed the maximum for a
31 * short interval, the average tends towards the specified maximum, overall.
32 * <p>
33 * To build an instance, call {@link #builder()}.
34 * </p>
35 * <p>
36 * Inspired by Apache HBase's class of the same name.
37 * </p>
38 *
39 * @see Builder
40 * @since 2.16.0
41 */
42 public final class ThrottledInputStream extends CountingInputStream {
43
44 // @formatter:off
45 /**
46 * Builds a new {@link ThrottledInputStream}.
47 *
48 * <h2>Using NIO</h2>
49 * <pre>{@code
50 * ThrottledInputStream in = ThrottledInputStream.builder()
51 * .setPath(Paths.get("MyFile.xml"))
52 * .setMaxBytes(100_000, ChronoUnit.SECONDS)
53 * .get();
54 * }
55 * </pre>
56 * <h2>Using IO</h2>
57 * <pre>{@code
58 * ThrottledInputStream in = ThrottledInputStream.builder()
59 * .setFile(new File("MyFile.xml"))
60 * .setMaxBytes(100_000, ChronoUnit.SECONDS)
61 * .get();
62 * }
63 * </pre>
64 * <pre>{@code
65 * ThrottledInputStream in = ThrottledInputStream.builder()
66 * .setInputStream(inputStream)
67 * .setMaxBytes(100_000, ChronoUnit.SECONDS)
68 * .get();
69 * }
70 * </pre>
71 *
72 * @see #get()
73 */
74 // @formatter:on
75 public static class Builder extends AbstractBuilder<ThrottledInputStream, Builder> {
76
77 /**
78 * Effectively not throttled.
79 */
80 private double maxBytesPerSecond = Double.MAX_VALUE;
81
82 /**
83 * Constructs a new builder of {@link ThrottledInputStream}.
84 */
85 public Builder() {
86 // empty
87 }
88
89 /**
90 * Builds a new {@link ThrottledInputStream}.
91 * <p>
92 * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
93 * </p>
94 * <p>
95 * This builder uses the following aspects:
96 * </p>
97 * <ul>
98 * <li>{@link #getInputStream()} gets the target aspect.</li>
99 * <li>maxBytesPerSecond</li>
100 * </ul>
101 *
102 * @return a new instance.
103 * @throws IllegalStateException if the {@code origin} is {@code null}.
104 * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
105 * @throws IOException if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
106 * @see #getInputStream()
107 * @see #getUnchecked()
108 */
109 @Override
110 public ThrottledInputStream get() throws IOException {
111 return new ThrottledInputStream(this);
112 }
113
114 // package private for testing.
115 double getMaxBytesPerSecond() {
116 return maxBytesPerSecond;
117 }
118
119 /**
120 * Sets the maximum bytes per time period unit.
121 * <p>
122 * For example, to throttle reading to 100K per second, use:
123 * </p>
124 * <pre>
125 * builder.setMaxBytes(100_000, ChronoUnit.SECONDS)
126 * </pre>
127 * <p>
128 * To test idle timeouts for example, use 1 byte per minute, 1 byte per 30 seconds, and so on.
129 * </p>
130 *
131 * @param value the maximum bytes
132 * @param chronoUnit a duration scale goal.
133 * @return {@code this} instance.
134 * @throws IllegalArgumentException Thrown if maxBytesPerSecond <= 0.
135 * @since 2.19.0
136 */
137 public Builder setMaxBytes(final long value, final ChronoUnit chronoUnit) {
138 setMaxBytes(value, chronoUnit.getDuration());
139 return asThis();
140 }
141
142 /**
143 * Sets the maximum bytes per duration.
144 * <p>
145 * For example, to throttle reading to 100K per second, use:
146 * </p>
147 * <pre>
148 * builder.setMaxBytes(100_000, Duration.ofSeconds(1))
149 * </pre>
150 * <p>
151 * To test idle timeouts for example, use 1 byte per minute, 1 byte per 30 seconds, and so on.
152 * </p>
153 *
154 * @param value the maximum bytes
155 * @param duration a duration goal.
156 * @return {@code this} instance.
157 * @throws IllegalArgumentException Thrown if maxBytesPerSecond <= 0.
158 */
159 // Consider making public in the future
160 Builder setMaxBytes(final long value, final Duration duration) {
161 setMaxBytesPerSecond((double) Objects.requireNonNull(duration, "duration").toMillis() / 1_000 * value);
162 return asThis();
163 }
164
165 /**
166 * Sets the maximum bytes per second.
167 *
168 * @param maxBytesPerSecond the maximum bytes per second.
169 * @return {@code this} instance.
170 * @throws IllegalArgumentException Thrown if maxBytesPerSecond <= 0.
171 */
172 private Builder setMaxBytesPerSecond(final double maxBytesPerSecond) {
173 if (maxBytesPerSecond <= 0) {
174 throw new IllegalArgumentException("Bandwidth " + maxBytesPerSecond + " must be > 0.");
175 }
176 this.maxBytesPerSecond = maxBytesPerSecond;
177 return asThis();
178 }
179
180 /**
181 * Sets the maximum bytes per second.
182 *
183 * @param maxBytesPerSecond the maximum bytes per second.
184 * @throws IllegalArgumentException Thrown if maxBytesPerSecond <= 0.
185 */
186 public void setMaxBytesPerSecond(final long maxBytesPerSecond) {
187 setMaxBytesPerSecond((double) maxBytesPerSecond);
188 // TODO 3.0
189 // return asThis();
190 }
191
192 }
193
194 /**
195 * Constructs a new {@link Builder}.
196 *
197 * @return a new {@link Builder}.
198 */
199 public static Builder builder() {
200 return new Builder();
201 }
202
203 // package private for testing
204 static long toSleepMillis(final long bytesRead, final long elapsedMillis, final double maxBytesPerSec) {
205 if (bytesRead <= 0 || maxBytesPerSec <= 0 || elapsedMillis == 0) {
206 return 0;
207 }
208 // We use this class to load the single source file, so the bytesRead
209 // and maxBytesPerSec aren't greater than Double.MAX_VALUE.
210 // We can get the precise sleep time by using the double value.
211 final long millis = (long) (bytesRead / maxBytesPerSec * 1000 - elapsedMillis);
212 if (millis <= 0) {
213 return 0;
214 }
215 return millis;
216 }
217
218 private final double maxBytesPerSecond;
219 private final long startTime = System.currentTimeMillis();
220 private Duration totalSleepDuration = Duration.ZERO;
221
222 private ThrottledInputStream(final Builder builder) throws IOException {
223 super(builder);
224 if (builder.maxBytesPerSecond <= 0) {
225 throw new IllegalArgumentException("Bandwidth " + builder.maxBytesPerSecond + " is invalid.");
226 }
227 this.maxBytesPerSecond = builder.maxBytesPerSecond;
228 }
229
230 @Override
231 protected void beforeRead(final int n) throws IOException {
232 throttle();
233 }
234
235 /**
236 * Gets the read-rate from this stream, since creation. Calculated as bytesRead/elapsedTimeSinceStart.
237 *
238 * @return Read rate, in bytes/sec.
239 */
240 private long getBytesPerSecond() {
241 final long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000;
242 if (elapsedSeconds == 0) {
243 return getByteCount();
244 }
245 return getByteCount() / elapsedSeconds;
246 }
247
248 // package private for testing.
249 double getMaxBytesPerSecond() {
250 return maxBytesPerSecond;
251 }
252
253 private long getSleepMillis() {
254 return toSleepMillis(getByteCount(), System.currentTimeMillis() - startTime, maxBytesPerSecond);
255 }
256
257 /**
258 * Gets the total duration spent in sleep.
259 *
260 * @return Duration spent in sleep.
261 */
262 // package private for testing
263 Duration getTotalSleepDuration() {
264 return totalSleepDuration;
265 }
266
267 private void throttle() throws InterruptedIOException {
268 final long sleepMillis = getSleepMillis();
269 if (sleepMillis > 0) {
270 totalSleepDuration = totalSleepDuration.plus(sleepMillis, ChronoUnit.MILLIS);
271 try {
272 TimeUnit.MILLISECONDS.sleep(sleepMillis);
273 } catch (final InterruptedException e) {
274 throw new InterruptedIOException("Thread aborted");
275 }
276 }
277 }
278
279 /** {@inheritDoc} */
280 @Override
281 public String toString() {
282 return "ThrottledInputStream[bytesRead=" + getByteCount() + ", maxBytesPerSec=" + maxBytesPerSecond + ", bytesPerSec=" + getBytesPerSecond()
283 + ", totalSleepDuration=" + totalSleepDuration + ']';
284 }
285 }