1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.commons.io.input;
18
19 import static org.apache.commons.io.IOUtils.EOF;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23
24 import org.apache.commons.io.IOUtils;
25 import org.apache.commons.io.function.IOBiConsumer;
26
27 //@formatter:off
28 /**
29 * Reads bytes up to a maximum count and stops once reached.
30 * <p>
31 * To build an instance: Use the {@link #builder()} to access all features.
32 * </p>
33 * <p>
34 * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
35 * </p>
36 * <p>
37 * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
38 * </p>
39 * <h2>Using a ServletInputStream</h2>
40 * <p>
41 * A {@code ServletInputStream} can block if you try to read content that isn't there
42 * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the
43 * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content
44 * length in the first place.
45 * </p>
46 * <h2>Using NIO</h2>
47 * <pre>{@code
48 * BoundedInputStream s = BoundedInputStream.builder()
49 * .setPath(Paths.get("MyFile.xml"))
50 * .setMaxCount(1024)
51 * .setPropagateClose(false)
52 * .get();
53 * }
54 * </pre>
55 * <h2>Using IO</h2>
56 * <pre>{@code
57 * BoundedInputStream s = BoundedInputStream.builder()
58 * .setFile(new File("MyFile.xml"))
59 * .setMaxCount(1024)
60 * .setPropagateClose(false)
61 * .get();
62 * }
63 * </pre>
64 * <h2>Counting Bytes</h2>
65 * <p>You can set the running count when building, which is most useful when starting from another stream:
66 * <pre>{@code
67 * InputStream in = ...;
68 * BoundedInputStream s = BoundedInputStream.builder()
69 * .setInputStream(in)
70 * .setCount(12)
71 * .setMaxCount(1024)
72 * .setPropagateClose(false)
73 * .get();
74 * }
75 * </pre>
76 * <h2>Listening for the maximum count reached</h2>
77 * <pre>{@code
78 * BoundedInputStream s = BoundedInputStream.builder()
79 * .setPath(Paths.get("MyFile.xml"))
80 * .setMaxCount(1024)
81 * .setOnMaxCount((max, count) -> System.out.printf("Maximum count %,d reached with a last read count of %,d%n", max, count))
82 * .get();
83 * }
84 * </pre>
85 *
86 * @see Builder
87 * @since 2.0
88 */
89 //@formatter:on
90 public class BoundedInputStream extends ProxyInputStream {
91
92 /**
93 * For subclassing builders from {@link BoundedInputStream} subclassses.
94 *
95 * @param <T> The subclass.
96 */
97 abstract static class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<BoundedInputStream, T> {
98
99 /** The current count of bytes counted. */
100 private long count;
101
102 /** The maximum count of bytes to read. */
103 private long maxCount = EOF;
104
105 private IOBiConsumer<Long, Long> onMaxCount = IOBiConsumer.noop();
106
107 /** Flag if {@link #close()} should be propagated, {@code true} by default. */
108 private boolean propagateClose = true;
109
110 long getCount() {
111 return count;
112 }
113
114 long getMaxCount() {
115 return maxCount;
116 }
117
118 IOBiConsumer<Long, Long> getOnMaxCount() {
119 return onMaxCount;
120 }
121
122 boolean isPropagateClose() {
123 return propagateClose;
124 }
125
126 /**
127 * Sets the current number of bytes counted.
128 * <p>
129 * Useful when building from another stream to carry forward a read count.
130 * </p>
131 * <p>
132 * Default is {@code 0}, negative means 0.
133 * </p>
134 *
135 * @param count The current number of bytes counted.
136 * @return {@code this} instance.
137 */
138 public T setCount(final long count) {
139 this.count = Math.max(0, count);
140 return asThis();
141 }
142
143 /**
144 * Sets the maximum number of bytes to return.
145 * <p>
146 * Default is {@value IOUtils#EOF}, negative means unbound.
147 * </p>
148 *
149 * @param maxCount The maximum number of bytes to return, negative means unbound.
150 * @return {@code this} instance.
151 */
152 public T setMaxCount(final long maxCount) {
153 this.maxCount = Math.max(EOF, maxCount);
154 return asThis();
155 }
156
157 /**
158 * Sets the default {@link BoundedInputStream#onMaxLength(long, long)} behavior, {@code null} resets to a NOOP.
159 * <p>
160 * The first Long is the number of bytes remaining to read before the maximum is reached count of bytes to read. The second Long is the count of bytes
161 * read.
162 * </p>
163 * <p>
164 * This does <em>not</em> override a {@code BoundedInputStream} subclass' implementation of the {@link BoundedInputStream#onMaxLength(long, long)}
165 * method.
166 * </p>
167 *
168 * @param onMaxCount the {@link ProxyInputStream#afterRead(int)} behavior.
169 * @return {@code this} instance.
170 * @since 2.18.0
171 */
172 public T setOnMaxCount(final IOBiConsumer<Long, Long> onMaxCount) {
173 this.onMaxCount = onMaxCount != null ? onMaxCount : IOBiConsumer.noop();
174 return asThis();
175 }
176
177 /**
178 * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}.
179 * <p>
180 * Default is {@code true}.
181 * </p>
182 *
183 * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if
184 * it does not.
185 * @return {@code this} instance.
186 */
187 public T setPropagateClose(final boolean propagateClose) {
188 this.propagateClose = propagateClose;
189 return asThis();
190 }
191
192 }
193
194 //@formatter:off
195 /**
196 * Builds a new {@link BoundedInputStream}.
197 * <p>
198 * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
199 * </p>
200 * <p>
201 * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
202 * </p>
203 * <h2>Using a ServletInputStream</h2>
204 * <p>
205 * A {@code ServletInputStream} can block if you try to read content that isn't there
206 * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the
207 * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content
208 * length in the first place.
209 * </p>
210 * <h2>Using NIO</h2>
211 * <pre>{@code
212 * BoundedInputStream s = BoundedInputStream.builder()
213 * .setPath(Paths.get("MyFile.xml"))
214 * .setMaxCount(1024)
215 * .setPropagateClose(false)
216 * .get();
217 * }
218 * </pre>
219 * <h2>Using IO</h2>
220 * <pre>{@code
221 * BoundedInputStream s = BoundedInputStream.builder()
222 * .setFile(new File("MyFile.xml"))
223 * .setMaxCount(1024)
224 * .setPropagateClose(false)
225 * .get();
226 * }
227 * </pre>
228 * <h2>Counting Bytes</h2>
229 * <p>You can set the running count when building, which is most useful when starting from another stream:
230 * <pre>{@code
231 * InputStream in = ...;
232 * BoundedInputStream s = BoundedInputStream.builder()
233 * .setInputStream(in)
234 * .setCount(12)
235 * .setMaxCount(1024)
236 * .setPropagateClose(false)
237 * .get();
238 * }
239 * </pre>
240 *
241 * @see #get()
242 * @since 2.16.0
243 */
244 //@formatter:on
245 public static class Builder extends AbstractBuilder<Builder> {
246
247 /**
248 * Constructs a new builder of {@link BoundedInputStream}.
249 */
250 public Builder() {
251 // empty
252 }
253
254 /**
255 * Builds a new {@link BoundedInputStream}.
256 * <p>
257 * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
258 * </p>
259 * <p>
260 * If you start from an input stream, an exception can't be thrown, and you can call {@link #getUnchecked()} instead.
261 * </p>
262 * <p>
263 * This builder uses the following aspects:
264 * </p>
265 * <ul>
266 * <li>{@link #getInputStream()} gets the target aspect.</li>
267 * <li>{@link #getAfterRead()}</li>
268 * <li>{@code #getCount()}</li>
269 * <li>{@code #getMaxCount()}</li>
270 * <li>{@code #getOnMaxCount()}</li>
271 * <li>{@code #isPropagateClose()}</li>
272 * </ul>
273 *
274 * @return a new instance.
275 * @throws IllegalStateException if the {@code origin} is {@code null}.
276 * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
277 * @throws IOException if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
278 * @see #getInputStream()
279 * @see #getUnchecked()
280 */
281 @Override
282 public BoundedInputStream get() throws IOException {
283 return new BoundedInputStream(this);
284 }
285
286 }
287
288 /**
289 * Constructs a new {@link AbstractBuilder}.
290 *
291 * @return a new {@link AbstractBuilder}.
292 * @since 2.16.0
293 */
294 public static Builder builder() {
295 return new Builder();
296 }
297
298 /** The current count of bytes counted. */
299 private long count;
300
301 /** The current mark. */
302 private long mark;
303
304 /** The maximum count of bytes to read. */
305 private final long maxCount;
306
307 private final IOBiConsumer<Long, Long> onMaxCount;
308
309 /**
310 * Flag if close should be propagated.
311 *
312 * TODO Make final in 3.0.
313 */
314 private boolean propagateClose = true;
315
316 private BoundedInputStream(final Builder builder) throws IOException {
317 super(builder);
318 this.count = builder.getCount();
319 this.maxCount = builder.getMaxCount();
320 this.propagateClose = builder.isPropagateClose();
321 this.onMaxCount = builder.getOnMaxCount();
322 }
323
324 /**
325 * Constructs a new {@link BoundedInputStream} that wraps the given input stream and is <em>unbounded</em>.
326 * <p>
327 * To build an instance: Use the {@link #builder()} to access all features.
328 * </p>
329 *
330 * @param in The wrapped input stream.
331 * @deprecated Use {@link AbstractBuilder#get()}.
332 */
333 @Deprecated
334 public BoundedInputStream(final InputStream in) {
335 this(in, EOF);
336 }
337
338 private BoundedInputStream(final InputStream inputStream, final Builder builder) {
339 super(inputStream, builder);
340 this.count = builder.getCount();
341 this.maxCount = builder.getMaxCount();
342 this.propagateClose = builder.isPropagateClose();
343 this.onMaxCount = builder.getOnMaxCount();
344 }
345
346 /**
347 * Constructs a new {@link BoundedInputStream} that wraps the given input stream and limits it to a certain size.
348 *
349 * @param inputStream The wrapped input stream.
350 * @param maxCount The maximum number of bytes to return, negative means unbound.
351 * @deprecated Use {@link AbstractBuilder#get()}.
352 */
353 @Deprecated
354 public BoundedInputStream(final InputStream inputStream, final long maxCount) {
355 // Some badly designed methods, for example the Servlet API, overload length
356 // such that "-1" means stream finished
357 this(inputStream, builder().setMaxCount(maxCount));
358 }
359
360 /**
361 * Adds the number of read bytes to the count.
362 *
363 * @param n number of bytes read, or -1 if no more bytes are available.
364 * @throws IOException Not thrown here but subclasses may throw.
365 * @since 2.0
366 */
367 @Override
368 protected synchronized void afterRead(final int n) throws IOException {
369 if (n != EOF) {
370 count += n;
371 }
372 super.afterRead(n);
373 }
374
375 @Override
376 public int available() throws IOException {
377 // Safe cast: value is between 0 and Integer.MAX_VALUE
378 final int remaining = (int) Math.min(getRemaining(), Integer.MAX_VALUE);
379 return Math.min(super.available(), remaining);
380 }
381
382 /**
383 * Invokes the delegate's {@link InputStream#close()} method if {@link #isPropagateClose()} is {@code true}.
384 *
385 * @throws IOException if an I/O error occurs.
386 */
387 @Override
388 public void close() throws IOException {
389 if (propagateClose) {
390 super.close();
391 }
392 }
393
394 /**
395 * Gets the count of bytes read.
396 *
397 * @return The count of bytes read.
398 * @since 2.12.0
399 */
400 public synchronized long getCount() {
401 return count;
402 }
403
404 /**
405 * Gets the maximum number of bytes to read.
406 *
407 * @return The maximum number of bytes to read, or {@value IOUtils#EOF} if unbounded.
408 * @since 2.16.0
409 */
410 public long getMaxCount() {
411 return maxCount;
412 }
413
414 /**
415 * Gets the maximum count of bytes to read.
416 *
417 * @return The maximum count of bytes to read.
418 * @since 2.12.0
419 * @deprecated Use {@link #getMaxCount()}.
420 */
421 @Deprecated
422 public long getMaxLength() {
423 return maxCount;
424 }
425
426 /**
427 * Gets the number of bytes remaining to read before the maximum is reached.
428 *
429 * <p>
430 * This method does <strong>not</strong> report the bytes available in the
431 * underlying stream; it only reflects the remaining allowance imposed by this
432 * {@code BoundedInputStream}.
433 * </p>
434 *
435 * @return The number of bytes remaining to read before the maximum is reached,
436 * or {@link Long#MAX_VALUE} if no bound is set.
437 * @since 2.16.0
438 */
439 public long getRemaining() {
440 final long maxCount = getMaxCount();
441 return maxCount == EOF ? Long.MAX_VALUE : Math.max(0, maxCount - getCount());
442 }
443
444 private boolean isMaxCount() {
445 return maxCount >= 0 && getCount() >= maxCount;
446 }
447
448 /**
449 * Tests whether the {@link #close()} method should propagate to the underling {@link InputStream}.
450 *
451 * @return {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it does not.
452 */
453 public boolean isPropagateClose() {
454 return propagateClose;
455 }
456
457 /**
458 * Invokes the delegate's {@link InputStream#mark(int)} method.
459 *
460 * @param readLimit read ahead limit.
461 */
462 @Override
463 public synchronized void mark(final int readLimit) {
464 in.mark(readLimit);
465 mark = count;
466 }
467
468 /**
469 * Invokes the delegate's {@link InputStream#markSupported()} method.
470 *
471 * @return true if mark is supported, otherwise false.
472 */
473 @Override
474 public boolean markSupported() {
475 return in.markSupported();
476 }
477
478 /**
479 * A caller has caused a request that would cross the {@code maxLength} boundary.
480 * <p>
481 * Delegates to the consumer set in {@link Builder#setOnMaxCount(IOBiConsumer)}.
482 * </p>
483 *
484 * @param max The maximum count of bytes to read.
485 * @param count The count of bytes read.
486 * @throws IOException Subclasses may throw.
487 * @since 2.12.0
488 */
489 @SuppressWarnings("unused")
490 // TODO Rename to onMaxCount for 3.0
491 protected void onMaxLength(final long max, final long count) throws IOException {
492 onMaxCount.accept(max, count);
493 }
494
495 /**
496 * Invokes the delegate's {@link InputStream#read()} method if the current position is less than the limit.
497 *
498 * @return the byte read or -1 if the end of stream or the limit has been reached.
499 * @throws IOException if an I/O error occurs.
500 */
501 @Override
502 public int read() throws IOException {
503 if (isMaxCount()) {
504 onMaxLength(maxCount, getCount());
505 return EOF;
506 }
507 return super.read();
508 }
509
510 /**
511 * Invokes the delegate's {@link InputStream#read(byte[])} method.
512 *
513 * @param b the buffer to read the bytes into.
514 * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
515 * @throws IOException if an I/O error occurs.
516 */
517 @Override
518 public int read(final byte[] b) throws IOException {
519 return read(b, 0, b.length);
520 }
521
522 /**
523 * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method.
524 *
525 * @param b the buffer to read the bytes into.
526 * @param off The start offset.
527 * @param len The number of bytes to read.
528 * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
529 * @throws IOException if an I/O error occurs.
530 */
531 @Override
532 public int read(final byte[] b, final int off, final int len) throws IOException {
533 if (isMaxCount()) {
534 onMaxLength(maxCount, getCount());
535 return EOF;
536 }
537 return super.read(b, off, (int) toReadLen(len));
538 }
539
540 /**
541 * Invokes the delegate's {@link InputStream#reset()} method.
542 *
543 * @throws IOException if an I/O error occurs.
544 */
545 @Override
546 public synchronized void reset() throws IOException {
547 in.reset();
548 count = mark;
549 }
550
551 /**
552 * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}.
553 *
554 * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it
555 * does not.
556 * @deprecated Use {@link AbstractBuilder#setPropagateClose(boolean)}.
557 */
558 @Deprecated
559 public synchronized void setPropagateClose(final boolean propagateClose) {
560 this.propagateClose = propagateClose;
561 }
562
563 /**
564 * Invokes the delegate's {@link InputStream#skip(long)} method.
565 *
566 * @param n the number of bytes to skip.
567 * @return the actual number of bytes skipped.
568 * @throws IOException if an I/O error occurs.
569 */
570 @Override
571 public synchronized long skip(final long n) throws IOException {
572 final long skip = super.skip(toReadLen(n));
573 count += skip;
574 return skip;
575 }
576
577 /**
578 * Converts a request to read {@code len} bytes to a lower count if reading would put us over the limit.
579 * <p>
580 * If a {@code maxCount} is not set, then return max{@code maxCount}.
581 * </p>
582 *
583 * @param len The requested byte count.
584 * @return How many bytes to actually attempt to read.
585 */
586 private long toReadLen(final long len) {
587 return maxCount >= 0 ? Math.min(len, maxCount - getCount()) : len;
588 }
589
590 /**
591 * Invokes the delegate's {@link InputStream#toString()} method.
592 *
593 * @return the delegate's {@link InputStream#toString()}.
594 */
595 @Override
596 public String toString() {
597 return in.toString();
598 }
599 }