1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.commons.io.input;
18
19 import static org.apache.commons.io.IOUtils.EOF;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23
24 import org.apache.commons.io.IOUtils;
25 import org.apache.commons.io.function.IOBiConsumer;
26
27 //@formatter:off
28 /**
29 * Reads bytes up to a maximum count and stops once reached.
30 * <p>
31 * To build an instance: Use the {@link #builder()} to access all features.
32 * </p>
33 * <p>
34 * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
35 * </p>
36 * <p>
37 * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
38 * </p>
39 * <h2>Using a ServletInputStream</h2>
40 * <p>
41 * A {@code ServletInputStream} can block if you try to read content that isn't there
42 * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the
43 * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content
44 * length in the first place.
45 * </p>
46 * <h2>Using NIO</h2>
47 * <pre>{@code
48 * BoundedInputStream s = BoundedInputStream.builder()
49 * .setPath(Paths.get("MyFile.xml"))
50 * .setMaxCount(1024)
51 * .setPropagateClose(false)
52 * .get();
53 * }
54 * </pre>
55 * <h2>Using IO</h2>
56 * <pre>{@code
57 * BoundedInputStream s = BoundedInputStream.builder()
58 * .setFile(new File("MyFile.xml"))
59 * .setMaxCount(1024)
60 * .setPropagateClose(false)
61 * .get();
62 * }
63 * </pre>
64 * <h2>Counting Bytes</h2>
65 * <p>You can set the running count when building, which is most useful when starting from another stream:
66 * <pre>{@code
67 * InputStream in = ...;
68 * BoundedInputStream s = BoundedInputStream.builder()
69 * .setInputStream(in)
70 * .setCount(12)
71 * .setMaxCount(1024)
72 * .setPropagateClose(false)
73 * .get();
74 * }
75 * </pre>
76 * <h2>Listening for the maximum count reached</h2>
77 * <pre>{@code
78 * BoundedInputStream s = BoundedInputStream.builder()
79 * .setPath(Paths.get("MyFile.xml"))
80 * .setMaxCount(1024)
81 * .setOnMaxCount((max, count) -> System.out.printf("Maximum count %,d reached with a last read count of %,d%n", max, count))
82 * .get();
83 * }
84 * </pre>
85 * @see Builder
86 * @since 2.0
87 */
88 //@formatter:on
89 public class BoundedInputStream extends ProxyInputStream {
90
91 /**
92 * For subclassing builders from {@link BoundedInputStream} subclassses.
93 *
94 * @param <T> The subclass.
95 */
96 abstract static class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<BoundedInputStream, T> {
97
98 /** The current count of bytes counted. */
99 private long count;
100
101 /** The maximum count of bytes to read. */
102 private long maxCount = EOF;
103
104 private IOBiConsumer<Long, Long> onMaxCount = IOBiConsumer.noop();
105
106 /** Flag if {@link #close()} should be propagated, {@code true} by default. */
107 private boolean propagateClose = true;
108
109 long getCount() {
110 return count;
111 }
112
113 long getMaxCount() {
114 return maxCount;
115 }
116
117 IOBiConsumer<Long, Long> getOnMaxCount() {
118 return onMaxCount;
119 }
120
121 boolean isPropagateClose() {
122 return propagateClose;
123 }
124
125 /**
126 * Sets the current number of bytes counted.
127 * <p>
128 * Useful when building from another stream to carry forward a read count.
129 * </p>
130 * <p>
131 * Default is {@code 0}, negative means 0.
132 * </p>
133 *
134 * @param count The current number of bytes counted.
135 * @return {@code this} instance.
136 */
137 public T setCount(final long count) {
138 this.count = Math.max(0, count);
139 return asThis();
140 }
141
142 /**
143 * Sets the maximum number of bytes to return.
144 * <p>
145 * Default is {@value IOUtils#EOF}, negative means unbound.
146 * </p>
147 *
148 * @param maxCount The maximum number of bytes to return, negative means unbound.
149 * @return {@code this} instance.
150 */
151 public T setMaxCount(final long maxCount) {
152 this.maxCount = Math.max(EOF, maxCount);
153 return asThis();
154 }
155
156 /**
157 * Sets the default {@link BoundedInputStream#onMaxLength(long, long)} behavior, {@code null} resets to a NOOP.
158 * <p>
159 * The first Long is the number of bytes remaining to read before the maximum is reached count of bytes to read. The second Long is the count of bytes
160 * read.
161 * </p>
162 * <p>
163 * This does <em>not</em> override a {@code BoundedInputStream} subclass' implementation of the {@link BoundedInputStream#onMaxLength(long, long)}
164 * method.
165 * </p>
166 *
167 * @param onMaxCount the {@link ProxyInputStream#afterRead(int)} behavior.
168 * @return {@code this} instance.
169 * @since 2.18.0
170 */
171 public T setOnMaxCount(final IOBiConsumer<Long, Long> onMaxCount) {
172 this.onMaxCount = onMaxCount != null ? onMaxCount : IOBiConsumer.noop();
173 return asThis();
174 }
175
176 /**
177 * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}.
178 * <p>
179 * Default is {@code true}.
180 * </p>
181 *
182 * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if
183 * it does not.
184 * @return {@code this} instance.
185 */
186 public T setPropagateClose(final boolean propagateClose) {
187 this.propagateClose = propagateClose;
188 return asThis();
189 }
190
191 }
192
193 //@formatter:off
194 /**
195 * Builds a new {@link BoundedInputStream}.
196 * <p>
197 * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
198 * </p>
199 * <p>
200 * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
201 * </p>
202 * <h2>Using a ServletInputStream</h2>
203 * <p>
204 * A {@code ServletInputStream} can block if you try to read content that isn't there
205 * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the
206 * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content
207 * length in the first place.
208 * </p>
209 * <h2>Using NIO</h2>
210 * <pre>{@code
211 * BoundedInputStream s = BoundedInputStream.builder()
212 * .setPath(Paths.get("MyFile.xml"))
213 * .setMaxCount(1024)
214 * .setPropagateClose(false)
215 * .get();
216 * }
217 * </pre>
218 * <h2>Using IO</h2>
219 * <pre>{@code
220 * BoundedInputStream s = BoundedInputStream.builder()
221 * .setFile(new File("MyFile.xml"))
222 * .setMaxCount(1024)
223 * .setPropagateClose(false)
224 * .get();
225 * }
226 * </pre>
227 * <h2>Counting Bytes</h2>
228 * <p>You can set the running count when building, which is most useful when starting from another stream:
229 * <pre>{@code
230 * InputStream in = ...;
231 * BoundedInputStream s = BoundedInputStream.builder()
232 * .setInputStream(in)
233 * .setCount(12)
234 * .setMaxCount(1024)
235 * .setPropagateClose(false)
236 * .get();
237 * }
238 * </pre>
239 *
240 * @see #get()
241 * @since 2.16.0
242 */
243 //@formatter:on
244 public static class Builder extends AbstractBuilder<Builder> {
245
246 /**
247 * Constructs a new builder of {@link BoundedInputStream}.
248 */
249 public Builder() {
250 // empty
251 }
252
253 /**
254 * Builds a new {@link BoundedInputStream}.
255 * <p>
256 * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
257 * </p>
258 * <p>
259 * If you start from an input stream, an exception can't be thrown, and you can call {@link #getUnchecked()} instead.
260 * </p>
261 * <p>
262 * This builder uses the following aspects:
263 * </p>
264 * <ul>
265 * <li>{@link #getInputStream()} gets the target aspect.</li>
266 * <li>{@link #getAfterRead()}</li>
267 * <li>{@code #getCount()}</li>
268 * <li>{@code #getMaxCount()}</li>
269 * <li>{@code #getOnMaxCount()}</li>
270 * <li>{@code #isPropagateClose()}</li>
271 * </ul>
272 *
273 * @return a new instance.
274 * @throws IllegalStateException if the {@code origin} is {@code null}.
275 * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
276 * @throws IOException if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
277 * @see #getInputStream()
278 * @see #getUnchecked()
279 */
280 @Override
281 public BoundedInputStream get() throws IOException {
282 return new BoundedInputStream(this);
283 }
284
285 }
286
287 /**
288 * Constructs a new {@link AbstractBuilder}.
289 *
290 * @return a new {@link AbstractBuilder}.
291 * @since 2.16.0
292 */
293 public static Builder builder() {
294 return new Builder();
295 }
296
297 /** The current count of bytes counted. */
298 private long count;
299
300 /** The current mark. */
301 private long mark;
302
303 /** The maximum count of bytes to read. */
304 private final long maxCount;
305
306 private final IOBiConsumer<Long, Long> onMaxCount;
307
308 /**
309 * Flag if close should be propagated.
310 *
311 * TODO Make final in 3.0.
312 */
313 private boolean propagateClose = true;
314
315 private BoundedInputStream(final Builder builder) throws IOException {
316 super(builder);
317 this.count = builder.getCount();
318 this.maxCount = builder.getMaxCount();
319 this.propagateClose = builder.isPropagateClose();
320 this.onMaxCount = builder.getOnMaxCount();
321 }
322
323 /**
324 * Constructs a new {@link BoundedInputStream} that wraps the given input stream and is <em>unbounded</em>.
325 * <p>
326 * To build an instance: Use the {@link #builder()} to access all features.
327 * </p>
328 *
329 * @param in The wrapped input stream.
330 * @deprecated Use {@link AbstractBuilder#get()}.
331 */
332 @Deprecated
333 public BoundedInputStream(final InputStream in) {
334 this(in, EOF);
335 }
336
337 private BoundedInputStream(final InputStream inputStream, final Builder builder) {
338 super(inputStream, builder);
339 this.count = builder.getCount();
340 this.maxCount = builder.getMaxCount();
341 this.propagateClose = builder.isPropagateClose();
342 this.onMaxCount = builder.getOnMaxCount();
343 }
344
345 /**
346 * Constructs a new {@link BoundedInputStream} that wraps the given input stream and limits it to a certain size.
347 *
348 * @param inputStream The wrapped input stream.
349 * @param maxCount The maximum number of bytes to return, negative means unbound.
350 * @deprecated Use {@link AbstractBuilder#get()}.
351 */
352 @Deprecated
353 public BoundedInputStream(final InputStream inputStream, final long maxCount) {
354 // Some badly designed methods, for example the Servlet API, overload length
355 // such that "-1" means stream finished
356 this(inputStream, builder().setMaxCount(maxCount));
357 }
358
359 /**
360 * Adds the number of read bytes to the count.
361 *
362 * @param n number of bytes read, or -1 if no more bytes are available.
363 * @throws IOException Not thrown here but subclasses may throw.
364 * @since 2.0
365 */
366 @Override
367 protected synchronized void afterRead(final int n) throws IOException {
368 if (n != EOF) {
369 count += n;
370 }
371 super.afterRead(n);
372 }
373
374 @Override
375 public int available() throws IOException {
376 // Safe cast: value is between 0 and Integer.MAX_VALUE
377 final int remaining = (int) Math.min(getRemaining(), Integer.MAX_VALUE);
378 return Math.min(super.available(), remaining);
379 }
380
381 /**
382 * Invokes the delegate's {@link InputStream#close()} method if {@link #isPropagateClose()} is {@code true}.
383 *
384 * @throws IOException if an I/O error occurs.
385 */
386 @Override
387 public void close() throws IOException {
388 if (propagateClose) {
389 super.close();
390 }
391 }
392
393 /**
394 * Gets the count of bytes read.
395 *
396 * @return The count of bytes read.
397 * @since 2.12.0
398 */
399 public synchronized long getCount() {
400 return count;
401 }
402
403 /**
404 * Gets the maximum number of bytes to read.
405 *
406 * @return The maximum number of bytes to read, or {@value IOUtils#EOF} if unbounded.
407 * @since 2.16.0
408 */
409 public long getMaxCount() {
410 return maxCount;
411 }
412
413 /**
414 * Gets the maximum count of bytes to read.
415 *
416 * @return The maximum count of bytes to read.
417 * @since 2.12.0
418 * @deprecated Use {@link #getMaxCount()}.
419 */
420 @Deprecated
421 public long getMaxLength() {
422 return maxCount;
423 }
424
425 /**
426 * Gets the number of bytes remaining to read before the maximum is reached.
427 *
428 * <p>
429 * This method does <strong>not</strong> report the bytes available in the
430 * underlying stream; it only reflects the remaining allowance imposed by this
431 * {@code BoundedInputStream}.
432 * </p>
433 *
434 * @return The number of bytes remaining to read before the maximum is reached,
435 * or {@link Long#MAX_VALUE} if no bound is set.
436 * @since 2.16.0
437 */
438 public long getRemaining() {
439 final long maxCount = getMaxCount();
440 return maxCount == EOF ? Long.MAX_VALUE : Math.max(0, maxCount - getCount());
441 }
442
443 private boolean isMaxCount() {
444 return maxCount >= 0 && getCount() >= maxCount;
445 }
446
447 /**
448 * Tests whether the {@link #close()} method should propagate to the underling {@link InputStream}.
449 *
450 * @return {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it does not.
451 */
452 public boolean isPropagateClose() {
453 return propagateClose;
454 }
455
456 /**
457 * Invokes the delegate's {@link InputStream#mark(int)} method.
458 *
459 * @param readLimit read ahead limit.
460 */
461 @Override
462 public synchronized void mark(final int readLimit) {
463 in.mark(readLimit);
464 mark = count;
465 }
466
467 /**
468 * Invokes the delegate's {@link InputStream#markSupported()} method.
469 *
470 * @return true if mark is supported, otherwise false.
471 */
472 @Override
473 public boolean markSupported() {
474 return in.markSupported();
475 }
476
477 /**
478 * A caller has caused a request that would cross the {@code maxLength} boundary.
479 * <p>
480 * Delegates to the consumer set in {@link Builder#setOnMaxCount(IOBiConsumer)}.
481 * </p>
482 *
483 * @param max The maximum count of bytes to read.
484 * @param count The count of bytes read.
485 * @throws IOException Subclasses may throw.
486 * @since 2.12.0
487 */
488 @SuppressWarnings("unused")
489 // TODO Rename to onMaxCount for 3.0
490 protected void onMaxLength(final long max, final long count) throws IOException {
491 onMaxCount.accept(max, count);
492 }
493
494 /**
495 * Invokes the delegate's {@link InputStream#read()} method if the current position is less than the limit.
496 *
497 * @return the byte read or -1 if the end of stream or the limit has been reached.
498 * @throws IOException if an I/O error occurs.
499 */
500 @Override
501 public int read() throws IOException {
502 if (isMaxCount()) {
503 onMaxLength(maxCount, getCount());
504 return EOF;
505 }
506 return super.read();
507 }
508
509 /**
510 * Invokes the delegate's {@link InputStream#read(byte[])} method.
511 *
512 * @param b the buffer to read the bytes into.
513 * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
514 * @throws IOException if an I/O error occurs.
515 */
516 @Override
517 public int read(final byte[] b) throws IOException {
518 return read(b, 0, b.length);
519 }
520
521 /**
522 * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method.
523 *
524 * @param b the buffer to read the bytes into.
525 * @param off The start offset.
526 * @param len The number of bytes to read.
527 * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
528 * @throws IOException if an I/O error occurs.
529 */
530 @Override
531 public int read(final byte[] b, final int off, final int len) throws IOException {
532 if (isMaxCount()) {
533 onMaxLength(maxCount, getCount());
534 return EOF;
535 }
536 return super.read(b, off, (int) toReadLen(len));
537 }
538
539 /**
540 * Invokes the delegate's {@link InputStream#reset()} method.
541 *
542 * @throws IOException if an I/O error occurs.
543 */
544 @Override
545 public synchronized void reset() throws IOException {
546 in.reset();
547 count = mark;
548 }
549
550 /**
551 * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}.
552 *
553 * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it
554 * does not.
555 * @deprecated Use {@link AbstractBuilder#setPropagateClose(boolean)}.
556 */
557 @Deprecated
558 public synchronized void setPropagateClose(final boolean propagateClose) {
559 this.propagateClose = propagateClose;
560 }
561
562 /**
563 * Invokes the delegate's {@link InputStream#skip(long)} method.
564 *
565 * @param n the number of bytes to skip.
566 * @return the actual number of bytes skipped.
567 * @throws IOException if an I/O error occurs.
568 */
569 @Override
570 public synchronized long skip(final long n) throws IOException {
571 final long skip = super.skip(toReadLen(n));
572 count += skip;
573 return skip;
574 }
575
576 /**
577 * Converts a request to read {@code len} bytes to a lower count if reading would put us over the limit.
578 * <p>
579 * If a {@code maxCount} is not set, then return max{@code maxCount}.
580 * </p>
581 *
582 * @param len The requested byte count.
583 * @return How many bytes to actually attempt to read.
584 */
585 private long toReadLen(final long len) {
586 return maxCount >= 0 ? Math.min(len, maxCount - getCount()) : len;
587 }
588
589 /**
590 * Invokes the delegate's {@link InputStream#toString()} method.
591 *
592 * @return the delegate's {@link InputStream#toString()}.
593 */
594 @Override
595 public String toString() {
596 return in.toString();
597 }
598 }