001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import static org.apache.commons.io.IOUtils.EOF;
020
021import java.io.IOException;
022import java.io.InputStream;
023
024import org.apache.commons.io.IOUtils;
025import org.apache.commons.io.build.AbstractStreamBuilder;
026
027//@formatter:off
028/**
029 * Reads bytes up to a maximum count and stops once reached.
030 * <p>
031 * To build an instance, see {@link AbstractBuilder}.
032 * </p>
033 * <p>
034 * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
035 * </p>
036 * <p>
037 * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
038 * </p>
039 * <h2>Using a ServletInputStream</h2>
040 * <p>
041 * A {@code ServletInputStream} can block if you try to read content that isn't there
042 * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the
043 * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content
044 * length in the first place.
045 * </p>
046 * <h2>Using NIO</h2>
047 * <pre>{@code
048 * BoundedInputStream s = BoundedInputStream.builder()
049 *   .setPath(Paths.get("MyFile.xml"))
050 *   .setMaxCount(1024)
051 *   .setPropagateClose(false)
052 *   .get();
053 * }
054 * </pre>
055 * <h2>Using IO</h2>
056 * <pre>{@code
057 * BoundedInputStream s = BoundedInputStream.builder()
058 *   .setFile(new File("MyFile.xml"))
059 *   .setMaxCount(1024)
060 *   .setPropagateClose(false)
061 *   .get();
062 * }
063 * </pre>
064 * <h2>Counting Bytes</h2>
065 * <p>You can set the running count when building, which is most useful when starting from another stream:
066 * <pre>{@code
067 * InputStream in = ...;
068 * BoundedInputStream s = BoundedInputStream.builder()
069 *   .setInputStream(in)
070 *   .setCount(12)
071 *   .setMaxCount(1024)
072 *   .setPropagateClose(false)
073 *   .get();
074 * }
075 * </pre>
076 * @see Builder
077 * @since 2.0
078 */
079//@formatter:on
080public class BoundedInputStream extends ProxyInputStream {
081
082    /**
083     * For subclassing builders from {@link BoundedInputStream} subclassses.
084     *
085     * @param <T> The subclass.
086     */
087    static abstract class AbstractBuilder<T extends AbstractBuilder<T>> extends AbstractStreamBuilder<BoundedInputStream, T> {
088
089        /** The current count of bytes counted. */
090        private long count;
091
092        /** The max count of bytes to read. */
093        private long maxCount = EOF;
094
095        /** Flag if {@link #close()} should be propagated, {@code true} by default. */
096        private boolean propagateClose = true;
097
098        long getCount() {
099            return count;
100        }
101
102        long getMaxCount() {
103            return maxCount;
104        }
105
106        boolean isPropagateClose() {
107            return propagateClose;
108        }
109
110        /**
111         * Sets the current number of bytes counted.
112         * <p>
113         * Useful when building from another stream to carry forward a read count.
114         * </p>
115         * <p>
116         * Default is {@code 0}, negative means 0.
117         * </p>
118         *
119         * @param count The current number of bytes counted.
120         * @return this.
121         */
122        public T setCount(final long count) {
123            this.count = Math.max(0, count);
124            return asThis();
125        }
126
127        /**
128         * Sets the maximum number of bytes to return.
129         * <p>
130         * Default is {@value IOUtils#EOF}, negative means unbound.
131         * </p>
132         *
133         * @param maxCount The maximum number of bytes to return.
134         * @return this.
135         */
136        public T setMaxCount(final long maxCount) {
137            this.maxCount = Math.max(EOF, maxCount);
138            return asThis();
139        }
140
141        /**
142         * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}.
143         * <p>
144         * Default is {@code true}.
145         * </p>
146         *
147         * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if
148         *                       it does not.
149         * @return this.
150         */
151        public T setPropagateClose(final boolean propagateClose) {
152            this.propagateClose = propagateClose;
153            return asThis();
154        }
155
156    }
157
158    //@formatter:off
159    /**
160     * Builds a new {@link BoundedInputStream}.
161     * <p>
162     * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
163     * </p>
164     * <p>
165     * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
166     * </p>
167     * <h2>Using a ServletInputStream</h2>
168     * <p>
169     * A {@code ServletInputStream} can block if you try to read content that isn't there
170     * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the
171     * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content
172     * length in the first place.
173     * </p>
174     * <h2>Using NIO</h2>
175     * <pre>{@code
176     * BoundedInputStream s = BoundedInputStream.builder()
177     *   .setPath(Paths.get("MyFile.xml"))
178     *   .setMaxCount(1024)
179     *   .setPropagateClose(false)
180     *   .get();
181     * }
182     * </pre>
183     * <h2>Using IO</h2>
184     * <pre>{@code
185     * BoundedInputStream s = BoundedInputStream.builder()
186     *   .setFile(new File("MyFile.xml"))
187     *   .setMaxCount(1024)
188     *   .setPropagateClose(false)
189     *   .get();
190     * }
191     * </pre>
192     * <h2>Counting Bytes</h2>
193     * <p>You can set the running count when building, which is most useful when starting from another stream:
194     * <pre>{@code
195     * InputStream in = ...;
196     * BoundedInputStream s = BoundedInputStream.builder()
197     *   .setInputStream(in)
198     *   .setCount(12)
199     *   .setMaxCount(1024)
200     *   .setPropagateClose(false)
201     *   .get();
202     * }
203     * </pre>
204     *
205     * @see #get()
206     * @since 2.16.0
207     */
208    //@formatter:on
209    public static class Builder extends AbstractBuilder<Builder> {
210
211        /**
212         * Builds a new {@link BoundedInputStream}.
213         * <p>
214         * You must set input that supports {@link #getInputStream()}, otherwise, this method throws an exception.
215         * </p>
216         * <p>
217         * This builder use the following aspects:
218         * </p>
219         * <ul>
220         * <li>{@link #getInputStream()}</li>
221         * <li>maxCount</li>
222         * <li>propagateClose</li>
223         * </ul>
224         *
225         * @return a new instance.
226         * @throws IllegalStateException         if the {@code origin} is {@code null}.
227         * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
228         * @throws IOException                   if an I/O error occurs.
229         * @see #getInputStream()
230         */
231        @SuppressWarnings("resource")
232        @Override
233        public BoundedInputStream get() throws IOException {
234            return new BoundedInputStream(getInputStream(), getCount(), getMaxCount(), isPropagateClose());
235        }
236
237    }
238
239    /**
240     * Constructs a new {@link AbstractBuilder}.
241     *
242     * @return a new {@link AbstractBuilder}.
243     * @since 2.16.0
244     */
245    public static Builder builder() {
246        return new Builder();
247    }
248
249    /** The current count of bytes counted. */
250    private long count;
251
252    /** The current mark. */
253    private long mark;
254
255    /** The max count of bytes to read. */
256    private final long maxCount;
257
258    /**
259     * Flag if close should be propagated.
260     *
261     * TODO Make final in 3.0.
262     */
263    private boolean propagateClose = true;
264
265    /**
266     * Constructs a new {@link BoundedInputStream} that wraps the given input stream and is unlimited.
267     *
268     * @param in The wrapped input stream.
269     * @deprecated Use {@link AbstractBuilder#get()}.
270     */
271    @Deprecated
272    public BoundedInputStream(final InputStream in) {
273        this(in, EOF);
274    }
275
276    /**
277     * Constructs a new {@link BoundedInputStream} that wraps the given input stream and limits it to a certain size.
278     *
279     * @param inputStream The wrapped input stream.
280     * @param maxCount    The maximum number of bytes to return.
281     * @deprecated Use {@link AbstractBuilder#get()}.
282     */
283    @Deprecated
284    public BoundedInputStream(final InputStream inputStream, final long maxCount) {
285        // Some badly designed methods - e.g. the Servlet API - overload length
286        // such that "-1" means stream finished
287        this(inputStream, 0, maxCount, true);
288    }
289
290    /**
291     * Constructs a new {@link BoundedInputStream} that wraps the given input stream and limits it to a certain size.
292     *
293     * @param inputStream    The wrapped input stream.
294     * @param count          The current number of bytes read.
295     * @param maxCount       The maximum number of bytes to return.
296     * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it
297     *                       does not.
298     */
299    BoundedInputStream(final InputStream inputStream, final long count, final long maxCount, final boolean propagateClose) {
300        // Some badly designed methods - e.g. the Servlet API - overload length
301        // such that "-1" means stream finished
302        // Can't throw because we start from an InputStream.
303        super(inputStream);
304        this.count = count;
305        this.maxCount = maxCount;
306        this.propagateClose = propagateClose;
307    }
308
309    /**
310     * Adds the number of read bytes to the count.
311     *
312     * @param n number of bytes read, or -1 if no more bytes are available
313     * @throws IOException Not thrown here but subclasses may throw.
314     * @since 2.0
315     */
316    @Override
317    protected synchronized void afterRead(final int n) throws IOException {
318        if (n != EOF) {
319            count += n;
320        }
321    }
322
323    /**
324     * {@inheritDoc}
325     */
326    @Override
327    public int available() throws IOException {
328        if (isMaxCount()) {
329            onMaxLength(maxCount, getCount());
330            return 0;
331        }
332        return in.available();
333    }
334
335    /**
336     * Invokes the delegate's {@link InputStream#close()} method if {@link #isPropagateClose()} is {@code true}.
337     *
338     * @throws IOException if an I/O error occurs.
339     */
340    @Override
341    public void close() throws IOException {
342        if (propagateClose) {
343            in.close();
344        }
345    }
346
347    /**
348     * Gets the count of bytes read.
349     *
350     * @return The count of bytes read.
351     * @since 2.12.0
352     */
353    public synchronized long getCount() {
354        return count;
355    }
356
357    /**
358     * Gets the max count of bytes to read.
359     *
360     * @return The max count of bytes to read.
361     * @since 2.16.0
362     */
363    public long getMaxCount() {
364        return maxCount;
365    }
366
367    /**
368     * Gets the max count of bytes to read.
369     *
370     * @return The max count of bytes to read.
371     * @since 2.12.0
372     * @deprecated Use {@link #getMaxCount()}.
373     */
374    @Deprecated
375    public long getMaxLength() {
376        return maxCount;
377    }
378
379    /**
380     * Gets how many bytes remain to read.
381     *
382     * @return bytes how many bytes remain to read.
383     * @since 2.16.0
384     */
385    public long getRemaining() {
386        return Math.max(0, getMaxCount() - getCount());
387    }
388
389    private boolean isMaxCount() {
390        return maxCount >= 0 && getCount() >= maxCount;
391    }
392
393    /**
394     * Tests whether the {@link #close()} method should propagate to the underling {@link InputStream}.
395     *
396     * @return {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it does not.
397     */
398    public boolean isPropagateClose() {
399        return propagateClose;
400    }
401
402    /**
403     * Invokes the delegate's {@link InputStream#mark(int)} method.
404     *
405     * @param readLimit read ahead limit
406     */
407    @Override
408    public synchronized void mark(final int readLimit) {
409        in.mark(readLimit);
410        mark = count;
411    }
412
413    /**
414     * Invokes the delegate's {@link InputStream#markSupported()} method.
415     *
416     * @return true if mark is supported, otherwise false
417     */
418    @Override
419    public boolean markSupported() {
420        return in.markSupported();
421    }
422
423    /**
424     * A caller has caused a request that would cross the {@code maxLength} boundary.
425     *
426     * @param maxLength The max count of bytes to read.
427     * @param count     The count of bytes read.
428     * @throws IOException Subclasses may throw.
429     * @since 2.12.0
430     */
431    @SuppressWarnings("unused")
432    protected void onMaxLength(final long maxLength, final long count) throws IOException {
433        // for subclasses
434    }
435
436    /**
437     * Invokes the delegate's {@link InputStream#read()} method if the current position is less than the limit.
438     *
439     * @return the byte read or -1 if the end of stream or the limit has been reached.
440     * @throws IOException if an I/O error occurs.
441     */
442    @Override
443    public int read() throws IOException {
444        if (isMaxCount()) {
445            onMaxLength(maxCount, getCount());
446            return EOF;
447        }
448        return super.read();
449    }
450
451    /**
452     * Invokes the delegate's {@link InputStream#read(byte[])} method.
453     *
454     * @param b the buffer to read the bytes into
455     * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
456     * @throws IOException if an I/O error occurs.
457     */
458    @Override
459    public int read(final byte[] b) throws IOException {
460        return read(b, 0, b.length);
461    }
462
463    /**
464     * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method.
465     *
466     * @param b   the buffer to read the bytes into
467     * @param off The start offset
468     * @param len The number of bytes to read
469     * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
470     * @throws IOException if an I/O error occurs.
471     */
472    @Override
473    public int read(final byte[] b, final int off, final int len) throws IOException {
474        if (isMaxCount()) {
475            onMaxLength(maxCount, getCount());
476            return EOF;
477        }
478        return super.read(b, off, (int) toReadLen(len));
479    }
480
481    /**
482     * Invokes the delegate's {@link InputStream#reset()} method.
483     *
484     * @throws IOException if an I/O error occurs.
485     */
486    @Override
487    public synchronized void reset() throws IOException {
488        in.reset();
489        count = mark;
490    }
491
492    /**
493     * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}.
494     *
495     * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it
496     *                       does not.
497     * @deprecated Use {@link AbstractBuilder#setPropagateClose(boolean)}.
498     */
499    @Deprecated
500    public void setPropagateClose(final boolean propagateClose) {
501        this.propagateClose = propagateClose;
502    }
503
504    /**
505     * Invokes the delegate's {@link InputStream#skip(long)} method.
506     *
507     * @param n the number of bytes to skip
508     * @return the actual number of bytes skipped
509     * @throws IOException if an I/O error occurs.
510     */
511    @Override
512    public synchronized long skip(final long n) throws IOException {
513        final long skip = super.skip(toReadLen(n));
514        count += skip;
515        return skip;
516    }
517
518    private long toReadLen(final long len) {
519        return maxCount >= 0 ? Math.min(len, maxCount - getCount()) : len;
520    }
521
522    /**
523     * Invokes the delegate's {@link InputStream#toString()} method.
524     *
525     * @return the delegate's {@link InputStream#toString()}
526     */
527    @Override
528    public String toString() {
529        return in.toString();
530    }
531}