001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import static org.apache.commons.io.IOUtils.EOF;
020
021import java.io.IOException;
022import java.io.InputStream;
023
024import org.apache.commons.io.IOUtils;
025import org.apache.commons.io.function.IOBiConsumer;
026
027//@formatter:off
028/**
029 * Reads bytes up to a maximum count and stops once reached.
030 * <p>
031 * To build an instance: Use the {@link #builder()} to access all features.
032 * </p>
033 * <p>
034 * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
035 * </p>
036 * <p>
037 * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
038 * </p>
039 * <h2>Using a ServletInputStream</h2>
040 * <p>
041 * A {@code ServletInputStream} can block if you try to read content that isn't there
042 * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the
043 * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content
044 * length in the first place.
045 * </p>
046 * <h2>Using NIO</h2>
047 * <pre>{@code
048 * BoundedInputStream s = BoundedInputStream.builder()
049 *   .setPath(Paths.get("MyFile.xml"))
050 *   .setMaxCount(1024)
051 *   .setPropagateClose(false)
052 *   .get();
053 * }
054 * </pre>
055 * <h2>Using IO</h2>
056 * <pre>{@code
057 * BoundedInputStream s = BoundedInputStream.builder()
058 *   .setFile(new File("MyFile.xml"))
059 *   .setMaxCount(1024)
060 *   .setPropagateClose(false)
061 *   .get();
062 * }
063 * </pre>
064 * <h2>Counting Bytes</h2>
065 * <p>You can set the running count when building, which is most useful when starting from another stream:
066 * <pre>{@code
067 * InputStream in = ...;
068 * BoundedInputStream s = BoundedInputStream.builder()
069 *   .setInputStream(in)
070 *   .setCount(12)
071 *   .setMaxCount(1024)
072 *   .setPropagateClose(false)
073 *   .get();
074 * }
075 * </pre>
076 * <h2>Listening for the max count reached</h2>
077 * <pre>{@code
078 * BoundedInputStream s = BoundedInputStream.builder()
079 *   .setPath(Paths.get("MyFile.xml"))
080 *   .setMaxCount(1024)
081 *   .setOnMaxCount((max, count) -> System.out.printf("Max count %,d reached with a last read count of %,d%n", max, count))
082 *   .get();
083 * }
084 * </pre>
085 * @see Builder
086 * @since 2.0
087 */
088//@formatter:on
089public class BoundedInputStream extends ProxyInputStream {
090
091    /**
092     * For subclassing builders from {@link BoundedInputStream} subclassses.
093     *
094     * @param <T> The subclass.
095     */
096    abstract static class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<BoundedInputStream, T> {
097
098        /** The current count of bytes counted. */
099        private long count;
100
101        /** The max count of bytes to read. */
102        private long maxCount = EOF;
103
104        private IOBiConsumer<Long, Long> onMaxCount = IOBiConsumer.noop();
105
106        /** Flag if {@link #close()} should be propagated, {@code true} by default. */
107        private boolean propagateClose = true;
108
109        long getCount() {
110            return count;
111        }
112
113        long getMaxCount() {
114            return maxCount;
115        }
116
117        IOBiConsumer<Long, Long> getOnMaxCount() {
118            return onMaxCount;
119        }
120
121        boolean isPropagateClose() {
122            return propagateClose;
123        }
124
125        /**
126         * Sets the current number of bytes counted.
127         * <p>
128         * Useful when building from another stream to carry forward a read count.
129         * </p>
130         * <p>
131         * Default is {@code 0}, negative means 0.
132         * </p>
133         *
134         * @param count The current number of bytes counted.
135         * @return {@code this} instance.
136         */
137        public T setCount(final long count) {
138            this.count = Math.max(0, count);
139            return asThis();
140        }
141
142        /**
143         * Sets the maximum number of bytes to return.
144         * <p>
145         * Default is {@value IOUtils#EOF}, negative means unbound.
146         * </p>
147         *
148         * @param maxCount The maximum number of bytes to return, negative means unbound.
149         * @return {@code this} instance.
150         */
151        public T setMaxCount(final long maxCount) {
152            this.maxCount = Math.max(EOF, maxCount);
153            return asThis();
154        }
155
156        /**
157         * Sets the default {@link BoundedInputStream#onMaxLength(long, long)} behavior, {@code null} resets to a NOOP.
158         * <p>
159         * The first Long is the max count of bytes to read. The second Long is the count of bytes read.
160         * </p>
161         * <p>
162         * This does <em>not</em> override a {@code BoundedInputStream} subclass' implementation of the {@link BoundedInputStream#onMaxLength(long, long)}
163         * method.
164         * </p>
165         *
166         * @param onMaxCount the {@link ProxyInputStream#afterRead(int)} behavior.
167         * @return this instance.
168         * @since 2.18.0
169         */
170        public T setOnMaxCount(final IOBiConsumer<Long, Long> onMaxCount) {
171            this.onMaxCount = onMaxCount != null ? onMaxCount : IOBiConsumer.noop();
172            return asThis();
173        }
174
175        /**
176         * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}.
177         * <p>
178         * Default is {@code true}.
179         * </p>
180         *
181         * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if
182         *                       it does not.
183         * @return {@code this} instance.
184         */
185        public T setPropagateClose(final boolean propagateClose) {
186            this.propagateClose = propagateClose;
187            return asThis();
188        }
189
190    }
191
192    //@formatter:off
193    /**
194     * Builds a new {@link BoundedInputStream}.
195     * <p>
196     * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
197     * </p>
198     * <p>
199     * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
200     * </p>
201     * <h2>Using a ServletInputStream</h2>
202     * <p>
203     * A {@code ServletInputStream} can block if you try to read content that isn't there
204     * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the
205     * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content
206     * length in the first place.
207     * </p>
208     * <h2>Using NIO</h2>
209     * <pre>{@code
210     * BoundedInputStream s = BoundedInputStream.builder()
211     *   .setPath(Paths.get("MyFile.xml"))
212     *   .setMaxCount(1024)
213     *   .setPropagateClose(false)
214     *   .get();
215     * }
216     * </pre>
217     * <h2>Using IO</h2>
218     * <pre>{@code
219     * BoundedInputStream s = BoundedInputStream.builder()
220     *   .setFile(new File("MyFile.xml"))
221     *   .setMaxCount(1024)
222     *   .setPropagateClose(false)
223     *   .get();
224     * }
225     * </pre>
226     * <h2>Counting Bytes</h2>
227     * <p>You can set the running count when building, which is most useful when starting from another stream:
228     * <pre>{@code
229     * InputStream in = ...;
230     * BoundedInputStream s = BoundedInputStream.builder()
231     *   .setInputStream(in)
232     *   .setCount(12)
233     *   .setMaxCount(1024)
234     *   .setPropagateClose(false)
235     *   .get();
236     * }
237     * </pre>
238     *
239     * @see #get()
240     * @since 2.16.0
241     */
242    //@formatter:on
243    public static class Builder extends AbstractBuilder<Builder> {
244
245        /**
246         * Constructs a new builder of {@link BoundedInputStream}.
247         */
248        public Builder() {
249            // empty
250        }
251
252        /**
253         * Builds a new {@link BoundedInputStream}.
254         * <p>
255         * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
256         * </p>
257         * <p>
258         * If you start from an input stream, an exception can't be thrown, and you can call {@link #getUnchecked()} instead.
259         * </p>
260         * <p>
261         * This builder uses the following aspects:
262         * </p>
263         * <ul>
264         * <li>{@link #getInputStream()} gets the target aspect.</li>
265         * <li>{@link #getAfterRead()}</li>
266         * <li>{@link #getCount()}</li>
267         * <li>{@link #getMaxCount()}</li>
268         * <li>{@link #getOnMaxCount()}</li>
269         * <li>{@link #isPropagateClose()}</li>
270         * </ul>
271         *
272         * @return a new instance.
273         * @throws IllegalStateException         if the {@code origin} is {@code null}.
274         * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
275         * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
276         * @see #getInputStream()
277         * @see #getUnchecked()
278         */
279        @Override
280        public BoundedInputStream get() throws IOException {
281            return new BoundedInputStream(this);
282        }
283
284    }
285
286    /**
287     * Constructs a new {@link AbstractBuilder}.
288     *
289     * @return a new {@link AbstractBuilder}.
290     * @since 2.16.0
291     */
292    public static Builder builder() {
293        return new Builder();
294    }
295
296    /** The current count of bytes counted. */
297    private long count;
298
299    /** The current mark. */
300    private long mark;
301
302    /** The max count of bytes to read. */
303    private final long maxCount;
304
305    private final IOBiConsumer<Long, Long> onMaxCount;
306
307    /**
308     * Flag if close should be propagated.
309     *
310     * TODO Make final in 3.0.
311     */
312    private boolean propagateClose = true;
313
314    BoundedInputStream(final Builder builder) throws IOException {
315        super(builder);
316        this.count = builder.getCount();
317        this.maxCount = builder.getMaxCount();
318        this.propagateClose = builder.isPropagateClose();
319        this.onMaxCount = builder.getOnMaxCount();
320    }
321
322    /**
323     * Constructs a new {@link BoundedInputStream} that wraps the given input stream and is <em>unbounded</em>.
324     * <p>
325     * To build an instance: Use the {@link #builder()} to access all features.
326     * </p>
327     *
328     * @param in The wrapped input stream.
329     * @deprecated Use {@link AbstractBuilder#get()}.
330     */
331    @Deprecated
332    public BoundedInputStream(final InputStream in) {
333        this(in, EOF);
334    }
335
336    BoundedInputStream(final InputStream inputStream, final Builder builder) {
337        super(inputStream, builder);
338        this.count = builder.getCount();
339        this.maxCount = builder.getMaxCount();
340        this.propagateClose = builder.isPropagateClose();
341        this.onMaxCount = builder.getOnMaxCount();
342    }
343
344    /**
345     * Constructs a new {@link BoundedInputStream} that wraps the given input stream and limits it to a certain size.
346     *
347     * @param inputStream The wrapped input stream.
348     * @param maxCount    The maximum number of bytes to return, negative means unbound.
349     * @deprecated Use {@link AbstractBuilder#get()}.
350     */
351    @Deprecated
352    public BoundedInputStream(final InputStream inputStream, final long maxCount) {
353        // Some badly designed methods - e.g. the Servlet API - overload length
354        // such that "-1" means stream finished
355        this(inputStream, builder().setMaxCount(maxCount));
356    }
357
358    /**
359     * Adds the number of read bytes to the count.
360     *
361     * @param n number of bytes read, or -1 if no more bytes are available
362     * @throws IOException Not thrown here but subclasses may throw.
363     * @since 2.0
364     */
365    @Override
366    protected synchronized void afterRead(final int n) throws IOException {
367        if (n != EOF) {
368            count += n;
369        }
370        super.afterRead(n);
371    }
372
373    /**
374     * {@inheritDoc}
375     */
376    @Override
377    public int available() throws IOException {
378        if (isMaxCount()) {
379            onMaxLength(maxCount, getCount());
380            return 0;
381        }
382        return in.available();
383    }
384
385    /**
386     * Invokes the delegate's {@link InputStream#close()} method if {@link #isPropagateClose()} is {@code true}.
387     *
388     * @throws IOException if an I/O error occurs.
389     */
390    @Override
391    public void close() throws IOException {
392        if (propagateClose) {
393            super.close();
394        }
395    }
396
397    /**
398     * Gets the count of bytes read.
399     *
400     * @return The count of bytes read.
401     * @since 2.12.0
402     */
403    public synchronized long getCount() {
404        return count;
405    }
406
407    /**
408     * Gets the max count of bytes to read.
409     *
410     * @return The max count of bytes to read.
411     * @since 2.16.0
412     */
413    public long getMaxCount() {
414        return maxCount;
415    }
416
417    /**
418     * Gets the max count of bytes to read.
419     *
420     * @return The max count of bytes to read.
421     * @since 2.12.0
422     * @deprecated Use {@link #getMaxCount()}.
423     */
424    @Deprecated
425    public long getMaxLength() {
426        return maxCount;
427    }
428
429    /**
430     * Gets how many bytes remain to read.
431     *
432     * @return bytes how many bytes remain to read.
433     * @since 2.16.0
434     */
435    public long getRemaining() {
436        return Math.max(0, getMaxCount() - getCount());
437    }
438
439    private boolean isMaxCount() {
440        return maxCount >= 0 && getCount() >= maxCount;
441    }
442
443    /**
444     * Tests whether the {@link #close()} method should propagate to the underling {@link InputStream}.
445     *
446     * @return {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it does not.
447     */
448    public boolean isPropagateClose() {
449        return propagateClose;
450    }
451
452    /**
453     * Invokes the delegate's {@link InputStream#mark(int)} method.
454     *
455     * @param readLimit read ahead limit
456     */
457    @Override
458    public synchronized void mark(final int readLimit) {
459        in.mark(readLimit);
460        mark = count;
461    }
462
463    /**
464     * Invokes the delegate's {@link InputStream#markSupported()} method.
465     *
466     * @return true if mark is supported, otherwise false
467     */
468    @Override
469    public boolean markSupported() {
470        return in.markSupported();
471    }
472
473    /**
474     * A caller has caused a request that would cross the {@code maxLength} boundary.
475     * <p>
476     * Delegates to the consumer set in {@link Builder#setOnMaxCount(IOBiConsumer)}.
477     * </p>
478     *
479     * @param max The max count of bytes to read.
480     * @param count     The count of bytes read.
481     * @throws IOException Subclasses may throw.
482     * @since 2.12.0
483     */
484    @SuppressWarnings("unused")
485    // TODO Rename to onMaxCount for 3.0
486    protected void onMaxLength(final long max, final long count) throws IOException {
487        onMaxCount.accept(max, count);
488    }
489
490    /**
491     * Invokes the delegate's {@link InputStream#read()} method if the current position is less than the limit.
492     *
493     * @return the byte read or -1 if the end of stream or the limit has been reached.
494     * @throws IOException if an I/O error occurs.
495     */
496    @Override
497    public int read() throws IOException {
498        if (isMaxCount()) {
499            onMaxLength(maxCount, getCount());
500            return EOF;
501        }
502        return super.read();
503    }
504
505    /**
506     * Invokes the delegate's {@link InputStream#read(byte[])} method.
507     *
508     * @param b the buffer to read the bytes into
509     * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
510     * @throws IOException if an I/O error occurs.
511     */
512    @Override
513    public int read(final byte[] b) throws IOException {
514        return read(b, 0, b.length);
515    }
516
517    /**
518     * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method.
519     *
520     * @param b   the buffer to read the bytes into
521     * @param off The start offset
522     * @param len The number of bytes to read
523     * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
524     * @throws IOException if an I/O error occurs.
525     */
526    @Override
527    public int read(final byte[] b, final int off, final int len) throws IOException {
528        if (isMaxCount()) {
529            onMaxLength(maxCount, getCount());
530            return EOF;
531        }
532        return super.read(b, off, (int) toReadLen(len));
533    }
534
535    /**
536     * Invokes the delegate's {@link InputStream#reset()} method.
537     *
538     * @throws IOException if an I/O error occurs.
539     */
540    @Override
541    public synchronized void reset() throws IOException {
542        in.reset();
543        count = mark;
544    }
545
546    /**
547     * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}.
548     *
549     * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it
550     *                       does not.
551     * @deprecated Use {@link AbstractBuilder#setPropagateClose(boolean)}.
552     */
553    @Deprecated
554    public void setPropagateClose(final boolean propagateClose) {
555        this.propagateClose = propagateClose;
556    }
557
558    /**
559     * Invokes the delegate's {@link InputStream#skip(long)} method.
560     *
561     * @param n the number of bytes to skip
562     * @return the actual number of bytes skipped
563     * @throws IOException if an I/O error occurs.
564     */
565    @Override
566    public synchronized long skip(final long n) throws IOException {
567        final long skip = super.skip(toReadLen(n));
568        count += skip;
569        return skip;
570    }
571
572    private long toReadLen(final long len) {
573        return maxCount >= 0 ? Math.min(len, maxCount - getCount()) : len;
574    }
575
576    /**
577     * Invokes the delegate's {@link InputStream#toString()} method.
578     *
579     * @return the delegate's {@link InputStream#toString()}
580     */
581    @Override
582    public String toString() {
583        return in.toString();
584    }
585}