001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.commons.io.input;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.InterruptedIOException;
023import java.time.Duration;
024import java.time.temporal.ChronoUnit;
025import java.util.concurrent.TimeUnit;
026
027import org.apache.commons.io.build.AbstractStreamBuilder;
028
029/**
030 * Provides bandwidth throttling on a specified InputStream. It is implemented as a wrapper on top of another InputStream instance. The throttling works by
031 * examining the number of bytes read from the underlying InputStream from the beginning, and sleep()ing for a time interval if the byte-transfer is found
032 * exceed the specified tolerable maximum. (Thus, while the read-rate might exceed the maximum for a short interval, the average tends towards the
033 * specified maximum, overall.)
034 * <p>
035 * To build an instance, see {@link Builder}
036 * </p>
037 * <p>
038 * Inspired by Apache HBase's class of the same name.
039 * </p>
040 *
041 * @see Builder
042 * @since 2.16.0
043 */
044public final class ThrottledInputStream extends CountingInputStream {
045
046    // @formatter:off
047    /**
048     * Builds a new {@link ThrottledInputStream}.
049     *
050     * <h2>Using NIO</h2>
051     * <pre>{@code
052     * ThrottledInputStream in = ThrottledInputStream.builder()
053     *   .setPath(Paths.get("MyFile.xml"))
054     *   .setMaxBytesPerSecond(100_000)
055     *   .get();
056     * }
057     * </pre>
058     * <h2>Using IO</h2>
059     * <pre>{@code
060     * ThrottledInputStream in = ThrottledInputStream.builder()
061     *   .setFile(new File("MyFile.xml"))
062     *   .setMaxBytesPerSecond(100_000)
063     *   .get();
064     * }
065     * </pre>
066     * <pre>{@code
067     * ThrottledInputStream in = ThrottledInputStream.builder()
068     *   .setInputStream(inputStream)
069     *   .setMaxBytesPerSecond(100_000)
070     *   .get();
071     * }
072     * </pre>
073     *
074     * @see #get()
075     */
076    // @formatter:on
077    public static class Builder extends AbstractStreamBuilder<ThrottledInputStream, Builder> {
078
079        /**
080         * Effectively not throttled.
081         */
082        private long maxBytesPerSecond = Long.MAX_VALUE;
083
084        /**
085         * Builds a new {@link ThrottledInputStream}.
086         * <p>
087         * You must set input that supports {@link #getInputStream()}, otherwise, this method throws an exception.
088         * </p>
089         * <p>
090         * This builder use the following aspects:
091         * </p>
092         * <ul>
093         * <li>{@link #getInputStream()}</li>
094         * <li>maxBytesPerSecond</li>
095         * </ul>
096         *
097         * @return a new instance.
098         * @throws IllegalStateException         if the {@code origin} is {@code null}.
099         * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
100         * @throws IOException                   if an I/O error occurs.
101         * @see #getInputStream()
102         */
103        @SuppressWarnings("resource")
104        @Override
105        public ThrottledInputStream get() throws IOException {
106            return new ThrottledInputStream(getInputStream(), maxBytesPerSecond);
107        }
108
109        /**
110         * Sets the maximum bytes per second.
111         *
112         * @param maxBytesPerSecond the maximum bytes per second.
113         */
114        public void setMaxBytesPerSecond(final long maxBytesPerSecond) {
115            this.maxBytesPerSecond = maxBytesPerSecond;
116        }
117
118    }
119
120    /**
121     * Constructs a new {@link Builder}.
122     *
123     * @return a new {@link Builder}.
124     */
125    public static Builder builder() {
126        return new Builder();
127    }
128
129    static long toSleepMillis(final long bytesRead, final long maxBytesPerSec, final long elapsedMillis) {
130        assert elapsedMillis >= 0 : "The elapsed time should be greater or equal to zero";
131        if (bytesRead <= 0 || maxBytesPerSec <= 0 || elapsedMillis == 0) {
132            return 0;
133        }
134        // We use this class to load the single source file, so the bytesRead
135        // and maxBytesPerSec aren't greater than Double.MAX_VALUE.
136        // We can get the precise sleep time by using the double value.
137        final long millis = (long) ((double) bytesRead / (double) maxBytesPerSec * 1000 - elapsedMillis);
138        if (millis <= 0) {
139            return 0;
140        }
141        return millis;
142    }
143
144    private final long maxBytesPerSecond;
145    private final long startTime = System.currentTimeMillis();
146    private Duration totalSleepDuration = Duration.ZERO;
147
148    private ThrottledInputStream(final InputStream proxy, final long maxBytesPerSecond) {
149        super(proxy);
150        assert maxBytesPerSecond > 0 : "Bandwidth " + maxBytesPerSecond + " is invalid.";
151        this.maxBytesPerSecond = maxBytesPerSecond;
152    }
153
154    @Override
155    protected void beforeRead(final int n) throws IOException {
156        throttle();
157    }
158
159    /**
160     * Gets the read-rate from this stream, since creation. Calculated as bytesRead/elapsedTimeSinceStart.
161     *
162     * @return Read rate, in bytes/sec.
163     */
164    private long getBytesPerSecond() {
165        final long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000;
166        if (elapsedSeconds == 0) {
167            return getByteCount();
168        }
169        return getByteCount() / elapsedSeconds;
170    }
171
172    private long getSleepMillis() {
173        return toSleepMillis(getByteCount(), maxBytesPerSecond, System.currentTimeMillis() - startTime);
174    }
175
176    /**
177     * Gets the total duration spent in sleep.
178     *
179     * @return Duration spent in sleep.
180     */
181    Duration getTotalSleepDuration() {
182        return totalSleepDuration;
183    }
184
185    private void throttle() throws InterruptedIOException {
186        final long sleepMillis = getSleepMillis();
187        if (sleepMillis > 0) {
188            totalSleepDuration = totalSleepDuration.plus(sleepMillis, ChronoUnit.MILLIS);
189            try {
190                TimeUnit.MILLISECONDS.sleep(sleepMillis);
191            } catch (final InterruptedException e) {
192                throw new InterruptedIOException("Thread aborted");
193            }
194        }
195    }
196
197    /** {@inheritDoc} */
198    @Override
199    public String toString() {
200        return "ThrottledInputStream[bytesRead=" + getByteCount() + ", maxBytesPerSec=" + maxBytesPerSecond + ", bytesPerSec=" + getBytesPerSecond()
201                + ", totalSleepDuration=" + totalSleepDuration + ']';
202    }
203}