001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * https://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.io.input; 018 019import static org.apache.commons.io.IOUtils.EOF; 020 021import java.io.IOException; 022import java.io.InputStream; 023 024import org.apache.commons.io.IOUtils; 025import org.apache.commons.io.function.IOBiConsumer; 026 027//@formatter:off 028/** 029 * Reads bytes up to a maximum count and stops once reached. 030 * <p> 031 * To build an instance: Use the {@link #builder()} to access all features. 032 * </p> 033 * <p> 034 * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}. 035 * </p> 036 * <p> 037 * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped. 038 * </p> 039 * <h2>Using a ServletInputStream</h2> 040 * <p> 041 * A {@code ServletInputStream} can block if you try to read content that isn't there 042 * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the 043 * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content 044 * length in the first place. 045 * </p> 046 * <h2>Using NIO</h2> 047 * <pre>{@code 048 * BoundedInputStream s = BoundedInputStream.builder() 049 * .setPath(Paths.get("MyFile.xml")) 050 * .setMaxCount(1024) 051 * .setPropagateClose(false) 052 * .get(); 053 * } 054 * </pre> 055 * <h2>Using IO</h2> 056 * <pre>{@code 057 * BoundedInputStream s = BoundedInputStream.builder() 058 * .setFile(new File("MyFile.xml")) 059 * .setMaxCount(1024) 060 * .setPropagateClose(false) 061 * .get(); 062 * } 063 * </pre> 064 * <h2>Counting Bytes</h2> 065 * <p>You can set the running count when building, which is most useful when starting from another stream: 066 * <pre>{@code 067 * InputStream in = ...; 068 * BoundedInputStream s = BoundedInputStream.builder() 069 * .setInputStream(in) 070 * .setCount(12) 071 * .setMaxCount(1024) 072 * .setPropagateClose(false) 073 * .get(); 074 * } 075 * </pre> 076 * <h2>Listening for the maximum count reached</h2> 077 * <pre>{@code 078 * BoundedInputStream s = BoundedInputStream.builder() 079 * .setPath(Paths.get("MyFile.xml")) 080 * .setMaxCount(1024) 081 * .setOnMaxCount((max, count) -> System.out.printf("Maximum count %,d reached with a last read count of %,d%n", max, count)) 082 * .get(); 083 * } 084 * </pre> 085 * @see Builder 086 * @since 2.0 087 */ 088//@formatter:on 089public class BoundedInputStream extends ProxyInputStream { 090 091 /** 092 * For subclassing builders from {@link BoundedInputStream} subclassses. 093 * 094 * @param <T> The subclass. 095 */ 096 abstract static class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<BoundedInputStream, T> { 097 098 /** The current count of bytes counted. */ 099 private long count; 100 101 /** The maximum count of bytes to read. */ 102 private long maxCount = EOF; 103 104 private IOBiConsumer<Long, Long> onMaxCount = IOBiConsumer.noop(); 105 106 /** Flag if {@link #close()} should be propagated, {@code true} by default. */ 107 private boolean propagateClose = true; 108 109 long getCount() { 110 return count; 111 } 112 113 long getMaxCount() { 114 return maxCount; 115 } 116 117 IOBiConsumer<Long, Long> getOnMaxCount() { 118 return onMaxCount; 119 } 120 121 boolean isPropagateClose() { 122 return propagateClose; 123 } 124 125 /** 126 * Sets the current number of bytes counted. 127 * <p> 128 * Useful when building from another stream to carry forward a read count. 129 * </p> 130 * <p> 131 * Default is {@code 0}, negative means 0. 132 * </p> 133 * 134 * @param count The current number of bytes counted. 135 * @return {@code this} instance. 136 */ 137 public T setCount(final long count) { 138 this.count = Math.max(0, count); 139 return asThis(); 140 } 141 142 /** 143 * Sets the maximum number of bytes to return. 144 * <p> 145 * Default is {@value IOUtils#EOF}, negative means unbound. 146 * </p> 147 * 148 * @param maxCount The maximum number of bytes to return, negative means unbound. 149 * @return {@code this} instance. 150 */ 151 public T setMaxCount(final long maxCount) { 152 this.maxCount = Math.max(EOF, maxCount); 153 return asThis(); 154 } 155 156 /** 157 * Sets the default {@link BoundedInputStream#onMaxLength(long, long)} behavior, {@code null} resets to a NOOP. 158 * <p> 159 * The first Long is the number of bytes remaining to read before the maximum is reached count of bytes to read. The second Long is the count of bytes 160 * read. 161 * </p> 162 * <p> 163 * This does <em>not</em> override a {@code BoundedInputStream} subclass' implementation of the {@link BoundedInputStream#onMaxLength(long, long)} 164 * method. 165 * </p> 166 * 167 * @param onMaxCount the {@link ProxyInputStream#afterRead(int)} behavior. 168 * @return {@code this} instance. 169 * @since 2.18.0 170 */ 171 public T setOnMaxCount(final IOBiConsumer<Long, Long> onMaxCount) { 172 this.onMaxCount = onMaxCount != null ? onMaxCount : IOBiConsumer.noop(); 173 return asThis(); 174 } 175 176 /** 177 * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}. 178 * <p> 179 * Default is {@code true}. 180 * </p> 181 * 182 * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if 183 * it does not. 184 * @return {@code this} instance. 185 */ 186 public T setPropagateClose(final boolean propagateClose) { 187 this.propagateClose = propagateClose; 188 return asThis(); 189 } 190 191 } 192 193 //@formatter:off 194 /** 195 * Builds a new {@link BoundedInputStream}. 196 * <p> 197 * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}. 198 * </p> 199 * <p> 200 * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped. 201 * </p> 202 * <h2>Using a ServletInputStream</h2> 203 * <p> 204 * A {@code ServletInputStream} can block if you try to read content that isn't there 205 * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the 206 * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content 207 * length in the first place. 208 * </p> 209 * <h2>Using NIO</h2> 210 * <pre>{@code 211 * BoundedInputStream s = BoundedInputStream.builder() 212 * .setPath(Paths.get("MyFile.xml")) 213 * .setMaxCount(1024) 214 * .setPropagateClose(false) 215 * .get(); 216 * } 217 * </pre> 218 * <h2>Using IO</h2> 219 * <pre>{@code 220 * BoundedInputStream s = BoundedInputStream.builder() 221 * .setFile(new File("MyFile.xml")) 222 * .setMaxCount(1024) 223 * .setPropagateClose(false) 224 * .get(); 225 * } 226 * </pre> 227 * <h2>Counting Bytes</h2> 228 * <p>You can set the running count when building, which is most useful when starting from another stream: 229 * <pre>{@code 230 * InputStream in = ...; 231 * BoundedInputStream s = BoundedInputStream.builder() 232 * .setInputStream(in) 233 * .setCount(12) 234 * .setMaxCount(1024) 235 * .setPropagateClose(false) 236 * .get(); 237 * } 238 * </pre> 239 * 240 * @see #get() 241 * @since 2.16.0 242 */ 243 //@formatter:on 244 public static class Builder extends AbstractBuilder<Builder> { 245 246 /** 247 * Constructs a new builder of {@link BoundedInputStream}. 248 */ 249 public Builder() { 250 // empty 251 } 252 253 /** 254 * Builds a new {@link BoundedInputStream}. 255 * <p> 256 * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception. 257 * </p> 258 * <p> 259 * If you start from an input stream, an exception can't be thrown, and you can call {@link #getUnchecked()} instead. 260 * </p> 261 * <p> 262 * This builder uses the following aspects: 263 * </p> 264 * <ul> 265 * <li>{@link #getInputStream()} gets the target aspect.</li> 266 * <li>{@link #getAfterRead()}</li> 267 * <li>{@code #getCount()}</li> 268 * <li>{@code #getMaxCount()}</li> 269 * <li>{@code #getOnMaxCount()}</li> 270 * <li>{@code #isPropagateClose()}</li> 271 * </ul> 272 * 273 * @return a new instance. 274 * @throws IllegalStateException if the {@code origin} is {@code null}. 275 * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}. 276 * @throws IOException if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}. 277 * @see #getInputStream() 278 * @see #getUnchecked() 279 */ 280 @Override 281 public BoundedInputStream get() throws IOException { 282 return new BoundedInputStream(this); 283 } 284 285 } 286 287 /** 288 * Constructs a new {@link AbstractBuilder}. 289 * 290 * @return a new {@link AbstractBuilder}. 291 * @since 2.16.0 292 */ 293 public static Builder builder() { 294 return new Builder(); 295 } 296 297 /** The current count of bytes counted. */ 298 private long count; 299 300 /** The current mark. */ 301 private long mark; 302 303 /** The maximum count of bytes to read. */ 304 private final long maxCount; 305 306 private final IOBiConsumer<Long, Long> onMaxCount; 307 308 /** 309 * Flag if close should be propagated. 310 * 311 * TODO Make final in 3.0. 312 */ 313 private boolean propagateClose = true; 314 315 private BoundedInputStream(final Builder builder) throws IOException { 316 super(builder); 317 this.count = builder.getCount(); 318 this.maxCount = builder.getMaxCount(); 319 this.propagateClose = builder.isPropagateClose(); 320 this.onMaxCount = builder.getOnMaxCount(); 321 } 322 323 /** 324 * Constructs a new {@link BoundedInputStream} that wraps the given input stream and is <em>unbounded</em>. 325 * <p> 326 * To build an instance: Use the {@link #builder()} to access all features. 327 * </p> 328 * 329 * @param in The wrapped input stream. 330 * @deprecated Use {@link AbstractBuilder#get()}. 331 */ 332 @Deprecated 333 public BoundedInputStream(final InputStream in) { 334 this(in, EOF); 335 } 336 337 private BoundedInputStream(final InputStream inputStream, final Builder builder) { 338 super(inputStream, builder); 339 this.count = builder.getCount(); 340 this.maxCount = builder.getMaxCount(); 341 this.propagateClose = builder.isPropagateClose(); 342 this.onMaxCount = builder.getOnMaxCount(); 343 } 344 345 /** 346 * Constructs a new {@link BoundedInputStream} that wraps the given input stream and limits it to a certain size. 347 * 348 * @param inputStream The wrapped input stream. 349 * @param maxCount The maximum number of bytes to return, negative means unbound. 350 * @deprecated Use {@link AbstractBuilder#get()}. 351 */ 352 @Deprecated 353 public BoundedInputStream(final InputStream inputStream, final long maxCount) { 354 // Some badly designed methods, for example the Servlet API, overload length 355 // such that "-1" means stream finished 356 this(inputStream, builder().setMaxCount(maxCount)); 357 } 358 359 /** 360 * Adds the number of read bytes to the count. 361 * 362 * @param n number of bytes read, or -1 if no more bytes are available. 363 * @throws IOException Not thrown here but subclasses may throw. 364 * @since 2.0 365 */ 366 @Override 367 protected synchronized void afterRead(final int n) throws IOException { 368 if (n != EOF) { 369 count += n; 370 } 371 super.afterRead(n); 372 } 373 374 @Override 375 public int available() throws IOException { 376 // Safe cast: value is between 0 and Integer.MAX_VALUE 377 final int remaining = (int) Math.min(getRemaining(), Integer.MAX_VALUE); 378 return Math.min(super.available(), remaining); 379 } 380 381 /** 382 * Invokes the delegate's {@link InputStream#close()} method if {@link #isPropagateClose()} is {@code true}. 383 * 384 * @throws IOException if an I/O error occurs. 385 */ 386 @Override 387 public void close() throws IOException { 388 if (propagateClose) { 389 super.close(); 390 } 391 } 392 393 /** 394 * Gets the count of bytes read. 395 * 396 * @return The count of bytes read. 397 * @since 2.12.0 398 */ 399 public synchronized long getCount() { 400 return count; 401 } 402 403 /** 404 * Gets the maximum number of bytes to read. 405 * 406 * @return The maximum number of bytes to read, or {@value IOUtils#EOF} if unbounded. 407 * @since 2.16.0 408 */ 409 public long getMaxCount() { 410 return maxCount; 411 } 412 413 /** 414 * Gets the maximum count of bytes to read. 415 * 416 * @return The maximum count of bytes to read. 417 * @since 2.12.0 418 * @deprecated Use {@link #getMaxCount()}. 419 */ 420 @Deprecated 421 public long getMaxLength() { 422 return maxCount; 423 } 424 425 /** 426 * Gets the number of bytes remaining to read before the maximum is reached. 427 * 428 * <p> 429 * This method does <strong>not</strong> report the bytes available in the 430 * underlying stream; it only reflects the remaining allowance imposed by this 431 * {@code BoundedInputStream}. 432 * </p> 433 * 434 * @return The number of bytes remaining to read before the maximum is reached, 435 * or {@link Long#MAX_VALUE} if no bound is set. 436 * @since 2.16.0 437 */ 438 public long getRemaining() { 439 final long maxCount = getMaxCount(); 440 return maxCount == EOF ? Long.MAX_VALUE : Math.max(0, maxCount - getCount()); 441 } 442 443 private boolean isMaxCount() { 444 return maxCount >= 0 && getCount() >= maxCount; 445 } 446 447 /** 448 * Tests whether the {@link #close()} method should propagate to the underling {@link InputStream}. 449 * 450 * @return {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it does not. 451 */ 452 public boolean isPropagateClose() { 453 return propagateClose; 454 } 455 456 /** 457 * Invokes the delegate's {@link InputStream#mark(int)} method. 458 * 459 * @param readLimit read ahead limit. 460 */ 461 @Override 462 public synchronized void mark(final int readLimit) { 463 in.mark(readLimit); 464 mark = count; 465 } 466 467 /** 468 * Invokes the delegate's {@link InputStream#markSupported()} method. 469 * 470 * @return true if mark is supported, otherwise false. 471 */ 472 @Override 473 public boolean markSupported() { 474 return in.markSupported(); 475 } 476 477 /** 478 * A caller has caused a request that would cross the {@code maxLength} boundary. 479 * <p> 480 * Delegates to the consumer set in {@link Builder#setOnMaxCount(IOBiConsumer)}. 481 * </p> 482 * 483 * @param max The maximum count of bytes to read. 484 * @param count The count of bytes read. 485 * @throws IOException Subclasses may throw. 486 * @since 2.12.0 487 */ 488 @SuppressWarnings("unused") 489 // TODO Rename to onMaxCount for 3.0 490 protected void onMaxLength(final long max, final long count) throws IOException { 491 onMaxCount.accept(max, count); 492 } 493 494 /** 495 * Invokes the delegate's {@link InputStream#read()} method if the current position is less than the limit. 496 * 497 * @return the byte read or -1 if the end of stream or the limit has been reached. 498 * @throws IOException if an I/O error occurs. 499 */ 500 @Override 501 public int read() throws IOException { 502 if (isMaxCount()) { 503 onMaxLength(maxCount, getCount()); 504 return EOF; 505 } 506 return super.read(); 507 } 508 509 /** 510 * Invokes the delegate's {@link InputStream#read(byte[])} method. 511 * 512 * @param b the buffer to read the bytes into. 513 * @return the number of bytes read or -1 if the end of stream or the limit has been reached. 514 * @throws IOException if an I/O error occurs. 515 */ 516 @Override 517 public int read(final byte[] b) throws IOException { 518 return read(b, 0, b.length); 519 } 520 521 /** 522 * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method. 523 * 524 * @param b the buffer to read the bytes into. 525 * @param off The start offset. 526 * @param len The number of bytes to read. 527 * @return the number of bytes read or -1 if the end of stream or the limit has been reached. 528 * @throws IOException if an I/O error occurs. 529 */ 530 @Override 531 public int read(final byte[] b, final int off, final int len) throws IOException { 532 if (isMaxCount()) { 533 onMaxLength(maxCount, getCount()); 534 return EOF; 535 } 536 return super.read(b, off, (int) toReadLen(len)); 537 } 538 539 /** 540 * Invokes the delegate's {@link InputStream#reset()} method. 541 * 542 * @throws IOException if an I/O error occurs. 543 */ 544 @Override 545 public synchronized void reset() throws IOException { 546 in.reset(); 547 count = mark; 548 } 549 550 /** 551 * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}. 552 * 553 * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it 554 * does not. 555 * @deprecated Use {@link AbstractBuilder#setPropagateClose(boolean)}. 556 */ 557 @Deprecated 558 public synchronized void setPropagateClose(final boolean propagateClose) { 559 this.propagateClose = propagateClose; 560 } 561 562 /** 563 * Invokes the delegate's {@link InputStream#skip(long)} method. 564 * 565 * @param n the number of bytes to skip. 566 * @return the actual number of bytes skipped. 567 * @throws IOException if an I/O error occurs. 568 */ 569 @Override 570 public synchronized long skip(final long n) throws IOException { 571 final long skip = super.skip(toReadLen(n)); 572 count += skip; 573 return skip; 574 } 575 576 /** 577 * Converts a request to read {@code len} bytes to a lower count if reading would put us over the limit. 578 * <p> 579 * If a {@code maxCount} is not set, then return max{@code maxCount}. 580 * </p> 581 * 582 * @param len The requested byte count. 583 * @return How many bytes to actually attempt to read. 584 */ 585 private long toReadLen(final long len) { 586 return maxCount >= 0 ? Math.min(len, maxCount - getCount()) : len; 587 } 588 589 /** 590 * Invokes the delegate's {@link InputStream#toString()} method. 591 * 592 * @return the delegate's {@link InputStream#toString()}. 593 */ 594 @Override 595 public String toString() { 596 return in.toString(); 597 } 598}