1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.commons.io.input;
18
19 import static org.apache.commons.io.IOUtils.EOF;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23
24 import org.apache.commons.io.IOUtils;
25 import org.apache.commons.io.function.IOBiConsumer;
26
27 //@formatter:off
28 /**
29 * Reads bytes up to a maximum count and stops once reached.
30 * <p>
31 * To build an instance: Use the {@link #builder()} to access all features.
32 * </p>
33 * <p>
34 * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
35 * </p>
36 * <p>
37 * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
38 * </p>
39 * <h2>Using a ServletInputStream</h2>
40 * <p>
41 * A {@code ServletInputStream} can block if you try to read content that isn't there
42 * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the
43 * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content
44 * length in the first place.
45 * </p>
46 * <h2>Using NIO</h2>
47 * <pre>{@code
48 * BoundedInputStream s = BoundedInputStream.builder()
49 * .setPath(Paths.get("MyFile.xml"))
50 * .setMaxCount(1024)
51 * .setPropagateClose(false)
52 * .get();
53 * }
54 * </pre>
55 * <h2>Using IO</h2>
56 * <pre>{@code
57 * BoundedInputStream s = BoundedInputStream.builder()
58 * .setFile(new File("MyFile.xml"))
59 * .setMaxCount(1024)
60 * .setPropagateClose(false)
61 * .get();
62 * }
63 * </pre>
64 * <h2>Counting Bytes</h2>
65 * <p>You can set the running count when building, which is most useful when starting from another stream:
66 * <pre>{@code
67 * InputStream in = ...;
68 * BoundedInputStream s = BoundedInputStream.builder()
69 * .setInputStream(in)
70 * .setCount(12)
71 * .setMaxCount(1024)
72 * .setPropagateClose(false)
73 * .get();
74 * }
75 * </pre>
76 * <h2>Listening for the max count reached</h2>
77 * <pre>{@code
78 * BoundedInputStream s = BoundedInputStream.builder()
79 * .setPath(Paths.get("MyFile.xml"))
80 * .setMaxCount(1024)
81 * .setOnMaxCount((max, count) -> System.out.printf("Max count %,d reached with a last read count of %,d%n", max, count))
82 * .get();
83 * }
84 * </pre>
85 * @see Builder
86 * @since 2.0
87 */
88 //@formatter:on
89 public class BoundedInputStream extends ProxyInputStream {
90
91 /**
92 * For subclassing builders from {@link BoundedInputStream} subclassses.
93 *
94 * @param <T> The subclass.
95 */
96 abstract static class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<BoundedInputStream, T> {
97
98 /** The current count of bytes counted. */
99 private long count;
100
101 /** The max count of bytes to read. */
102 private long maxCount = EOF;
103
104 private IOBiConsumer<Long, Long> onMaxCount = IOBiConsumer.noop();
105
106 /** Flag if {@link #close()} should be propagated, {@code true} by default. */
107 private boolean propagateClose = true;
108
109 long getCount() {
110 return count;
111 }
112
113 long getMaxCount() {
114 return maxCount;
115 }
116
117 IOBiConsumer<Long, Long> getOnMaxCount() {
118 return onMaxCount;
119 }
120
121 boolean isPropagateClose() {
122 return propagateClose;
123 }
124
125 /**
126 * Sets the current number of bytes counted.
127 * <p>
128 * Useful when building from another stream to carry forward a read count.
129 * </p>
130 * <p>
131 * Default is {@code 0}, negative means 0.
132 * </p>
133 *
134 * @param count The current number of bytes counted.
135 * @return {@code this} instance.
136 */
137 public T setCount(final long count) {
138 this.count = Math.max(0, count);
139 return asThis();
140 }
141
142 /**
143 * Sets the maximum number of bytes to return.
144 * <p>
145 * Default is {@value IOUtils#EOF}, negative means unbound.
146 * </p>
147 *
148 * @param maxCount The maximum number of bytes to return, negative means unbound.
149 * @return {@code this} instance.
150 */
151 public T setMaxCount(final long maxCount) {
152 this.maxCount = Math.max(EOF, maxCount);
153 return asThis();
154 }
155
156 /**
157 * Sets the default {@link BoundedInputStream#onMaxLength(long, long)} behavior, {@code null} resets to a NOOP.
158 * <p>
159 * The first Long is the max count of bytes to read. The second Long is the count of bytes read.
160 * </p>
161 * <p>
162 * This does <em>not</em> override a {@code BoundedInputStream} subclass' implementation of the {@link BoundedInputStream#onMaxLength(long, long)}
163 * method.
164 * </p>
165 *
166 * @param onMaxCount the {@link ProxyInputStream#afterRead(int)} behavior.
167 * @return this instance.
168 * @since 2.18.0
169 */
170 public T setOnMaxCount(final IOBiConsumer<Long, Long> onMaxCount) {
171 this.onMaxCount = onMaxCount != null ? onMaxCount : IOBiConsumer.noop();
172 return asThis();
173 }
174
175 /**
176 * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}.
177 * <p>
178 * Default is {@code true}.
179 * </p>
180 *
181 * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if
182 * it does not.
183 * @return {@code this} instance.
184 */
185 public T setPropagateClose(final boolean propagateClose) {
186 this.propagateClose = propagateClose;
187 return asThis();
188 }
189
190 }
191
192 //@formatter:off
193 /**
194 * Builds a new {@link BoundedInputStream}.
195 * <p>
196 * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
197 * </p>
198 * <p>
199 * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
200 * </p>
201 * <h2>Using a ServletInputStream</h2>
202 * <p>
203 * A {@code ServletInputStream} can block if you try to read content that isn't there
204 * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the
205 * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content
206 * length in the first place.
207 * </p>
208 * <h2>Using NIO</h2>
209 * <pre>{@code
210 * BoundedInputStream s = BoundedInputStream.builder()
211 * .setPath(Paths.get("MyFile.xml"))
212 * .setMaxCount(1024)
213 * .setPropagateClose(false)
214 * .get();
215 * }
216 * </pre>
217 * <h2>Using IO</h2>
218 * <pre>{@code
219 * BoundedInputStream s = BoundedInputStream.builder()
220 * .setFile(new File("MyFile.xml"))
221 * .setMaxCount(1024)
222 * .setPropagateClose(false)
223 * .get();
224 * }
225 * </pre>
226 * <h2>Counting Bytes</h2>
227 * <p>You can set the running count when building, which is most useful when starting from another stream:
228 * <pre>{@code
229 * InputStream in = ...;
230 * BoundedInputStream s = BoundedInputStream.builder()
231 * .setInputStream(in)
232 * .setCount(12)
233 * .setMaxCount(1024)
234 * .setPropagateClose(false)
235 * .get();
236 * }
237 * </pre>
238 *
239 * @see #get()
240 * @since 2.16.0
241 */
242 //@formatter:on
243 public static class Builder extends AbstractBuilder<Builder> {
244
245 /**
246 * Constructs a new builder of {@link BoundedInputStream}.
247 */
248 public Builder() {
249 // empty
250 }
251
252 /**
253 * Builds a new {@link BoundedInputStream}.
254 * <p>
255 * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
256 * </p>
257 * <p>
258 * If you start from an input stream, an exception can't be thrown, and you can call {@link #getUnchecked()} instead.
259 * </p>
260 * <p>
261 * This builder uses the following aspects:
262 * </p>
263 * <ul>
264 * <li>{@link #getInputStream()} gets the target aspect.</li>
265 * <li>{@link #getAfterRead()}</li>
266 * <li>{@link #getCount()}</li>
267 * <li>{@link #getMaxCount()}</li>
268 * <li>{@link #getOnMaxCount()}</li>
269 * <li>{@link #isPropagateClose()}</li>
270 * </ul>
271 *
272 * @return a new instance.
273 * @throws IllegalStateException if the {@code origin} is {@code null}.
274 * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
275 * @throws IOException if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
276 * @see #getInputStream()
277 * @see #getUnchecked()
278 */
279 @Override
280 public BoundedInputStream get() throws IOException {
281 return new BoundedInputStream(this);
282 }
283
284 }
285
286 /**
287 * Constructs a new {@link AbstractBuilder}.
288 *
289 * @return a new {@link AbstractBuilder}.
290 * @since 2.16.0
291 */
292 public static Builder builder() {
293 return new Builder();
294 }
295
296 /** The current count of bytes counted. */
297 private long count;
298
299 /** The current mark. */
300 private long mark;
301
302 /** The max count of bytes to read. */
303 private final long maxCount;
304
305 private final IOBiConsumer<Long, Long> onMaxCount;
306
307 /**
308 * Flag if close should be propagated.
309 *
310 * TODO Make final in 3.0.
311 */
312 private boolean propagateClose = true;
313
314 BoundedInputStream(final Builder builder) throws IOException {
315 super(builder);
316 this.count = builder.getCount();
317 this.maxCount = builder.getMaxCount();
318 this.propagateClose = builder.isPropagateClose();
319 this.onMaxCount = builder.getOnMaxCount();
320 }
321
322 /**
323 * Constructs a new {@link BoundedInputStream} that wraps the given input stream and is <em>unbounded</em>.
324 * <p>
325 * To build an instance: Use the {@link #builder()} to access all features.
326 * </p>
327 *
328 * @param in The wrapped input stream.
329 * @deprecated Use {@link AbstractBuilder#get()}.
330 */
331 @Deprecated
332 public BoundedInputStream(final InputStream in) {
333 this(in, EOF);
334 }
335
336 BoundedInputStream(final InputStream inputStream, final Builder builder) {
337 super(inputStream, builder);
338 this.count = builder.getCount();
339 this.maxCount = builder.getMaxCount();
340 this.propagateClose = builder.isPropagateClose();
341 this.onMaxCount = builder.getOnMaxCount();
342 }
343
344 /**
345 * Constructs a new {@link BoundedInputStream} that wraps the given input stream and limits it to a certain size.
346 *
347 * @param inputStream The wrapped input stream.
348 * @param maxCount The maximum number of bytes to return, negative means unbound.
349 * @deprecated Use {@link AbstractBuilder#get()}.
350 */
351 @Deprecated
352 public BoundedInputStream(final InputStream inputStream, final long maxCount) {
353 // Some badly designed methods - e.g. the Servlet API - overload length
354 // such that "-1" means stream finished
355 this(inputStream, builder().setMaxCount(maxCount));
356 }
357
358 /**
359 * Adds the number of read bytes to the count.
360 *
361 * @param n number of bytes read, or -1 if no more bytes are available
362 * @throws IOException Not thrown here but subclasses may throw.
363 * @since 2.0
364 */
365 @Override
366 protected synchronized void afterRead(final int n) throws IOException {
367 if (n != EOF) {
368 count += n;
369 }
370 super.afterRead(n);
371 }
372
373 /**
374 * {@inheritDoc}
375 */
376 @Override
377 public int available() throws IOException {
378 if (isMaxCount()) {
379 onMaxLength(maxCount, getCount());
380 return 0;
381 }
382 return in.available();
383 }
384
385 /**
386 * Invokes the delegate's {@link InputStream#close()} method if {@link #isPropagateClose()} is {@code true}.
387 *
388 * @throws IOException if an I/O error occurs.
389 */
390 @Override
391 public void close() throws IOException {
392 if (propagateClose) {
393 super.close();
394 }
395 }
396
397 /**
398 * Gets the count of bytes read.
399 *
400 * @return The count of bytes read.
401 * @since 2.12.0
402 */
403 public synchronized long getCount() {
404 return count;
405 }
406
407 /**
408 * Gets the max count of bytes to read.
409 *
410 * @return The max count of bytes to read.
411 * @since 2.16.0
412 */
413 public long getMaxCount() {
414 return maxCount;
415 }
416
417 /**
418 * Gets the max count of bytes to read.
419 *
420 * @return The max count of bytes to read.
421 * @since 2.12.0
422 * @deprecated Use {@link #getMaxCount()}.
423 */
424 @Deprecated
425 public long getMaxLength() {
426 return maxCount;
427 }
428
429 /**
430 * Gets how many bytes remain to read.
431 *
432 * @return bytes how many bytes remain to read.
433 * @since 2.16.0
434 */
435 public long getRemaining() {
436 return Math.max(0, getMaxCount() - getCount());
437 }
438
439 private boolean isMaxCount() {
440 return maxCount >= 0 && getCount() >= maxCount;
441 }
442
443 /**
444 * Tests whether the {@link #close()} method should propagate to the underling {@link InputStream}.
445 *
446 * @return {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it does not.
447 */
448 public boolean isPropagateClose() {
449 return propagateClose;
450 }
451
452 /**
453 * Invokes the delegate's {@link InputStream#mark(int)} method.
454 *
455 * @param readLimit read ahead limit
456 */
457 @Override
458 public synchronized void mark(final int readLimit) {
459 in.mark(readLimit);
460 mark = count;
461 }
462
463 /**
464 * Invokes the delegate's {@link InputStream#markSupported()} method.
465 *
466 * @return true if mark is supported, otherwise false
467 */
468 @Override
469 public boolean markSupported() {
470 return in.markSupported();
471 }
472
473 /**
474 * A caller has caused a request that would cross the {@code maxLength} boundary.
475 * <p>
476 * Delegates to the consumer set in {@link Builder#setOnMaxCount(IOBiConsumer)}.
477 * </p>
478 *
479 * @param max The max count of bytes to read.
480 * @param count The count of bytes read.
481 * @throws IOException Subclasses may throw.
482 * @since 2.12.0
483 */
484 @SuppressWarnings("unused")
485 // TODO Rename to onMaxCount for 3.0
486 protected void onMaxLength(final long max, final long count) throws IOException {
487 onMaxCount.accept(max, count);
488 }
489
490 /**
491 * Invokes the delegate's {@link InputStream#read()} method if the current position is less than the limit.
492 *
493 * @return the byte read or -1 if the end of stream or the limit has been reached.
494 * @throws IOException if an I/O error occurs.
495 */
496 @Override
497 public int read() throws IOException {
498 if (isMaxCount()) {
499 onMaxLength(maxCount, getCount());
500 return EOF;
501 }
502 return super.read();
503 }
504
505 /**
506 * Invokes the delegate's {@link InputStream#read(byte[])} method.
507 *
508 * @param b the buffer to read the bytes into
509 * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
510 * @throws IOException if an I/O error occurs.
511 */
512 @Override
513 public int read(final byte[] b) throws IOException {
514 return read(b, 0, b.length);
515 }
516
517 /**
518 * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method.
519 *
520 * @param b the buffer to read the bytes into
521 * @param off The start offset
522 * @param len The number of bytes to read
523 * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
524 * @throws IOException if an I/O error occurs.
525 */
526 @Override
527 public int read(final byte[] b, final int off, final int len) throws IOException {
528 if (isMaxCount()) {
529 onMaxLength(maxCount, getCount());
530 return EOF;
531 }
532 return super.read(b, off, (int) toReadLen(len));
533 }
534
535 /**
536 * Invokes the delegate's {@link InputStream#reset()} method.
537 *
538 * @throws IOException if an I/O error occurs.
539 */
540 @Override
541 public synchronized void reset() throws IOException {
542 in.reset();
543 count = mark;
544 }
545
546 /**
547 * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}.
548 *
549 * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it
550 * does not.
551 * @deprecated Use {@link AbstractBuilder#setPropagateClose(boolean)}.
552 */
553 @Deprecated
554 public void setPropagateClose(final boolean propagateClose) {
555 this.propagateClose = propagateClose;
556 }
557
558 /**
559 * Invokes the delegate's {@link InputStream#skip(long)} method.
560 *
561 * @param n the number of bytes to skip
562 * @return the actual number of bytes skipped
563 * @throws IOException if an I/O error occurs.
564 */
565 @Override
566 public synchronized long skip(final long n) throws IOException {
567 final long skip = super.skip(toReadLen(n));
568 count += skip;
569 return skip;
570 }
571
572 private long toReadLen(final long len) {
573 return maxCount >= 0 ? Math.min(len, maxCount - getCount()) : len;
574 }
575
576 /**
577 * Invokes the delegate's {@link InputStream#toString()} method.
578 *
579 * @return the delegate's {@link InputStream#toString()}
580 */
581 @Override
582 public String toString() {
583 return in.toString();
584 }
585 }