View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      https://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.io.input;
18  
19  import static org.apache.commons.io.IOUtils.EOF;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  
24  import org.apache.commons.io.IOUtils;
25  import org.apache.commons.io.function.IOBiConsumer;
26  
27  //@formatter:off
28  /**
29   * Reads bytes up to a maximum count and stops once reached.
30   * <p>
31   * To build an instance: Use the {@link #builder()} to access all features.
32   * </p>
33   * <p>
34   * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
35   * </p>
36   * <p>
37   * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
38   * </p>
39   * <h2>Using a ServletInputStream</h2>
40   * <p>
41   * A {@code ServletInputStream} can block if you try to read content that isn't there
42   * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the
43   * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content
44   * length in the first place.
45   * </p>
46   * <h2>Using NIO</h2>
47   * <pre>{@code
48   * BoundedInputStream s = BoundedInputStream.builder()
49   *   .setPath(Paths.get("MyFile.xml"))
50   *   .setMaxCount(1024)
51   *   .setPropagateClose(false)
52   *   .get();
53   * }
54   * </pre>
55   * <h2>Using IO</h2>
56   * <pre>{@code
57   * BoundedInputStream s = BoundedInputStream.builder()
58   *   .setFile(new File("MyFile.xml"))
59   *   .setMaxCount(1024)
60   *   .setPropagateClose(false)
61   *   .get();
62   * }
63   * </pre>
64   * <h2>Counting Bytes</h2>
65   * <p>You can set the running count when building, which is most useful when starting from another stream:
66   * <pre>{@code
67   * InputStream in = ...;
68   * BoundedInputStream s = BoundedInputStream.builder()
69   *   .setInputStream(in)
70   *   .setCount(12)
71   *   .setMaxCount(1024)
72   *   .setPropagateClose(false)
73   *   .get();
74   * }
75   * </pre>
76   * <h2>Listening for the maximum count reached</h2>
77   * <pre>{@code
78   * BoundedInputStream s = BoundedInputStream.builder()
79   *   .setPath(Paths.get("MyFile.xml"))
80   *   .setMaxCount(1024)
81   *   .setOnMaxCount((max, count) -> System.out.printf("Maximum count %,d reached with a last read count of %,d%n", max, count))
82   *   .get();
83   * }
84   * </pre>
85   *
86   * @see Builder
87   * @since 2.0
88   */
89  //@formatter:on
90  public class BoundedInputStream extends ProxyInputStream {
91  
92      /**
93       * For subclassing builders from {@link BoundedInputStream} subclassses.
94       *
95       * @param <T> The subclass.
96       */
97      abstract static class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<BoundedInputStream, T> {
98  
99          /** The current count of bytes counted. */
100         private long count;
101 
102         /** The maximum count of bytes to read. */
103         private long maxCount = EOF;
104 
105         private IOBiConsumer<Long, Long> onMaxCount = IOBiConsumer.noop();
106 
107         /** Flag if {@link #close()} should be propagated, {@code true} by default. */
108         private boolean propagateClose = true;
109 
110         long getCount() {
111             return count;
112         }
113 
114         long getMaxCount() {
115             return maxCount;
116         }
117 
118         IOBiConsumer<Long, Long> getOnMaxCount() {
119             return onMaxCount;
120         }
121 
122         boolean isPropagateClose() {
123             return propagateClose;
124         }
125 
126         /**
127          * Sets the current number of bytes counted.
128          * <p>
129          * Useful when building from another stream to carry forward a read count.
130          * </p>
131          * <p>
132          * Default is {@code 0}, negative means 0.
133          * </p>
134          *
135          * @param count The current number of bytes counted.
136          * @return {@code this} instance.
137          */
138         public T setCount(final long count) {
139             this.count = Math.max(0, count);
140             return asThis();
141         }
142 
143         /**
144          * Sets the maximum number of bytes to return.
145          * <p>
146          * Default is {@value IOUtils#EOF}, negative means unbound.
147          * </p>
148          *
149          * @param maxCount The maximum number of bytes to return, negative means unbound.
150          * @return {@code this} instance.
151          */
152         public T setMaxCount(final long maxCount) {
153             this.maxCount = Math.max(EOF, maxCount);
154             return asThis();
155         }
156 
157         /**
158          * Sets the default {@link BoundedInputStream#onMaxLength(long, long)} behavior, {@code null} resets to a NOOP.
159          * <p>
160          * The first Long is the number of bytes remaining to read before the maximum is reached count of bytes to read. The second Long is the count of bytes
161          * read.
162          * </p>
163          * <p>
164          * This does <em>not</em> override a {@code BoundedInputStream} subclass' implementation of the {@link BoundedInputStream#onMaxLength(long, long)}
165          * method.
166          * </p>
167          *
168          * @param onMaxCount the {@link ProxyInputStream#afterRead(int)} behavior.
169          * @return {@code this} instance.
170          * @since 2.18.0
171          */
172         public T setOnMaxCount(final IOBiConsumer<Long, Long> onMaxCount) {
173             this.onMaxCount = onMaxCount != null ? onMaxCount : IOBiConsumer.noop();
174             return asThis();
175         }
176 
177         /**
178          * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}.
179          * <p>
180          * Default is {@code true}.
181          * </p>
182          *
183          * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if
184          *                       it does not.
185          * @return {@code this} instance.
186          */
187         public T setPropagateClose(final boolean propagateClose) {
188             this.propagateClose = propagateClose;
189             return asThis();
190         }
191 
192     }
193 
194     //@formatter:off
195     /**
196      * Builds a new {@link BoundedInputStream}.
197      * <p>
198      * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
199      * </p>
200      * <p>
201      * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
202      * </p>
203      * <h2>Using a ServletInputStream</h2>
204      * <p>
205      * A {@code ServletInputStream} can block if you try to read content that isn't there
206      * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the
207      * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content
208      * length in the first place.
209      * </p>
210      * <h2>Using NIO</h2>
211      * <pre>{@code
212      * BoundedInputStream s = BoundedInputStream.builder()
213      *   .setPath(Paths.get("MyFile.xml"))
214      *   .setMaxCount(1024)
215      *   .setPropagateClose(false)
216      *   .get();
217      * }
218      * </pre>
219      * <h2>Using IO</h2>
220      * <pre>{@code
221      * BoundedInputStream s = BoundedInputStream.builder()
222      *   .setFile(new File("MyFile.xml"))
223      *   .setMaxCount(1024)
224      *   .setPropagateClose(false)
225      *   .get();
226      * }
227      * </pre>
228      * <h2>Counting Bytes</h2>
229      * <p>You can set the running count when building, which is most useful when starting from another stream:
230      * <pre>{@code
231      * InputStream in = ...;
232      * BoundedInputStream s = BoundedInputStream.builder()
233      *   .setInputStream(in)
234      *   .setCount(12)
235      *   .setMaxCount(1024)
236      *   .setPropagateClose(false)
237      *   .get();
238      * }
239      * </pre>
240      *
241      * @see #get()
242      * @since 2.16.0
243      */
244     //@formatter:on
245     public static class Builder extends AbstractBuilder<Builder> {
246 
247         /**
248          * Constructs a new builder of {@link BoundedInputStream}.
249          */
250         public Builder() {
251             // empty
252         }
253 
254         /**
255          * Builds a new {@link BoundedInputStream}.
256          * <p>
257          * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
258          * </p>
259          * <p>
260          * If you start from an input stream, an exception can't be thrown, and you can call {@link #getUnchecked()} instead.
261          * </p>
262          * <p>
263          * This builder uses the following aspects:
264          * </p>
265          * <ul>
266          * <li>{@link #getInputStream()} gets the target aspect.</li>
267          * <li>{@link #getAfterRead()}</li>
268          * <li>{@code #getCount()}</li>
269          * <li>{@code #getMaxCount()}</li>
270          * <li>{@code #getOnMaxCount()}</li>
271          * <li>{@code #isPropagateClose()}</li>
272          * </ul>
273          *
274          * @return a new instance.
275          * @throws IllegalStateException         if the {@code origin} is {@code null}.
276          * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
277          * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
278          * @see #getInputStream()
279          * @see #getUnchecked()
280          */
281         @Override
282         public BoundedInputStream get() throws IOException {
283             return new BoundedInputStream(this);
284         }
285 
286     }
287 
288     /**
289      * Constructs a new {@link AbstractBuilder}.
290      *
291      * @return a new {@link AbstractBuilder}.
292      * @since 2.16.0
293      */
294     public static Builder builder() {
295         return new Builder();
296     }
297 
298     /** The current count of bytes counted. */
299     private long count;
300 
301     /** The current mark. */
302     private long mark;
303 
304     /** The maximum count of bytes to read. */
305     private final long maxCount;
306 
307     private final IOBiConsumer<Long, Long> onMaxCount;
308 
309     /**
310      * Flag if close should be propagated.
311      *
312      * TODO Make final in 3.0.
313      */
314     private boolean propagateClose = true;
315 
316     private BoundedInputStream(final Builder builder) throws IOException {
317         super(builder);
318         this.count = builder.getCount();
319         this.maxCount = builder.getMaxCount();
320         this.propagateClose = builder.isPropagateClose();
321         this.onMaxCount = builder.getOnMaxCount();
322     }
323 
324     /**
325      * Constructs a new {@link BoundedInputStream} that wraps the given input stream and is <em>unbounded</em>.
326      * <p>
327      * To build an instance: Use the {@link #builder()} to access all features.
328      * </p>
329      *
330      * @param in The wrapped input stream.
331      * @deprecated Use {@link AbstractBuilder#get()}.
332      */
333     @Deprecated
334     public BoundedInputStream(final InputStream in) {
335         this(in, EOF);
336     }
337 
338     private BoundedInputStream(final InputStream inputStream, final Builder builder) {
339         super(inputStream, builder);
340         this.count = builder.getCount();
341         this.maxCount = builder.getMaxCount();
342         this.propagateClose = builder.isPropagateClose();
343         this.onMaxCount = builder.getOnMaxCount();
344     }
345 
346     /**
347      * Constructs a new {@link BoundedInputStream} that wraps the given input stream and limits it to a certain size.
348      *
349      * @param inputStream The wrapped input stream.
350      * @param maxCount    The maximum number of bytes to return, negative means unbound.
351      * @deprecated Use {@link AbstractBuilder#get()}.
352      */
353     @Deprecated
354     public BoundedInputStream(final InputStream inputStream, final long maxCount) {
355         // Some badly designed methods, for example the Servlet API, overload length
356         // such that "-1" means stream finished
357         this(inputStream, builder().setMaxCount(maxCount));
358     }
359 
360     /**
361      * Adds the number of read bytes to the count.
362      *
363      * @param n number of bytes read, or -1 if no more bytes are available.
364      * @throws IOException Not thrown here but subclasses may throw.
365      * @since 2.0
366      */
367     @Override
368     protected synchronized void afterRead(final int n) throws IOException {
369         if (n != EOF) {
370             count += n;
371         }
372         super.afterRead(n);
373     }
374 
375     @Override
376     public int available() throws IOException {
377         // Safe cast: value is between 0 and Integer.MAX_VALUE
378         final int remaining = (int) Math.min(getRemaining(), Integer.MAX_VALUE);
379         return Math.min(super.available(), remaining);
380     }
381 
382     /**
383      * Invokes the delegate's {@link InputStream#close()} method if {@link #isPropagateClose()} is {@code true}.
384      *
385      * @throws IOException if an I/O error occurs.
386      */
387     @Override
388     public void close() throws IOException {
389         if (propagateClose) {
390             super.close();
391         }
392     }
393 
394     /**
395      * Gets the count of bytes read.
396      *
397      * @return The count of bytes read.
398      * @since 2.12.0
399      */
400     public synchronized long getCount() {
401         return count;
402     }
403 
404     /**
405      * Gets the maximum number of bytes to read.
406      *
407      * @return The maximum number of bytes to read, or {@value IOUtils#EOF} if unbounded.
408      * @since 2.16.0
409      */
410     public long getMaxCount() {
411         return maxCount;
412     }
413 
414     /**
415      * Gets the maximum count of bytes to read.
416      *
417      * @return The maximum count of bytes to read.
418      * @since 2.12.0
419      * @deprecated Use {@link #getMaxCount()}.
420      */
421     @Deprecated
422     public long getMaxLength() {
423         return maxCount;
424     }
425 
426     /**
427      * Gets the number of bytes remaining to read before the maximum is reached.
428      *
429      * <p>
430      * This method does <strong>not</strong> report the bytes available in the
431      * underlying stream; it only reflects the remaining allowance imposed by this
432      * {@code BoundedInputStream}.
433      * </p>
434      *
435      * @return The number of bytes remaining to read before the maximum is reached,
436      *         or {@link Long#MAX_VALUE} if no bound is set.
437      * @since 2.16.0
438      */
439     public long getRemaining() {
440         final long maxCount = getMaxCount();
441         return maxCount == EOF ? Long.MAX_VALUE : Math.max(0, maxCount - getCount());
442     }
443 
444     private boolean isMaxCount() {
445         return maxCount >= 0 && getCount() >= maxCount;
446     }
447 
448     /**
449      * Tests whether the {@link #close()} method should propagate to the underling {@link InputStream}.
450      *
451      * @return {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it does not.
452      */
453     public boolean isPropagateClose() {
454         return propagateClose;
455     }
456 
457     /**
458      * Invokes the delegate's {@link InputStream#mark(int)} method.
459      *
460      * @param readLimit read ahead limit.
461      */
462     @Override
463     public synchronized void mark(final int readLimit) {
464         in.mark(readLimit);
465         mark = count;
466     }
467 
468     /**
469      * Invokes the delegate's {@link InputStream#markSupported()} method.
470      *
471      * @return true if mark is supported, otherwise false.
472      */
473     @Override
474     public boolean markSupported() {
475         return in.markSupported();
476     }
477 
478     /**
479      * A caller has caused a request that would cross the {@code maxLength} boundary.
480      * <p>
481      * Delegates to the consumer set in {@link Builder#setOnMaxCount(IOBiConsumer)}.
482      * </p>
483      *
484      * @param max The maximum count of bytes to read.
485      * @param count     The count of bytes read.
486      * @throws IOException Subclasses may throw.
487      * @since 2.12.0
488      */
489     @SuppressWarnings("unused")
490     // TODO Rename to onMaxCount for 3.0
491     protected void onMaxLength(final long max, final long count) throws IOException {
492         onMaxCount.accept(max, count);
493     }
494 
495     /**
496      * Invokes the delegate's {@link InputStream#read()} method if the current position is less than the limit.
497      *
498      * @return the byte read or -1 if the end of stream or the limit has been reached.
499      * @throws IOException if an I/O error occurs.
500      */
501     @Override
502     public int read() throws IOException {
503         if (isMaxCount()) {
504             onMaxLength(maxCount, getCount());
505             return EOF;
506         }
507         return super.read();
508     }
509 
510     /**
511      * Invokes the delegate's {@link InputStream#read(byte[])} method.
512      *
513      * @param b the buffer to read the bytes into.
514      * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
515      * @throws IOException if an I/O error occurs.
516      */
517     @Override
518     public int read(final byte[] b) throws IOException {
519         return read(b, 0, b.length);
520     }
521 
522     /**
523      * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method.
524      *
525      * @param b   the buffer to read the bytes into.
526      * @param off The start offset.
527      * @param len The number of bytes to read.
528      * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
529      * @throws IOException if an I/O error occurs.
530      */
531     @Override
532     public int read(final byte[] b, final int off, final int len) throws IOException {
533         if (isMaxCount()) {
534             onMaxLength(maxCount, getCount());
535             return EOF;
536         }
537         return super.read(b, off, (int) toReadLen(len));
538     }
539 
540     /**
541      * Invokes the delegate's {@link InputStream#reset()} method.
542      *
543      * @throws IOException if an I/O error occurs.
544      */
545     @Override
546     public synchronized void reset() throws IOException {
547         in.reset();
548         count = mark;
549     }
550 
551     /**
552      * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}.
553      *
554      * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it
555      *                       does not.
556      * @deprecated Use {@link AbstractBuilder#setPropagateClose(boolean)}.
557      */
558     @Deprecated
559     public synchronized void setPropagateClose(final boolean propagateClose) {
560         this.propagateClose = propagateClose;
561     }
562 
563     /**
564      * Invokes the delegate's {@link InputStream#skip(long)} method.
565      *
566      * @param n the number of bytes to skip.
567      * @return the actual number of bytes skipped.
568      * @throws IOException if an I/O error occurs.
569      */
570     @Override
571     public synchronized long skip(final long n) throws IOException {
572         final long skip = super.skip(toReadLen(n));
573         count += skip;
574         return skip;
575     }
576 
577     /**
578      * Converts a request to read {@code len} bytes to a lower count if reading would put us over the limit.
579      * <p>
580      * If a {@code maxCount} is not set, then return max{@code maxCount}.
581      * </p>
582      *
583      * @param len The requested byte count.
584      * @return How many bytes to actually attempt to read.
585      */
586     private long toReadLen(final long len) {
587         return maxCount >= 0 ? Math.min(len, maxCount - getCount()) : len;
588     }
589 
590     /**
591      * Invokes the delegate's {@link InputStream#toString()} method.
592      *
593      * @return the delegate's {@link InputStream#toString()}.
594      */
595     @Override
596     public String toString() {
597         return in.toString();
598     }
599 }