001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      https://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import static org.apache.commons.io.IOUtils.EOF;
020
021import java.io.IOException;
022import java.io.InputStream;
023
024import org.apache.commons.io.IOUtils;
025import org.apache.commons.io.function.IOBiConsumer;
026
027//@formatter:off
028/**
029 * Reads bytes up to a maximum count and stops once reached.
030 * <p>
031 * To build an instance: Use the {@link #builder()} to access all features.
032 * </p>
033 * <p>
034 * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
035 * </p>
036 * <p>
037 * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
038 * </p>
039 * <h2>Using a ServletInputStream</h2>
040 * <p>
041 * A {@code ServletInputStream} can block if you try to read content that isn't there
042 * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the
043 * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content
044 * length in the first place.
045 * </p>
046 * <h2>Using NIO</h2>
047 * <pre>{@code
048 * BoundedInputStream s = BoundedInputStream.builder()
049 *   .setPath(Paths.get("MyFile.xml"))
050 *   .setMaxCount(1024)
051 *   .setPropagateClose(false)
052 *   .get();
053 * }
054 * </pre>
055 * <h2>Using IO</h2>
056 * <pre>{@code
057 * BoundedInputStream s = BoundedInputStream.builder()
058 *   .setFile(new File("MyFile.xml"))
059 *   .setMaxCount(1024)
060 *   .setPropagateClose(false)
061 *   .get();
062 * }
063 * </pre>
064 * <h2>Counting Bytes</h2>
065 * <p>You can set the running count when building, which is most useful when starting from another stream:
066 * <pre>{@code
067 * InputStream in = ...;
068 * BoundedInputStream s = BoundedInputStream.builder()
069 *   .setInputStream(in)
070 *   .setCount(12)
071 *   .setMaxCount(1024)
072 *   .setPropagateClose(false)
073 *   .get();
074 * }
075 * </pre>
076 * <h2>Listening for the maximum count reached</h2>
077 * <pre>{@code
078 * BoundedInputStream s = BoundedInputStream.builder()
079 *   .setPath(Paths.get("MyFile.xml"))
080 *   .setMaxCount(1024)
081 *   .setOnMaxCount((max, count) -> System.out.printf("Maximum count %,d reached with a last read count of %,d%n", max, count))
082 *   .get();
083 * }
084 * </pre>
085 * @see Builder
086 * @since 2.0
087 */
088//@formatter:on
089public class BoundedInputStream extends ProxyInputStream {
090
091    /**
092     * For subclassing builders from {@link BoundedInputStream} subclassses.
093     *
094     * @param <T> The subclass.
095     */
096    abstract static class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<BoundedInputStream, T> {
097
098        /** The current count of bytes counted. */
099        private long count;
100
101        /** The maximum count of bytes to read. */
102        private long maxCount = EOF;
103
104        private IOBiConsumer<Long, Long> onMaxCount = IOBiConsumer.noop();
105
106        /** Flag if {@link #close()} should be propagated, {@code true} by default. */
107        private boolean propagateClose = true;
108
109        long getCount() {
110            return count;
111        }
112
113        long getMaxCount() {
114            return maxCount;
115        }
116
117        IOBiConsumer<Long, Long> getOnMaxCount() {
118            return onMaxCount;
119        }
120
121        boolean isPropagateClose() {
122            return propagateClose;
123        }
124
125        /**
126         * Sets the current number of bytes counted.
127         * <p>
128         * Useful when building from another stream to carry forward a read count.
129         * </p>
130         * <p>
131         * Default is {@code 0}, negative means 0.
132         * </p>
133         *
134         * @param count The current number of bytes counted.
135         * @return {@code this} instance.
136         */
137        public T setCount(final long count) {
138            this.count = Math.max(0, count);
139            return asThis();
140        }
141
142        /**
143         * Sets the maximum number of bytes to return.
144         * <p>
145         * Default is {@value IOUtils#EOF}, negative means unbound.
146         * </p>
147         *
148         * @param maxCount The maximum number of bytes to return, negative means unbound.
149         * @return {@code this} instance.
150         */
151        public T setMaxCount(final long maxCount) {
152            this.maxCount = Math.max(EOF, maxCount);
153            return asThis();
154        }
155
156        /**
157         * Sets the default {@link BoundedInputStream#onMaxLength(long, long)} behavior, {@code null} resets to a NOOP.
158         * <p>
159         * The first Long is the number of bytes remaining to read before the maximum is reached count of bytes to read. The second Long is the count of bytes
160         * read.
161         * </p>
162         * <p>
163         * This does <em>not</em> override a {@code BoundedInputStream} subclass' implementation of the {@link BoundedInputStream#onMaxLength(long, long)}
164         * method.
165         * </p>
166         *
167         * @param onMaxCount the {@link ProxyInputStream#afterRead(int)} behavior.
168         * @return {@code this} instance.
169         * @since 2.18.0
170         */
171        public T setOnMaxCount(final IOBiConsumer<Long, Long> onMaxCount) {
172            this.onMaxCount = onMaxCount != null ? onMaxCount : IOBiConsumer.noop();
173            return asThis();
174        }
175
176        /**
177         * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}.
178         * <p>
179         * Default is {@code true}.
180         * </p>
181         *
182         * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if
183         *                       it does not.
184         * @return {@code this} instance.
185         */
186        public T setPropagateClose(final boolean propagateClose) {
187            this.propagateClose = propagateClose;
188            return asThis();
189        }
190
191    }
192
193    //@formatter:off
194    /**
195     * Builds a new {@link BoundedInputStream}.
196     * <p>
197     * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
198     * </p>
199     * <p>
200     * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
201     * </p>
202     * <h2>Using a ServletInputStream</h2>
203     * <p>
204     * A {@code ServletInputStream} can block if you try to read content that isn't there
205     * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the
206     * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content
207     * length in the first place.
208     * </p>
209     * <h2>Using NIO</h2>
210     * <pre>{@code
211     * BoundedInputStream s = BoundedInputStream.builder()
212     *   .setPath(Paths.get("MyFile.xml"))
213     *   .setMaxCount(1024)
214     *   .setPropagateClose(false)
215     *   .get();
216     * }
217     * </pre>
218     * <h2>Using IO</h2>
219     * <pre>{@code
220     * BoundedInputStream s = BoundedInputStream.builder()
221     *   .setFile(new File("MyFile.xml"))
222     *   .setMaxCount(1024)
223     *   .setPropagateClose(false)
224     *   .get();
225     * }
226     * </pre>
227     * <h2>Counting Bytes</h2>
228     * <p>You can set the running count when building, which is most useful when starting from another stream:
229     * <pre>{@code
230     * InputStream in = ...;
231     * BoundedInputStream s = BoundedInputStream.builder()
232     *   .setInputStream(in)
233     *   .setCount(12)
234     *   .setMaxCount(1024)
235     *   .setPropagateClose(false)
236     *   .get();
237     * }
238     * </pre>
239     *
240     * @see #get()
241     * @since 2.16.0
242     */
243    //@formatter:on
244    public static class Builder extends AbstractBuilder<Builder> {
245
246        /**
247         * Constructs a new builder of {@link BoundedInputStream}.
248         */
249        public Builder() {
250            // empty
251        }
252
253        /**
254         * Builds a new {@link BoundedInputStream}.
255         * <p>
256         * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
257         * </p>
258         * <p>
259         * If you start from an input stream, an exception can't be thrown, and you can call {@link #getUnchecked()} instead.
260         * </p>
261         * <p>
262         * This builder uses the following aspects:
263         * </p>
264         * <ul>
265         * <li>{@link #getInputStream()} gets the target aspect.</li>
266         * <li>{@link #getAfterRead()}</li>
267         * <li>{@code #getCount()}</li>
268         * <li>{@code #getMaxCount()}</li>
269         * <li>{@code #getOnMaxCount()}</li>
270         * <li>{@code #isPropagateClose()}</li>
271         * </ul>
272         *
273         * @return a new instance.
274         * @throws IllegalStateException         if the {@code origin} is {@code null}.
275         * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
276         * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
277         * @see #getInputStream()
278         * @see #getUnchecked()
279         */
280        @Override
281        public BoundedInputStream get() throws IOException {
282            return new BoundedInputStream(this);
283        }
284
285    }
286
287    /**
288     * Constructs a new {@link AbstractBuilder}.
289     *
290     * @return a new {@link AbstractBuilder}.
291     * @since 2.16.0
292     */
293    public static Builder builder() {
294        return new Builder();
295    }
296
297    /** The current count of bytes counted. */
298    private long count;
299
300    /** The current mark. */
301    private long mark;
302
303    /** The maximum count of bytes to read. */
304    private final long maxCount;
305
306    private final IOBiConsumer<Long, Long> onMaxCount;
307
308    /**
309     * Flag if close should be propagated.
310     *
311     * TODO Make final in 3.0.
312     */
313    private boolean propagateClose = true;
314
315    private BoundedInputStream(final Builder builder) throws IOException {
316        super(builder);
317        this.count = builder.getCount();
318        this.maxCount = builder.getMaxCount();
319        this.propagateClose = builder.isPropagateClose();
320        this.onMaxCount = builder.getOnMaxCount();
321    }
322
323    /**
324     * Constructs a new {@link BoundedInputStream} that wraps the given input stream and is <em>unbounded</em>.
325     * <p>
326     * To build an instance: Use the {@link #builder()} to access all features.
327     * </p>
328     *
329     * @param in The wrapped input stream.
330     * @deprecated Use {@link AbstractBuilder#get()}.
331     */
332    @Deprecated
333    public BoundedInputStream(final InputStream in) {
334        this(in, EOF);
335    }
336
337    private BoundedInputStream(final InputStream inputStream, final Builder builder) {
338        super(inputStream, builder);
339        this.count = builder.getCount();
340        this.maxCount = builder.getMaxCount();
341        this.propagateClose = builder.isPropagateClose();
342        this.onMaxCount = builder.getOnMaxCount();
343    }
344
345    /**
346     * Constructs a new {@link BoundedInputStream} that wraps the given input stream and limits it to a certain size.
347     *
348     * @param inputStream The wrapped input stream.
349     * @param maxCount    The maximum number of bytes to return, negative means unbound.
350     * @deprecated Use {@link AbstractBuilder#get()}.
351     */
352    @Deprecated
353    public BoundedInputStream(final InputStream inputStream, final long maxCount) {
354        // Some badly designed methods, for example the Servlet API, overload length
355        // such that "-1" means stream finished
356        this(inputStream, builder().setMaxCount(maxCount));
357    }
358
359    /**
360     * Adds the number of read bytes to the count.
361     *
362     * @param n number of bytes read, or -1 if no more bytes are available.
363     * @throws IOException Not thrown here but subclasses may throw.
364     * @since 2.0
365     */
366    @Override
367    protected synchronized void afterRead(final int n) throws IOException {
368        if (n != EOF) {
369            count += n;
370        }
371        super.afterRead(n);
372    }
373
374    @Override
375    public int available() throws IOException {
376        // Safe cast: value is between 0 and Integer.MAX_VALUE
377        final int remaining = (int) Math.min(getRemaining(), Integer.MAX_VALUE);
378        return Math.min(super.available(), remaining);
379    }
380
381    /**
382     * Invokes the delegate's {@link InputStream#close()} method if {@link #isPropagateClose()} is {@code true}.
383     *
384     * @throws IOException if an I/O error occurs.
385     */
386    @Override
387    public void close() throws IOException {
388        if (propagateClose) {
389            super.close();
390        }
391    }
392
393    /**
394     * Gets the count of bytes read.
395     *
396     * @return The count of bytes read.
397     * @since 2.12.0
398     */
399    public synchronized long getCount() {
400        return count;
401    }
402
403    /**
404     * Gets the maximum number of bytes to read.
405     *
406     * @return The maximum number of bytes to read, or {@value IOUtils#EOF} if unbounded.
407     * @since 2.16.0
408     */
409    public long getMaxCount() {
410        return maxCount;
411    }
412
413    /**
414     * Gets the maximum count of bytes to read.
415     *
416     * @return The maximum count of bytes to read.
417     * @since 2.12.0
418     * @deprecated Use {@link #getMaxCount()}.
419     */
420    @Deprecated
421    public long getMaxLength() {
422        return maxCount;
423    }
424
425    /**
426     * Gets the number of bytes remaining to read before the maximum is reached.
427     *
428     * <p>
429     * This method does <strong>not</strong> report the bytes available in the
430     * underlying stream; it only reflects the remaining allowance imposed by this
431     * {@code BoundedInputStream}.
432     * </p>
433     *
434     * @return The number of bytes remaining to read before the maximum is reached,
435     *         or {@link Long#MAX_VALUE} if no bound is set.
436     * @since 2.16.0
437     */
438    public long getRemaining() {
439        final long maxCount = getMaxCount();
440        return maxCount == EOF ? Long.MAX_VALUE : Math.max(0, maxCount - getCount());
441    }
442
443    private boolean isMaxCount() {
444        return maxCount >= 0 && getCount() >= maxCount;
445    }
446
447    /**
448     * Tests whether the {@link #close()} method should propagate to the underling {@link InputStream}.
449     *
450     * @return {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it does not.
451     */
452    public boolean isPropagateClose() {
453        return propagateClose;
454    }
455
456    /**
457     * Invokes the delegate's {@link InputStream#mark(int)} method.
458     *
459     * @param readLimit read ahead limit.
460     */
461    @Override
462    public synchronized void mark(final int readLimit) {
463        in.mark(readLimit);
464        mark = count;
465    }
466
467    /**
468     * Invokes the delegate's {@link InputStream#markSupported()} method.
469     *
470     * @return true if mark is supported, otherwise false.
471     */
472    @Override
473    public boolean markSupported() {
474        return in.markSupported();
475    }
476
477    /**
478     * A caller has caused a request that would cross the {@code maxLength} boundary.
479     * <p>
480     * Delegates to the consumer set in {@link Builder#setOnMaxCount(IOBiConsumer)}.
481     * </p>
482     *
483     * @param max The maximum count of bytes to read.
484     * @param count     The count of bytes read.
485     * @throws IOException Subclasses may throw.
486     * @since 2.12.0
487     */
488    @SuppressWarnings("unused")
489    // TODO Rename to onMaxCount for 3.0
490    protected void onMaxLength(final long max, final long count) throws IOException {
491        onMaxCount.accept(max, count);
492    }
493
494    /**
495     * Invokes the delegate's {@link InputStream#read()} method if the current position is less than the limit.
496     *
497     * @return the byte read or -1 if the end of stream or the limit has been reached.
498     * @throws IOException if an I/O error occurs.
499     */
500    @Override
501    public int read() throws IOException {
502        if (isMaxCount()) {
503            onMaxLength(maxCount, getCount());
504            return EOF;
505        }
506        return super.read();
507    }
508
509    /**
510     * Invokes the delegate's {@link InputStream#read(byte[])} method.
511     *
512     * @param b the buffer to read the bytes into.
513     * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
514     * @throws IOException if an I/O error occurs.
515     */
516    @Override
517    public int read(final byte[] b) throws IOException {
518        return read(b, 0, b.length);
519    }
520
521    /**
522     * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method.
523     *
524     * @param b   the buffer to read the bytes into.
525     * @param off The start offset.
526     * @param len The number of bytes to read.
527     * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
528     * @throws IOException if an I/O error occurs.
529     */
530    @Override
531    public int read(final byte[] b, final int off, final int len) throws IOException {
532        if (isMaxCount()) {
533            onMaxLength(maxCount, getCount());
534            return EOF;
535        }
536        return super.read(b, off, (int) toReadLen(len));
537    }
538
539    /**
540     * Invokes the delegate's {@link InputStream#reset()} method.
541     *
542     * @throws IOException if an I/O error occurs.
543     */
544    @Override
545    public synchronized void reset() throws IOException {
546        in.reset();
547        count = mark;
548    }
549
550    /**
551     * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}.
552     *
553     * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it
554     *                       does not.
555     * @deprecated Use {@link AbstractBuilder#setPropagateClose(boolean)}.
556     */
557    @Deprecated
558    public synchronized void setPropagateClose(final boolean propagateClose) {
559        this.propagateClose = propagateClose;
560    }
561
562    /**
563     * Invokes the delegate's {@link InputStream#skip(long)} method.
564     *
565     * @param n the number of bytes to skip.
566     * @return the actual number of bytes skipped.
567     * @throws IOException if an I/O error occurs.
568     */
569    @Override
570    public synchronized long skip(final long n) throws IOException {
571        final long skip = super.skip(toReadLen(n));
572        count += skip;
573        return skip;
574    }
575
576    /**
577     * Converts a request to read {@code len} bytes to a lower count if reading would put us over the limit.
578     * <p>
579     * If a {@code maxCount} is not set, then return max{@code maxCount}.
580     * </p>
581     *
582     * @param len The requested byte count.
583     * @return How many bytes to actually attempt to read.
584     */
585    private long toReadLen(final long len) {
586        return maxCount >= 0 ? Math.min(len, maxCount - getCount()) : len;
587    }
588
589    /**
590     * Invokes the delegate's {@link InputStream#toString()} method.
591     *
592     * @return the delegate's {@link InputStream#toString()}.
593     */
594    @Override
595    public String toString() {
596        return in.toString();
597    }
598}