View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      https://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.io.input;
18  
19  import static org.apache.commons.io.IOUtils.EOF;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  
24  import org.apache.commons.io.IOUtils;
25  import org.apache.commons.io.function.IOBiConsumer;
26  
27  //@formatter:off
28  /**
29   * Reads bytes up to a maximum count and stops once reached.
30   * <p>
31   * To build an instance: Use the {@link #builder()} to access all features.
32   * </p>
33   * <p>
34   * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
35   * </p>
36   * <p>
37   * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
38   * </p>
39   * <h2>Using a ServletInputStream</h2>
40   * <p>
41   * A {@code ServletInputStream} can block if you try to read content that isn't there
42   * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the
43   * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content
44   * length in the first place.
45   * </p>
46   * <h2>Using NIO</h2>
47   * <pre>{@code
48   * BoundedInputStream s = BoundedInputStream.builder()
49   *   .setPath(Paths.get("MyFile.xml"))
50   *   .setMaxCount(1024)
51   *   .setPropagateClose(false)
52   *   .get();
53   * }
54   * </pre>
55   * <h2>Using IO</h2>
56   * <pre>{@code
57   * BoundedInputStream s = BoundedInputStream.builder()
58   *   .setFile(new File("MyFile.xml"))
59   *   .setMaxCount(1024)
60   *   .setPropagateClose(false)
61   *   .get();
62   * }
63   * </pre>
64   * <h2>Counting Bytes</h2>
65   * <p>You can set the running count when building, which is most useful when starting from another stream:
66   * <pre>{@code
67   * InputStream in = ...;
68   * BoundedInputStream s = BoundedInputStream.builder()
69   *   .setInputStream(in)
70   *   .setCount(12)
71   *   .setMaxCount(1024)
72   *   .setPropagateClose(false)
73   *   .get();
74   * }
75   * </pre>
76   * <h2>Listening for the maximum count reached</h2>
77   * <pre>{@code
78   * BoundedInputStream s = BoundedInputStream.builder()
79   *   .setPath(Paths.get("MyFile.xml"))
80   *   .setMaxCount(1024)
81   *   .setOnMaxCount((max, count) -> System.out.printf("Maximum count %,d reached with a last read count of %,d%n", max, count))
82   *   .get();
83   * }
84   * </pre>
85   * @see Builder
86   * @since 2.0
87   */
88  //@formatter:on
89  public class BoundedInputStream extends ProxyInputStream {
90  
91      /**
92       * For subclassing builders from {@link BoundedInputStream} subclassses.
93       *
94       * @param <T> The subclass.
95       */
96      abstract static class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<BoundedInputStream, T> {
97  
98          /** The current count of bytes counted. */
99          private long count;
100 
101         /** The maximum count of bytes to read. */
102         private long maxCount = EOF;
103 
104         private IOBiConsumer<Long, Long> onMaxCount = IOBiConsumer.noop();
105 
106         /** Flag if {@link #close()} should be propagated, {@code true} by default. */
107         private boolean propagateClose = true;
108 
109         long getCount() {
110             return count;
111         }
112 
113         long getMaxCount() {
114             return maxCount;
115         }
116 
117         IOBiConsumer<Long, Long> getOnMaxCount() {
118             return onMaxCount;
119         }
120 
121         boolean isPropagateClose() {
122             return propagateClose;
123         }
124 
125         /**
126          * Sets the current number of bytes counted.
127          * <p>
128          * Useful when building from another stream to carry forward a read count.
129          * </p>
130          * <p>
131          * Default is {@code 0}, negative means 0.
132          * </p>
133          *
134          * @param count The current number of bytes counted.
135          * @return {@code this} instance.
136          */
137         public T setCount(final long count) {
138             this.count = Math.max(0, count);
139             return asThis();
140         }
141 
142         /**
143          * Sets the maximum number of bytes to return.
144          * <p>
145          * Default is {@value IOUtils#EOF}, negative means unbound.
146          * </p>
147          *
148          * @param maxCount The maximum number of bytes to return, negative means unbound.
149          * @return {@code this} instance.
150          */
151         public T setMaxCount(final long maxCount) {
152             this.maxCount = Math.max(EOF, maxCount);
153             return asThis();
154         }
155 
156         /**
157          * Sets the default {@link BoundedInputStream#onMaxLength(long, long)} behavior, {@code null} resets to a NOOP.
158          * <p>
159          * The first Long is the number of bytes remaining to read before the maximum is reached count of bytes to read. The second Long is the count of bytes
160          * read.
161          * </p>
162          * <p>
163          * This does <em>not</em> override a {@code BoundedInputStream} subclass' implementation of the {@link BoundedInputStream#onMaxLength(long, long)}
164          * method.
165          * </p>
166          *
167          * @param onMaxCount the {@link ProxyInputStream#afterRead(int)} behavior.
168          * @return {@code this} instance.
169          * @since 2.18.0
170          */
171         public T setOnMaxCount(final IOBiConsumer<Long, Long> onMaxCount) {
172             this.onMaxCount = onMaxCount != null ? onMaxCount : IOBiConsumer.noop();
173             return asThis();
174         }
175 
176         /**
177          * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}.
178          * <p>
179          * Default is {@code true}.
180          * </p>
181          *
182          * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if
183          *                       it does not.
184          * @return {@code this} instance.
185          */
186         public T setPropagateClose(final boolean propagateClose) {
187             this.propagateClose = propagateClose;
188             return asThis();
189         }
190 
191     }
192 
193     //@formatter:off
194     /**
195      * Builds a new {@link BoundedInputStream}.
196      * <p>
197      * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
198      * </p>
199      * <p>
200      * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
201      * </p>
202      * <h2>Using a ServletInputStream</h2>
203      * <p>
204      * A {@code ServletInputStream} can block if you try to read content that isn't there
205      * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the
206      * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content
207      * length in the first place.
208      * </p>
209      * <h2>Using NIO</h2>
210      * <pre>{@code
211      * BoundedInputStream s = BoundedInputStream.builder()
212      *   .setPath(Paths.get("MyFile.xml"))
213      *   .setMaxCount(1024)
214      *   .setPropagateClose(false)
215      *   .get();
216      * }
217      * </pre>
218      * <h2>Using IO</h2>
219      * <pre>{@code
220      * BoundedInputStream s = BoundedInputStream.builder()
221      *   .setFile(new File("MyFile.xml"))
222      *   .setMaxCount(1024)
223      *   .setPropagateClose(false)
224      *   .get();
225      * }
226      * </pre>
227      * <h2>Counting Bytes</h2>
228      * <p>You can set the running count when building, which is most useful when starting from another stream:
229      * <pre>{@code
230      * InputStream in = ...;
231      * BoundedInputStream s = BoundedInputStream.builder()
232      *   .setInputStream(in)
233      *   .setCount(12)
234      *   .setMaxCount(1024)
235      *   .setPropagateClose(false)
236      *   .get();
237      * }
238      * </pre>
239      *
240      * @see #get()
241      * @since 2.16.0
242      */
243     //@formatter:on
244     public static class Builder extends AbstractBuilder<Builder> {
245 
246         /**
247          * Constructs a new builder of {@link BoundedInputStream}.
248          */
249         public Builder() {
250             // empty
251         }
252 
253         /**
254          * Builds a new {@link BoundedInputStream}.
255          * <p>
256          * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
257          * </p>
258          * <p>
259          * If you start from an input stream, an exception can't be thrown, and you can call {@link #getUnchecked()} instead.
260          * </p>
261          * <p>
262          * This builder uses the following aspects:
263          * </p>
264          * <ul>
265          * <li>{@link #getInputStream()} gets the target aspect.</li>
266          * <li>{@link #getAfterRead()}</li>
267          * <li>{@code #getCount()}</li>
268          * <li>{@code #getMaxCount()}</li>
269          * <li>{@code #getOnMaxCount()}</li>
270          * <li>{@code #isPropagateClose()}</li>
271          * </ul>
272          *
273          * @return a new instance.
274          * @throws IllegalStateException         if the {@code origin} is {@code null}.
275          * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
276          * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
277          * @see #getInputStream()
278          * @see #getUnchecked()
279          */
280         @Override
281         public BoundedInputStream get() throws IOException {
282             return new BoundedInputStream(this);
283         }
284 
285     }
286 
287     /**
288      * Constructs a new {@link AbstractBuilder}.
289      *
290      * @return a new {@link AbstractBuilder}.
291      * @since 2.16.0
292      */
293     public static Builder builder() {
294         return new Builder();
295     }
296 
297     /** The current count of bytes counted. */
298     private long count;
299 
300     /** The current mark. */
301     private long mark;
302 
303     /** The maximum count of bytes to read. */
304     private final long maxCount;
305 
306     private final IOBiConsumer<Long, Long> onMaxCount;
307 
308     /**
309      * Flag if close should be propagated.
310      *
311      * TODO Make final in 3.0.
312      */
313     private boolean propagateClose = true;
314 
315     private BoundedInputStream(final Builder builder) throws IOException {
316         super(builder);
317         this.count = builder.getCount();
318         this.maxCount = builder.getMaxCount();
319         this.propagateClose = builder.isPropagateClose();
320         this.onMaxCount = builder.getOnMaxCount();
321     }
322 
323     /**
324      * Constructs a new {@link BoundedInputStream} that wraps the given input stream and is <em>unbounded</em>.
325      * <p>
326      * To build an instance: Use the {@link #builder()} to access all features.
327      * </p>
328      *
329      * @param in The wrapped input stream.
330      * @deprecated Use {@link AbstractBuilder#get()}.
331      */
332     @Deprecated
333     public BoundedInputStream(final InputStream in) {
334         this(in, EOF);
335     }
336 
337     private BoundedInputStream(final InputStream inputStream, final Builder builder) {
338         super(inputStream, builder);
339         this.count = builder.getCount();
340         this.maxCount = builder.getMaxCount();
341         this.propagateClose = builder.isPropagateClose();
342         this.onMaxCount = builder.getOnMaxCount();
343     }
344 
345     /**
346      * Constructs a new {@link BoundedInputStream} that wraps the given input stream and limits it to a certain size.
347      *
348      * @param inputStream The wrapped input stream.
349      * @param maxCount    The maximum number of bytes to return, negative means unbound.
350      * @deprecated Use {@link AbstractBuilder#get()}.
351      */
352     @Deprecated
353     public BoundedInputStream(final InputStream inputStream, final long maxCount) {
354         // Some badly designed methods, for example the Servlet API, overload length
355         // such that "-1" means stream finished
356         this(inputStream, builder().setMaxCount(maxCount));
357     }
358 
359     /**
360      * Adds the number of read bytes to the count.
361      *
362      * @param n number of bytes read, or -1 if no more bytes are available.
363      * @throws IOException Not thrown here but subclasses may throw.
364      * @since 2.0
365      */
366     @Override
367     protected synchronized void afterRead(final int n) throws IOException {
368         if (n != EOF) {
369             count += n;
370         }
371         super.afterRead(n);
372     }
373 
374     @Override
375     public int available() throws IOException {
376         // Safe cast: value is between 0 and Integer.MAX_VALUE
377         final int remaining = (int) Math.min(getRemaining(), Integer.MAX_VALUE);
378         return Math.min(super.available(), remaining);
379     }
380 
381     /**
382      * Invokes the delegate's {@link InputStream#close()} method if {@link #isPropagateClose()} is {@code true}.
383      *
384      * @throws IOException if an I/O error occurs.
385      */
386     @Override
387     public void close() throws IOException {
388         if (propagateClose) {
389             super.close();
390         }
391     }
392 
393     /**
394      * Gets the count of bytes read.
395      *
396      * @return The count of bytes read.
397      * @since 2.12.0
398      */
399     public synchronized long getCount() {
400         return count;
401     }
402 
403     /**
404      * Gets the maximum number of bytes to read.
405      *
406      * @return The maximum number of bytes to read, or {@value IOUtils#EOF} if unbounded.
407      * @since 2.16.0
408      */
409     public long getMaxCount() {
410         return maxCount;
411     }
412 
413     /**
414      * Gets the maximum count of bytes to read.
415      *
416      * @return The maximum count of bytes to read.
417      * @since 2.12.0
418      * @deprecated Use {@link #getMaxCount()}.
419      */
420     @Deprecated
421     public long getMaxLength() {
422         return maxCount;
423     }
424 
425     /**
426      * Gets the number of bytes remaining to read before the maximum is reached.
427      *
428      * <p>
429      * This method does <strong>not</strong> report the bytes available in the
430      * underlying stream; it only reflects the remaining allowance imposed by this
431      * {@code BoundedInputStream}.
432      * </p>
433      *
434      * @return The number of bytes remaining to read before the maximum is reached,
435      *         or {@link Long#MAX_VALUE} if no bound is set.
436      * @since 2.16.0
437      */
438     public long getRemaining() {
439         final long maxCount = getMaxCount();
440         return maxCount == EOF ? Long.MAX_VALUE : Math.max(0, maxCount - getCount());
441     }
442 
443     private boolean isMaxCount() {
444         return maxCount >= 0 && getCount() >= maxCount;
445     }
446 
447     /**
448      * Tests whether the {@link #close()} method should propagate to the underling {@link InputStream}.
449      *
450      * @return {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it does not.
451      */
452     public boolean isPropagateClose() {
453         return propagateClose;
454     }
455 
456     /**
457      * Invokes the delegate's {@link InputStream#mark(int)} method.
458      *
459      * @param readLimit read ahead limit.
460      */
461     @Override
462     public synchronized void mark(final int readLimit) {
463         in.mark(readLimit);
464         mark = count;
465     }
466 
467     /**
468      * Invokes the delegate's {@link InputStream#markSupported()} method.
469      *
470      * @return true if mark is supported, otherwise false.
471      */
472     @Override
473     public boolean markSupported() {
474         return in.markSupported();
475     }
476 
477     /**
478      * A caller has caused a request that would cross the {@code maxLength} boundary.
479      * <p>
480      * Delegates to the consumer set in {@link Builder#setOnMaxCount(IOBiConsumer)}.
481      * </p>
482      *
483      * @param max The maximum count of bytes to read.
484      * @param count     The count of bytes read.
485      * @throws IOException Subclasses may throw.
486      * @since 2.12.0
487      */
488     @SuppressWarnings("unused")
489     // TODO Rename to onMaxCount for 3.0
490     protected void onMaxLength(final long max, final long count) throws IOException {
491         onMaxCount.accept(max, count);
492     }
493 
494     /**
495      * Invokes the delegate's {@link InputStream#read()} method if the current position is less than the limit.
496      *
497      * @return the byte read or -1 if the end of stream or the limit has been reached.
498      * @throws IOException if an I/O error occurs.
499      */
500     @Override
501     public int read() throws IOException {
502         if (isMaxCount()) {
503             onMaxLength(maxCount, getCount());
504             return EOF;
505         }
506         return super.read();
507     }
508 
509     /**
510      * Invokes the delegate's {@link InputStream#read(byte[])} method.
511      *
512      * @param b the buffer to read the bytes into.
513      * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
514      * @throws IOException if an I/O error occurs.
515      */
516     @Override
517     public int read(final byte[] b) throws IOException {
518         return read(b, 0, b.length);
519     }
520 
521     /**
522      * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method.
523      *
524      * @param b   the buffer to read the bytes into.
525      * @param off The start offset.
526      * @param len The number of bytes to read.
527      * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
528      * @throws IOException if an I/O error occurs.
529      */
530     @Override
531     public int read(final byte[] b, final int off, final int len) throws IOException {
532         if (isMaxCount()) {
533             onMaxLength(maxCount, getCount());
534             return EOF;
535         }
536         return super.read(b, off, (int) toReadLen(len));
537     }
538 
539     /**
540      * Invokes the delegate's {@link InputStream#reset()} method.
541      *
542      * @throws IOException if an I/O error occurs.
543      */
544     @Override
545     public synchronized void reset() throws IOException {
546         in.reset();
547         count = mark;
548     }
549 
550     /**
551      * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}.
552      *
553      * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it
554      *                       does not.
555      * @deprecated Use {@link AbstractBuilder#setPropagateClose(boolean)}.
556      */
557     @Deprecated
558     public synchronized void setPropagateClose(final boolean propagateClose) {
559         this.propagateClose = propagateClose;
560     }
561 
562     /**
563      * Invokes the delegate's {@link InputStream#skip(long)} method.
564      *
565      * @param n the number of bytes to skip.
566      * @return the actual number of bytes skipped.
567      * @throws IOException if an I/O error occurs.
568      */
569     @Override
570     public synchronized long skip(final long n) throws IOException {
571         final long skip = super.skip(toReadLen(n));
572         count += skip;
573         return skip;
574     }
575 
576     /**
577      * Converts a request to read {@code len} bytes to a lower count if reading would put us over the limit.
578      * <p>
579      * If a {@code maxCount} is not set, then return max{@code maxCount}.
580      * </p>
581      *
582      * @param len The requested byte count.
583      * @return How many bytes to actually attempt to read.
584      */
585     private long toReadLen(final long len) {
586         return maxCount >= 0 ? Math.min(len, maxCount - getCount()) : len;
587     }
588 
589     /**
590      * Invokes the delegate's {@link InputStream#toString()} method.
591      *
592      * @return the delegate's {@link InputStream#toString()}.
593      */
594     @Override
595     public String toString() {
596         return in.toString();
597     }
598 }