1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package org.apache.commons.io.input; 19 20 import java.io.IOException; 21 import java.io.InputStream; 22 import java.io.InterruptedIOException; 23 import java.time.Duration; 24 import java.time.temporal.ChronoUnit; 25 import java.util.Objects; 26 import java.util.concurrent.TimeUnit; 27 28 /** 29 * Provides bandwidth throttling on an InputStream as a filter input stream. The throttling examines the number of bytes read from the underlying InputStream, 30 * and sleeps for a time interval if the byte-transfer is found to exceed the specified maximum rate. Thus, while the read-rate might exceed the maximum for a 31 * short interval, the average tends towards the specified maximum, overall. 32 * <p> 33 * To build an instance, call {@link #builder()}. 34 * </p> 35 * <p> 36 * Inspired by Apache HBase's class of the same name. 37 * </p> 38 * 39 * @see Builder 40 * @since 2.16.0 41 */ 42 public final class ThrottledInputStream extends CountingInputStream { 43 44 // @formatter:off 45 /** 46 * Builds a new {@link ThrottledInputStream}. 47 * 48 * <h2>Using NIO</h2> 49 * <pre>{@code 50 * ThrottledInputStream in = ThrottledInputStream.builder() 51 * .setPath(Paths.get("MyFile.xml")) 52 * .setMaxBytes(100_000, ChronoUnit.SECONDS) 53 * .get(); 54 * } 55 * </pre> 56 * <h2>Using IO</h2> 57 * <pre>{@code 58 * ThrottledInputStream in = ThrottledInputStream.builder() 59 * .setFile(new File("MyFile.xml")) 60 * .setMaxBytes(100_000, ChronoUnit.SECONDS) 61 * .get(); 62 * } 63 * </pre> 64 * <pre>{@code 65 * ThrottledInputStream in = ThrottledInputStream.builder() 66 * .setInputStream(inputStream) 67 * .setMaxBytes(100_000, ChronoUnit.SECONDS) 68 * .get(); 69 * } 70 * </pre> 71 * 72 * @see #get() 73 */ 74 // @formatter:on 75 public static class Builder extends AbstractBuilder<ThrottledInputStream, Builder> { 76 77 /** 78 * Effectively not throttled. 79 */ 80 private double maxBytesPerSecond = Double.MAX_VALUE; 81 82 /** 83 * Constructs a new builder of {@link ThrottledInputStream}. 84 */ 85 public Builder() { 86 // empty 87 } 88 89 /** 90 * Builds a new {@link ThrottledInputStream}. 91 * <p> 92 * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception. 93 * </p> 94 * <p> 95 * This builder uses the following aspects: 96 * </p> 97 * <ul> 98 * <li>{@link #getInputStream()} gets the target aspect.</li> 99 * <li>maxBytesPerSecond</li> 100 * </ul> 101 * 102 * @return a new instance. 103 * @throws IllegalStateException if the {@code origin} is {@code null}. 104 * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}. 105 * @throws IOException if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}. 106 * @see #getInputStream() 107 * @see #getUnchecked() 108 */ 109 @Override 110 public ThrottledInputStream get() throws IOException { 111 return new ThrottledInputStream(this); 112 } 113 114 // package private for testing. 115 double getMaxBytesPerSecond() { 116 return maxBytesPerSecond; 117 } 118 119 /** 120 * Sets the maximum bytes per time period unit. 121 * <p> 122 * For example, to throttle reading to 100K per second, use: 123 * </p> 124 * <pre> 125 * builder.setMaxBytes(100_000, ChronoUnit.SECONDS) 126 * </pre> 127 * <p> 128 * To test idle timeouts for example, use 1 byte per minute, 1 byte per 30 seconds, and so on. 129 * </p> 130 * 131 * @param value the maximum bytes 132 * @param chronoUnit a duration scale goal. 133 * @return this instance. 134 * @throws IllegalArgumentException Thrown if maxBytesPerSecond <= 0. 135 * @since 2.19.0 136 */ 137 public Builder setMaxBytes(final long value, final ChronoUnit chronoUnit) { 138 setMaxBytes(value, chronoUnit.getDuration()); 139 return asThis(); 140 } 141 142 /** 143 * Sets the maximum bytes per duration. 144 * <p> 145 * For example, to throttle reading to 100K per second, use: 146 * </p> 147 * <pre> 148 * builder.setMaxBytes(100_000, Duration.ofSeconds(1)) 149 * </pre> 150 * <p> 151 * To test idle timeouts for example, use 1 byte per minute, 1 byte per 30 seconds, and so on. 152 * </p> 153 * 154 * @param value the maximum bytes 155 * @param duration a duration goal. 156 * @return this instance. 157 * @throws IllegalArgumentException Thrown if maxBytesPerSecond <= 0. 158 */ 159 // Consider making public in the future 160 Builder setMaxBytes(final long value, final Duration duration) { 161 setMaxBytesPerSecond((double) Objects.requireNonNull(duration, "duration").toMillis() / 1_000 * value); 162 return asThis(); 163 } 164 165 /** 166 * Sets the maximum bytes per second. 167 * 168 * @param maxBytesPerSecond the maximum bytes per second. 169 * @return this instance. 170 * @throws IllegalArgumentException Thrown if maxBytesPerSecond <= 0. 171 */ 172 private Builder setMaxBytesPerSecond(final double maxBytesPerSecond) { 173 if (maxBytesPerSecond <= 0) { 174 throw new IllegalArgumentException("Bandwidth " + maxBytesPerSecond + " must be > 0."); 175 } 176 this.maxBytesPerSecond = maxBytesPerSecond; 177 return asThis(); 178 } 179 180 /** 181 * Sets the maximum bytes per second. 182 * 183 * @param maxBytesPerSecond the maximum bytes per second. 184 * @throws IllegalArgumentException Thrown if maxBytesPerSecond <= 0. 185 */ 186 public void setMaxBytesPerSecond(final long maxBytesPerSecond) { 187 setMaxBytesPerSecond((double) maxBytesPerSecond); 188 // TODO 3.0 189 // return asThis(); 190 } 191 192 } 193 194 /** 195 * Constructs a new {@link Builder}. 196 * 197 * @return a new {@link Builder}. 198 */ 199 public static Builder builder() { 200 return new Builder(); 201 } 202 203 // package private for testing 204 static long toSleepMillis(final long bytesRead, final long elapsedMillis, final double maxBytesPerSec) { 205 if (bytesRead <= 0 || maxBytesPerSec <= 0 || elapsedMillis == 0) { 206 return 0; 207 } 208 // We use this class to load the single source file, so the bytesRead 209 // and maxBytesPerSec aren't greater than Double.MAX_VALUE. 210 // We can get the precise sleep time by using the double value. 211 final long millis = (long) (bytesRead / maxBytesPerSec * 1000 - elapsedMillis); 212 if (millis <= 0) { 213 return 0; 214 } 215 return millis; 216 } 217 218 private final double maxBytesPerSecond; 219 private final long startTime = System.currentTimeMillis(); 220 private Duration totalSleepDuration = Duration.ZERO; 221 222 private ThrottledInputStream(final Builder builder) throws IOException { 223 super(builder); 224 if (builder.maxBytesPerSecond <= 0) { 225 throw new IllegalArgumentException("Bandwidth " + builder.maxBytesPerSecond + " is invalid."); 226 } 227 this.maxBytesPerSecond = builder.maxBytesPerSecond; 228 } 229 230 @Override 231 protected void beforeRead(final int n) throws IOException { 232 throttle(); 233 } 234 235 /** 236 * Gets the read-rate from this stream, since creation. Calculated as bytesRead/elapsedTimeSinceStart. 237 * 238 * @return Read rate, in bytes/sec. 239 */ 240 private long getBytesPerSecond() { 241 final long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000; 242 if (elapsedSeconds == 0) { 243 return getByteCount(); 244 } 245 return getByteCount() / elapsedSeconds; 246 } 247 248 // package private for testing. 249 double getMaxBytesPerSecond() { 250 return maxBytesPerSecond; 251 } 252 253 private long getSleepMillis() { 254 return toSleepMillis(getByteCount(), System.currentTimeMillis() - startTime, maxBytesPerSecond); 255 } 256 257 /** 258 * Gets the total duration spent in sleep. 259 * 260 * @return Duration spent in sleep. 261 */ 262 // package private for testing 263 Duration getTotalSleepDuration() { 264 return totalSleepDuration; 265 } 266 267 private void throttle() throws InterruptedIOException { 268 final long sleepMillis = getSleepMillis(); 269 if (sleepMillis > 0) { 270 totalSleepDuration = totalSleepDuration.plus(sleepMillis, ChronoUnit.MILLIS); 271 try { 272 TimeUnit.MILLISECONDS.sleep(sleepMillis); 273 } catch (final InterruptedException e) { 274 throw new InterruptedIOException("Thread aborted"); 275 } 276 } 277 } 278 279 /** {@inheritDoc} */ 280 @Override 281 public String toString() { 282 return "ThrottledInputStream[bytesRead=" + getByteCount() + ", maxBytesPerSec=" + maxBytesPerSecond + ", bytesPerSec=" + getBytesPerSecond() 283 + ", totalSleepDuration=" + totalSleepDuration + ']'; 284 } 285 }