001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      https://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import static org.apache.commons.io.IOUtils.EOF;
020
021import java.io.IOException;
022import java.io.InputStream;
023
024import org.apache.commons.io.IOUtils;
025import org.apache.commons.io.function.IOBiConsumer;
026
027//@formatter:off
028/**
029 * Reads bytes up to a maximum count and stops once reached.
030 * <p>
031 * To build an instance: Use the {@link #builder()} to access all features.
032 * </p>
033 * <p>
034 * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
035 * </p>
036 * <p>
037 * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
038 * </p>
039 * <h2>Using a ServletInputStream</h2>
040 * <p>
041 * A {@code ServletInputStream} can block if you try to read content that isn't there
042 * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the
043 * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content
044 * length in the first place.
045 * </p>
046 * <h2>Using NIO</h2>
047 * <pre>{@code
048 * BoundedInputStream s = BoundedInputStream.builder()
049 *   .setPath(Paths.get("MyFile.xml"))
050 *   .setMaxCount(1024)
051 *   .setPropagateClose(false)
052 *   .get();
053 * }
054 * </pre>
055 * <h2>Using IO</h2>
056 * <pre>{@code
057 * BoundedInputStream s = BoundedInputStream.builder()
058 *   .setFile(new File("MyFile.xml"))
059 *   .setMaxCount(1024)
060 *   .setPropagateClose(false)
061 *   .get();
062 * }
063 * </pre>
064 * <h2>Counting Bytes</h2>
065 * <p>You can set the running count when building, which is most useful when starting from another stream:
066 * <pre>{@code
067 * InputStream in = ...;
068 * BoundedInputStream s = BoundedInputStream.builder()
069 *   .setInputStream(in)
070 *   .setCount(12)
071 *   .setMaxCount(1024)
072 *   .setPropagateClose(false)
073 *   .get();
074 * }
075 * </pre>
076 * <h2>Listening for the maximum count reached</h2>
077 * <pre>{@code
078 * BoundedInputStream s = BoundedInputStream.builder()
079 *   .setPath(Paths.get("MyFile.xml"))
080 *   .setMaxCount(1024)
081 *   .setOnMaxCount((max, count) -> System.out.printf("Maximum count %,d reached with a last read count of %,d%n", max, count))
082 *   .get();
083 * }
084 * </pre>
085 *
086 * @see Builder
087 * @since 2.0
088 */
089//@formatter:on
090public class BoundedInputStream extends ProxyInputStream {
091
092    /**
093     * For subclassing builders from {@link BoundedInputStream} subclassses.
094     *
095     * @param <T> The subclass.
096     */
097    abstract static class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<BoundedInputStream, T> {
098
099        /** The current count of bytes counted. */
100        private long count;
101
102        /** The maximum count of bytes to read. */
103        private long maxCount = EOF;
104
105        private IOBiConsumer<Long, Long> onMaxCount = IOBiConsumer.noop();
106
107        /** Flag if {@link #close()} should be propagated, {@code true} by default. */
108        private boolean propagateClose = true;
109
110        long getCount() {
111            return count;
112        }
113
114        long getMaxCount() {
115            return maxCount;
116        }
117
118        IOBiConsumer<Long, Long> getOnMaxCount() {
119            return onMaxCount;
120        }
121
122        boolean isPropagateClose() {
123            return propagateClose;
124        }
125
126        /**
127         * Sets the current number of bytes counted.
128         * <p>
129         * Useful when building from another stream to carry forward a read count.
130         * </p>
131         * <p>
132         * Default is {@code 0}, negative means 0.
133         * </p>
134         *
135         * @param count The current number of bytes counted.
136         * @return {@code this} instance.
137         */
138        public T setCount(final long count) {
139            this.count = Math.max(0, count);
140            return asThis();
141        }
142
143        /**
144         * Sets the maximum number of bytes to return.
145         * <p>
146         * Default is {@value IOUtils#EOF}, negative means unbound.
147         * </p>
148         *
149         * @param maxCount The maximum number of bytes to return, negative means unbound.
150         * @return {@code this} instance.
151         */
152        public T setMaxCount(final long maxCount) {
153            this.maxCount = Math.max(EOF, maxCount);
154            return asThis();
155        }
156
157        /**
158         * Sets the default {@link BoundedInputStream#onMaxLength(long, long)} behavior, {@code null} resets to a NOOP.
159         * <p>
160         * The first Long is the number of bytes remaining to read before the maximum is reached count of bytes to read. The second Long is the count of bytes
161         * read.
162         * </p>
163         * <p>
164         * This does <em>not</em> override a {@code BoundedInputStream} subclass' implementation of the {@link BoundedInputStream#onMaxLength(long, long)}
165         * method.
166         * </p>
167         *
168         * @param onMaxCount the {@link ProxyInputStream#afterRead(int)} behavior.
169         * @return {@code this} instance.
170         * @since 2.18.0
171         */
172        public T setOnMaxCount(final IOBiConsumer<Long, Long> onMaxCount) {
173            this.onMaxCount = onMaxCount != null ? onMaxCount : IOBiConsumer.noop();
174            return asThis();
175        }
176
177        /**
178         * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}.
179         * <p>
180         * Default is {@code true}.
181         * </p>
182         *
183         * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if
184         *                       it does not.
185         * @return {@code this} instance.
186         */
187        public T setPropagateClose(final boolean propagateClose) {
188            this.propagateClose = propagateClose;
189            return asThis();
190        }
191
192    }
193
194    //@formatter:off
195    /**
196     * Builds a new {@link BoundedInputStream}.
197     * <p>
198     * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
199     * </p>
200     * <p>
201     * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
202     * </p>
203     * <h2>Using a ServletInputStream</h2>
204     * <p>
205     * A {@code ServletInputStream} can block if you try to read content that isn't there
206     * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the
207     * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content
208     * length in the first place.
209     * </p>
210     * <h2>Using NIO</h2>
211     * <pre>{@code
212     * BoundedInputStream s = BoundedInputStream.builder()
213     *   .setPath(Paths.get("MyFile.xml"))
214     *   .setMaxCount(1024)
215     *   .setPropagateClose(false)
216     *   .get();
217     * }
218     * </pre>
219     * <h2>Using IO</h2>
220     * <pre>{@code
221     * BoundedInputStream s = BoundedInputStream.builder()
222     *   .setFile(new File("MyFile.xml"))
223     *   .setMaxCount(1024)
224     *   .setPropagateClose(false)
225     *   .get();
226     * }
227     * </pre>
228     * <h2>Counting Bytes</h2>
229     * <p>You can set the running count when building, which is most useful when starting from another stream:
230     * <pre>{@code
231     * InputStream in = ...;
232     * BoundedInputStream s = BoundedInputStream.builder()
233     *   .setInputStream(in)
234     *   .setCount(12)
235     *   .setMaxCount(1024)
236     *   .setPropagateClose(false)
237     *   .get();
238     * }
239     * </pre>
240     *
241     * @see #get()
242     * @since 2.16.0
243     */
244    //@formatter:on
245    public static class Builder extends AbstractBuilder<Builder> {
246
247        /**
248         * Constructs a new builder of {@link BoundedInputStream}.
249         */
250        public Builder() {
251            // empty
252        }
253
254        /**
255         * Builds a new {@link BoundedInputStream}.
256         * <p>
257         * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
258         * </p>
259         * <p>
260         * If you start from an input stream, an exception can't be thrown, and you can call {@link #getUnchecked()} instead.
261         * </p>
262         * <p>
263         * This builder uses the following aspects:
264         * </p>
265         * <ul>
266         * <li>{@link #getInputStream()} gets the target aspect.</li>
267         * <li>{@link #getAfterRead()}</li>
268         * <li>{@code #getCount()}</li>
269         * <li>{@code #getMaxCount()}</li>
270         * <li>{@code #getOnMaxCount()}</li>
271         * <li>{@code #isPropagateClose()}</li>
272         * </ul>
273         *
274         * @return a new instance.
275         * @throws IllegalStateException         if the {@code origin} is {@code null}.
276         * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
277         * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
278         * @see #getInputStream()
279         * @see #getUnchecked()
280         */
281        @Override
282        public BoundedInputStream get() throws IOException {
283            return new BoundedInputStream(this);
284        }
285
286    }
287
288    /**
289     * Constructs a new {@link AbstractBuilder}.
290     *
291     * @return a new {@link AbstractBuilder}.
292     * @since 2.16.0
293     */
294    public static Builder builder() {
295        return new Builder();
296    }
297
298    /** The current count of bytes counted. */
299    private long count;
300
301    /** The current mark. */
302    private long mark;
303
304    /** The maximum count of bytes to read. */
305    private final long maxCount;
306
307    private final IOBiConsumer<Long, Long> onMaxCount;
308
309    /**
310     * Flag if close should be propagated.
311     *
312     * TODO Make final in 3.0.
313     */
314    private boolean propagateClose = true;
315
316    private BoundedInputStream(final Builder builder) throws IOException {
317        super(builder);
318        this.count = builder.getCount();
319        this.maxCount = builder.getMaxCount();
320        this.propagateClose = builder.isPropagateClose();
321        this.onMaxCount = builder.getOnMaxCount();
322    }
323
324    /**
325     * Constructs a new {@link BoundedInputStream} that wraps the given input stream and is <em>unbounded</em>.
326     * <p>
327     * To build an instance: Use the {@link #builder()} to access all features.
328     * </p>
329     *
330     * @param in The wrapped input stream.
331     * @deprecated Use {@link AbstractBuilder#get()}.
332     */
333    @Deprecated
334    public BoundedInputStream(final InputStream in) {
335        this(in, EOF);
336    }
337
338    private BoundedInputStream(final InputStream inputStream, final Builder builder) {
339        super(inputStream, builder);
340        this.count = builder.getCount();
341        this.maxCount = builder.getMaxCount();
342        this.propagateClose = builder.isPropagateClose();
343        this.onMaxCount = builder.getOnMaxCount();
344    }
345
346    /**
347     * Constructs a new {@link BoundedInputStream} that wraps the given input stream and limits it to a certain size.
348     *
349     * @param inputStream The wrapped input stream.
350     * @param maxCount    The maximum number of bytes to return, negative means unbound.
351     * @deprecated Use {@link AbstractBuilder#get()}.
352     */
353    @Deprecated
354    public BoundedInputStream(final InputStream inputStream, final long maxCount) {
355        // Some badly designed methods, for example the Servlet API, overload length
356        // such that "-1" means stream finished
357        this(inputStream, builder().setMaxCount(maxCount));
358    }
359
360    /**
361     * Adds the number of read bytes to the count.
362     *
363     * @param n number of bytes read, or -1 if no more bytes are available.
364     * @throws IOException Not thrown here but subclasses may throw.
365     * @since 2.0
366     */
367    @Override
368    protected synchronized void afterRead(final int n) throws IOException {
369        if (n != EOF) {
370            count += n;
371        }
372        super.afterRead(n);
373    }
374
375    @Override
376    public int available() throws IOException {
377        // Safe cast: value is between 0 and Integer.MAX_VALUE
378        final int remaining = (int) Math.min(getRemaining(), Integer.MAX_VALUE);
379        return Math.min(super.available(), remaining);
380    }
381
382    /**
383     * Invokes the delegate's {@link InputStream#close()} method if {@link #isPropagateClose()} is {@code true}.
384     *
385     * @throws IOException if an I/O error occurs.
386     */
387    @Override
388    public void close() throws IOException {
389        if (propagateClose) {
390            super.close();
391        }
392    }
393
394    /**
395     * Gets the count of bytes read.
396     *
397     * @return The count of bytes read.
398     * @since 2.12.0
399     */
400    public synchronized long getCount() {
401        return count;
402    }
403
404    /**
405     * Gets the maximum number of bytes to read.
406     *
407     * @return The maximum number of bytes to read, or {@value IOUtils#EOF} if unbounded.
408     * @since 2.16.0
409     */
410    public long getMaxCount() {
411        return maxCount;
412    }
413
414    /**
415     * Gets the maximum count of bytes to read.
416     *
417     * @return The maximum count of bytes to read.
418     * @since 2.12.0
419     * @deprecated Use {@link #getMaxCount()}.
420     */
421    @Deprecated
422    public long getMaxLength() {
423        return maxCount;
424    }
425
426    /**
427     * Gets the number of bytes remaining to read before the maximum is reached.
428     *
429     * <p>
430     * This method does <strong>not</strong> report the bytes available in the
431     * underlying stream; it only reflects the remaining allowance imposed by this
432     * {@code BoundedInputStream}.
433     * </p>
434     *
435     * @return The number of bytes remaining to read before the maximum is reached,
436     *         or {@link Long#MAX_VALUE} if no bound is set.
437     * @since 2.16.0
438     */
439    public long getRemaining() {
440        final long maxCount = getMaxCount();
441        return maxCount == EOF ? Long.MAX_VALUE : Math.max(0, maxCount - getCount());
442    }
443
444    private boolean isMaxCount() {
445        return maxCount >= 0 && getCount() >= maxCount;
446    }
447
448    /**
449     * Tests whether the {@link #close()} method should propagate to the underling {@link InputStream}.
450     *
451     * @return {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it does not.
452     */
453    public boolean isPropagateClose() {
454        return propagateClose;
455    }
456
457    /**
458     * Invokes the delegate's {@link InputStream#mark(int)} method.
459     *
460     * @param readLimit read ahead limit.
461     */
462    @Override
463    public synchronized void mark(final int readLimit) {
464        in.mark(readLimit);
465        mark = count;
466    }
467
468    /**
469     * Invokes the delegate's {@link InputStream#markSupported()} method.
470     *
471     * @return true if mark is supported, otherwise false.
472     */
473    @Override
474    public boolean markSupported() {
475        return in.markSupported();
476    }
477
478    /**
479     * A caller has caused a request that would cross the {@code maxLength} boundary.
480     * <p>
481     * Delegates to the consumer set in {@link Builder#setOnMaxCount(IOBiConsumer)}.
482     * </p>
483     *
484     * @param max The maximum count of bytes to read.
485     * @param count     The count of bytes read.
486     * @throws IOException Subclasses may throw.
487     * @since 2.12.0
488     */
489    @SuppressWarnings("unused")
490    // TODO Rename to onMaxCount for 3.0
491    protected void onMaxLength(final long max, final long count) throws IOException {
492        onMaxCount.accept(max, count);
493    }
494
495    /**
496     * Invokes the delegate's {@link InputStream#read()} method if the current position is less than the limit.
497     *
498     * @return the byte read or -1 if the end of stream or the limit has been reached.
499     * @throws IOException if an I/O error occurs.
500     */
501    @Override
502    public int read() throws IOException {
503        if (isMaxCount()) {
504            onMaxLength(maxCount, getCount());
505            return EOF;
506        }
507        return super.read();
508    }
509
510    /**
511     * Invokes the delegate's {@link InputStream#read(byte[])} method.
512     *
513     * @param b the buffer to read the bytes into.
514     * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
515     * @throws IOException if an I/O error occurs.
516     */
517    @Override
518    public int read(final byte[] b) throws IOException {
519        return read(b, 0, b.length);
520    }
521
522    /**
523     * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method.
524     *
525     * @param b   the buffer to read the bytes into.
526     * @param off The start offset.
527     * @param len The number of bytes to read.
528     * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
529     * @throws IOException if an I/O error occurs.
530     */
531    @Override
532    public int read(final byte[] b, final int off, final int len) throws IOException {
533        if (isMaxCount()) {
534            onMaxLength(maxCount, getCount());
535            return EOF;
536        }
537        return super.read(b, off, (int) toReadLen(len));
538    }
539
540    /**
541     * Invokes the delegate's {@link InputStream#reset()} method.
542     *
543     * @throws IOException if an I/O error occurs.
544     */
545    @Override
546    public synchronized void reset() throws IOException {
547        in.reset();
548        count = mark;
549    }
550
551    /**
552     * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}.
553     *
554     * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it
555     *                       does not.
556     * @deprecated Use {@link AbstractBuilder#setPropagateClose(boolean)}.
557     */
558    @Deprecated
559    public synchronized void setPropagateClose(final boolean propagateClose) {
560        this.propagateClose = propagateClose;
561    }
562
563    /**
564     * Invokes the delegate's {@link InputStream#skip(long)} method.
565     *
566     * @param n the number of bytes to skip.
567     * @return the actual number of bytes skipped.
568     * @throws IOException if an I/O error occurs.
569     */
570    @Override
571    public synchronized long skip(final long n) throws IOException {
572        final long skip = super.skip(toReadLen(n));
573        count += skip;
574        return skip;
575    }
576
577    /**
578     * Converts a request to read {@code len} bytes to a lower count if reading would put us over the limit.
579     * <p>
580     * If a {@code maxCount} is not set, then return max{@code maxCount}.
581     * </p>
582     *
583     * @param len The requested byte count.
584     * @return How many bytes to actually attempt to read.
585     */
586    private long toReadLen(final long len) {
587        return maxCount >= 0 ? Math.min(len, maxCount - getCount()) : len;
588    }
589
590    /**
591     * Invokes the delegate's {@link InputStream#toString()} method.
592     *
593     * @return the delegate's {@link InputStream#toString()}.
594     */
595    @Override
596    public String toString() {
597        return in.toString();
598    }
599}