001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * https://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.io.input; 018 019import static org.apache.commons.io.IOUtils.EOF; 020 021import java.io.IOException; 022import java.io.InputStream; 023 024import org.apache.commons.io.IOUtils; 025import org.apache.commons.io.function.IOBiConsumer; 026 027//@formatter:off 028/** 029 * Reads bytes up to a maximum count and stops once reached. 030 * <p> 031 * To build an instance: Use the {@link #builder()} to access all features. 032 * </p> 033 * <p> 034 * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}. 035 * </p> 036 * <p> 037 * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped. 038 * </p> 039 * <h2>Using a ServletInputStream</h2> 040 * <p> 041 * A {@code ServletInputStream} can block if you try to read content that isn't there 042 * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the 043 * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content 044 * length in the first place. 045 * </p> 046 * <h2>Using NIO</h2> 047 * <pre>{@code 048 * BoundedInputStream s = BoundedInputStream.builder() 049 * .setPath(Paths.get("MyFile.xml")) 050 * .setMaxCount(1024) 051 * .setPropagateClose(false) 052 * .get(); 053 * } 054 * </pre> 055 * <h2>Using IO</h2> 056 * <pre>{@code 057 * BoundedInputStream s = BoundedInputStream.builder() 058 * .setFile(new File("MyFile.xml")) 059 * .setMaxCount(1024) 060 * .setPropagateClose(false) 061 * .get(); 062 * } 063 * </pre> 064 * <h2>Counting Bytes</h2> 065 * <p>You can set the running count when building, which is most useful when starting from another stream: 066 * <pre>{@code 067 * InputStream in = ...; 068 * BoundedInputStream s = BoundedInputStream.builder() 069 * .setInputStream(in) 070 * .setCount(12) 071 * .setMaxCount(1024) 072 * .setPropagateClose(false) 073 * .get(); 074 * } 075 * </pre> 076 * <h2>Listening for the maximum count reached</h2> 077 * <pre>{@code 078 * BoundedInputStream s = BoundedInputStream.builder() 079 * .setPath(Paths.get("MyFile.xml")) 080 * .setMaxCount(1024) 081 * .setOnMaxCount((max, count) -> System.out.printf("Maximum count %,d reached with a last read count of %,d%n", max, count)) 082 * .get(); 083 * } 084 * </pre> 085 * 086 * @see Builder 087 * @since 2.0 088 */ 089//@formatter:on 090public class BoundedInputStream extends ProxyInputStream { 091 092 /** 093 * For subclassing builders from {@link BoundedInputStream} subclassses. 094 * 095 * @param <T> The subclass. 096 */ 097 abstract static class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<BoundedInputStream, T> { 098 099 /** The current count of bytes counted. */ 100 private long count; 101 102 /** The maximum count of bytes to read. */ 103 private long maxCount = EOF; 104 105 private IOBiConsumer<Long, Long> onMaxCount = IOBiConsumer.noop(); 106 107 /** Flag if {@link #close()} should be propagated, {@code true} by default. */ 108 private boolean propagateClose = true; 109 110 long getCount() { 111 return count; 112 } 113 114 long getMaxCount() { 115 return maxCount; 116 } 117 118 IOBiConsumer<Long, Long> getOnMaxCount() { 119 return onMaxCount; 120 } 121 122 boolean isPropagateClose() { 123 return propagateClose; 124 } 125 126 /** 127 * Sets the current number of bytes counted. 128 * <p> 129 * Useful when building from another stream to carry forward a read count. 130 * </p> 131 * <p> 132 * Default is {@code 0}, negative means 0. 133 * </p> 134 * 135 * @param count The current number of bytes counted. 136 * @return {@code this} instance. 137 */ 138 public T setCount(final long count) { 139 this.count = Math.max(0, count); 140 return asThis(); 141 } 142 143 /** 144 * Sets the maximum number of bytes to return. 145 * <p> 146 * Default is {@value IOUtils#EOF}, negative means unbound. 147 * </p> 148 * 149 * @param maxCount The maximum number of bytes to return, negative means unbound. 150 * @return {@code this} instance. 151 */ 152 public T setMaxCount(final long maxCount) { 153 this.maxCount = Math.max(EOF, maxCount); 154 return asThis(); 155 } 156 157 /** 158 * Sets the default {@link BoundedInputStream#onMaxLength(long, long)} behavior, {@code null} resets to a NOOP. 159 * <p> 160 * The first Long is the number of bytes remaining to read before the maximum is reached count of bytes to read. The second Long is the count of bytes 161 * read. 162 * </p> 163 * <p> 164 * This does <em>not</em> override a {@code BoundedInputStream} subclass' implementation of the {@link BoundedInputStream#onMaxLength(long, long)} 165 * method. 166 * </p> 167 * 168 * @param onMaxCount the {@link ProxyInputStream#afterRead(int)} behavior. 169 * @return {@code this} instance. 170 * @since 2.18.0 171 */ 172 public T setOnMaxCount(final IOBiConsumer<Long, Long> onMaxCount) { 173 this.onMaxCount = onMaxCount != null ? onMaxCount : IOBiConsumer.noop(); 174 return asThis(); 175 } 176 177 /** 178 * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}. 179 * <p> 180 * Default is {@code true}. 181 * </p> 182 * 183 * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if 184 * it does not. 185 * @return {@code this} instance. 186 */ 187 public T setPropagateClose(final boolean propagateClose) { 188 this.propagateClose = propagateClose; 189 return asThis(); 190 } 191 192 } 193 194 //@formatter:off 195 /** 196 * Builds a new {@link BoundedInputStream}. 197 * <p> 198 * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}. 199 * </p> 200 * <p> 201 * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped. 202 * </p> 203 * <h2>Using a ServletInputStream</h2> 204 * <p> 205 * A {@code ServletInputStream} can block if you try to read content that isn't there 206 * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the 207 * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content 208 * length in the first place. 209 * </p> 210 * <h2>Using NIO</h2> 211 * <pre>{@code 212 * BoundedInputStream s = BoundedInputStream.builder() 213 * .setPath(Paths.get("MyFile.xml")) 214 * .setMaxCount(1024) 215 * .setPropagateClose(false) 216 * .get(); 217 * } 218 * </pre> 219 * <h2>Using IO</h2> 220 * <pre>{@code 221 * BoundedInputStream s = BoundedInputStream.builder() 222 * .setFile(new File("MyFile.xml")) 223 * .setMaxCount(1024) 224 * .setPropagateClose(false) 225 * .get(); 226 * } 227 * </pre> 228 * <h2>Counting Bytes</h2> 229 * <p>You can set the running count when building, which is most useful when starting from another stream: 230 * <pre>{@code 231 * InputStream in = ...; 232 * BoundedInputStream s = BoundedInputStream.builder() 233 * .setInputStream(in) 234 * .setCount(12) 235 * .setMaxCount(1024) 236 * .setPropagateClose(false) 237 * .get(); 238 * } 239 * </pre> 240 * 241 * @see #get() 242 * @since 2.16.0 243 */ 244 //@formatter:on 245 public static class Builder extends AbstractBuilder<Builder> { 246 247 /** 248 * Constructs a new builder of {@link BoundedInputStream}. 249 */ 250 public Builder() { 251 // empty 252 } 253 254 /** 255 * Builds a new {@link BoundedInputStream}. 256 * <p> 257 * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception. 258 * </p> 259 * <p> 260 * If you start from an input stream, an exception can't be thrown, and you can call {@link #getUnchecked()} instead. 261 * </p> 262 * <p> 263 * This builder uses the following aspects: 264 * </p> 265 * <ul> 266 * <li>{@link #getInputStream()} gets the target aspect.</li> 267 * <li>{@link #getAfterRead()}</li> 268 * <li>{@code #getCount()}</li> 269 * <li>{@code #getMaxCount()}</li> 270 * <li>{@code #getOnMaxCount()}</li> 271 * <li>{@code #isPropagateClose()}</li> 272 * </ul> 273 * 274 * @return a new instance. 275 * @throws IllegalStateException if the {@code origin} is {@code null}. 276 * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}. 277 * @throws IOException if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}. 278 * @see #getInputStream() 279 * @see #getUnchecked() 280 */ 281 @Override 282 public BoundedInputStream get() throws IOException { 283 return new BoundedInputStream(this); 284 } 285 286 } 287 288 /** 289 * Constructs a new {@link AbstractBuilder}. 290 * 291 * @return a new {@link AbstractBuilder}. 292 * @since 2.16.0 293 */ 294 public static Builder builder() { 295 return new Builder(); 296 } 297 298 /** The current count of bytes counted. */ 299 private long count; 300 301 /** The current mark. */ 302 private long mark; 303 304 /** The maximum count of bytes to read. */ 305 private final long maxCount; 306 307 private final IOBiConsumer<Long, Long> onMaxCount; 308 309 /** 310 * Flag if close should be propagated. 311 * 312 * TODO Make final in 3.0. 313 */ 314 private boolean propagateClose = true; 315 316 private BoundedInputStream(final Builder builder) throws IOException { 317 super(builder); 318 this.count = builder.getCount(); 319 this.maxCount = builder.getMaxCount(); 320 this.propagateClose = builder.isPropagateClose(); 321 this.onMaxCount = builder.getOnMaxCount(); 322 } 323 324 /** 325 * Constructs a new {@link BoundedInputStream} that wraps the given input stream and is <em>unbounded</em>. 326 * <p> 327 * To build an instance: Use the {@link #builder()} to access all features. 328 * </p> 329 * 330 * @param in The wrapped input stream. 331 * @deprecated Use {@link AbstractBuilder#get()}. 332 */ 333 @Deprecated 334 public BoundedInputStream(final InputStream in) { 335 this(in, EOF); 336 } 337 338 private BoundedInputStream(final InputStream inputStream, final Builder builder) { 339 super(inputStream, builder); 340 this.count = builder.getCount(); 341 this.maxCount = builder.getMaxCount(); 342 this.propagateClose = builder.isPropagateClose(); 343 this.onMaxCount = builder.getOnMaxCount(); 344 } 345 346 /** 347 * Constructs a new {@link BoundedInputStream} that wraps the given input stream and limits it to a certain size. 348 * 349 * @param inputStream The wrapped input stream. 350 * @param maxCount The maximum number of bytes to return, negative means unbound. 351 * @deprecated Use {@link AbstractBuilder#get()}. 352 */ 353 @Deprecated 354 public BoundedInputStream(final InputStream inputStream, final long maxCount) { 355 // Some badly designed methods, for example the Servlet API, overload length 356 // such that "-1" means stream finished 357 this(inputStream, builder().setMaxCount(maxCount)); 358 } 359 360 /** 361 * Adds the number of read bytes to the count. 362 * 363 * @param n number of bytes read, or -1 if no more bytes are available. 364 * @throws IOException Not thrown here but subclasses may throw. 365 * @since 2.0 366 */ 367 @Override 368 protected synchronized void afterRead(final int n) throws IOException { 369 if (n != EOF) { 370 count += n; 371 } 372 super.afterRead(n); 373 } 374 375 @Override 376 public int available() throws IOException { 377 // Safe cast: value is between 0 and Integer.MAX_VALUE 378 final int remaining = (int) Math.min(getRemaining(), Integer.MAX_VALUE); 379 return Math.min(super.available(), remaining); 380 } 381 382 /** 383 * Invokes the delegate's {@link InputStream#close()} method if {@link #isPropagateClose()} is {@code true}. 384 * 385 * @throws IOException if an I/O error occurs. 386 */ 387 @Override 388 public void close() throws IOException { 389 if (propagateClose) { 390 super.close(); 391 } 392 } 393 394 /** 395 * Gets the count of bytes read. 396 * 397 * @return The count of bytes read. 398 * @since 2.12.0 399 */ 400 public synchronized long getCount() { 401 return count; 402 } 403 404 /** 405 * Gets the maximum number of bytes to read. 406 * 407 * @return The maximum number of bytes to read, or {@value IOUtils#EOF} if unbounded. 408 * @since 2.16.0 409 */ 410 public long getMaxCount() { 411 return maxCount; 412 } 413 414 /** 415 * Gets the maximum count of bytes to read. 416 * 417 * @return The maximum count of bytes to read. 418 * @since 2.12.0 419 * @deprecated Use {@link #getMaxCount()}. 420 */ 421 @Deprecated 422 public long getMaxLength() { 423 return maxCount; 424 } 425 426 /** 427 * Gets the number of bytes remaining to read before the maximum is reached. 428 * 429 * <p> 430 * This method does <strong>not</strong> report the bytes available in the 431 * underlying stream; it only reflects the remaining allowance imposed by this 432 * {@code BoundedInputStream}. 433 * </p> 434 * 435 * @return The number of bytes remaining to read before the maximum is reached, 436 * or {@link Long#MAX_VALUE} if no bound is set. 437 * @since 2.16.0 438 */ 439 public long getRemaining() { 440 final long maxCount = getMaxCount(); 441 return maxCount == EOF ? Long.MAX_VALUE : Math.max(0, maxCount - getCount()); 442 } 443 444 private boolean isMaxCount() { 445 return maxCount >= 0 && getCount() >= maxCount; 446 } 447 448 /** 449 * Tests whether the {@link #close()} method should propagate to the underling {@link InputStream}. 450 * 451 * @return {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it does not. 452 */ 453 public boolean isPropagateClose() { 454 return propagateClose; 455 } 456 457 /** 458 * Invokes the delegate's {@link InputStream#mark(int)} method. 459 * 460 * @param readLimit read ahead limit. 461 */ 462 @Override 463 public synchronized void mark(final int readLimit) { 464 in.mark(readLimit); 465 mark = count; 466 } 467 468 /** 469 * Invokes the delegate's {@link InputStream#markSupported()} method. 470 * 471 * @return true if mark is supported, otherwise false. 472 */ 473 @Override 474 public boolean markSupported() { 475 return in.markSupported(); 476 } 477 478 /** 479 * A caller has caused a request that would cross the {@code maxLength} boundary. 480 * <p> 481 * Delegates to the consumer set in {@link Builder#setOnMaxCount(IOBiConsumer)}. 482 * </p> 483 * 484 * @param max The maximum count of bytes to read. 485 * @param count The count of bytes read. 486 * @throws IOException Subclasses may throw. 487 * @since 2.12.0 488 */ 489 @SuppressWarnings("unused") 490 // TODO Rename to onMaxCount for 3.0 491 protected void onMaxLength(final long max, final long count) throws IOException { 492 onMaxCount.accept(max, count); 493 } 494 495 /** 496 * Invokes the delegate's {@link InputStream#read()} method if the current position is less than the limit. 497 * 498 * @return the byte read or -1 if the end of stream or the limit has been reached. 499 * @throws IOException if an I/O error occurs. 500 */ 501 @Override 502 public int read() throws IOException { 503 if (isMaxCount()) { 504 onMaxLength(maxCount, getCount()); 505 return EOF; 506 } 507 return super.read(); 508 } 509 510 /** 511 * Invokes the delegate's {@link InputStream#read(byte[])} method. 512 * 513 * @param b the buffer to read the bytes into. 514 * @return the number of bytes read or -1 if the end of stream or the limit has been reached. 515 * @throws IOException if an I/O error occurs. 516 */ 517 @Override 518 public int read(final byte[] b) throws IOException { 519 return read(b, 0, b.length); 520 } 521 522 /** 523 * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method. 524 * 525 * @param b the buffer to read the bytes into. 526 * @param off The start offset. 527 * @param len The number of bytes to read. 528 * @return the number of bytes read or -1 if the end of stream or the limit has been reached. 529 * @throws IOException if an I/O error occurs. 530 */ 531 @Override 532 public int read(final byte[] b, final int off, final int len) throws IOException { 533 if (isMaxCount()) { 534 onMaxLength(maxCount, getCount()); 535 return EOF; 536 } 537 return super.read(b, off, (int) toReadLen(len)); 538 } 539 540 /** 541 * Invokes the delegate's {@link InputStream#reset()} method. 542 * 543 * @throws IOException if an I/O error occurs. 544 */ 545 @Override 546 public synchronized void reset() throws IOException { 547 in.reset(); 548 count = mark; 549 } 550 551 /** 552 * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}. 553 * 554 * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it 555 * does not. 556 * @deprecated Use {@link AbstractBuilder#setPropagateClose(boolean)}. 557 */ 558 @Deprecated 559 public synchronized void setPropagateClose(final boolean propagateClose) { 560 this.propagateClose = propagateClose; 561 } 562 563 /** 564 * Invokes the delegate's {@link InputStream#skip(long)} method. 565 * 566 * @param n the number of bytes to skip. 567 * @return the actual number of bytes skipped. 568 * @throws IOException if an I/O error occurs. 569 */ 570 @Override 571 public synchronized long skip(final long n) throws IOException { 572 final long skip = super.skip(toReadLen(n)); 573 count += skip; 574 return skip; 575 } 576 577 /** 578 * Converts a request to read {@code len} bytes to a lower count if reading would put us over the limit. 579 * <p> 580 * If a {@code maxCount} is not set, then return max{@code maxCount}. 581 * </p> 582 * 583 * @param len The requested byte count. 584 * @return How many bytes to actually attempt to read. 585 */ 586 private long toReadLen(final long len) { 587 return maxCount >= 0 ? Math.min(len, maxCount - getCount()) : len; 588 } 589 590 /** 591 * Invokes the delegate's {@link InputStream#toString()} method. 592 * 593 * @return the delegate's {@link InputStream#toString()}. 594 */ 595 @Override 596 public String toString() { 597 return in.toString(); 598 } 599}