001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.io.input; 018 019import static org.apache.commons.io.IOUtils.EOF; 020 021import java.io.IOException; 022import java.io.InputStream; 023 024import org.apache.commons.io.IOUtils; 025import org.apache.commons.io.function.IOBiConsumer; 026 027//@formatter:off 028/** 029 * Reads bytes up to a maximum count and stops once reached. 030 * <p> 031 * To build an instance: Use the {@link #builder()} to access all features. 032 * </p> 033 * <p> 034 * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}. 035 * </p> 036 * <p> 037 * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped. 038 * </p> 039 * <h2>Using a ServletInputStream</h2> 040 * <p> 041 * A {@code ServletInputStream} can block if you try to read content that isn't there 042 * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the 043 * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content 044 * length in the first place. 045 * </p> 046 * <h2>Using NIO</h2> 047 * <pre>{@code 048 * BoundedInputStream s = BoundedInputStream.builder() 049 * .setPath(Paths.get("MyFile.xml")) 050 * .setMaxCount(1024) 051 * .setPropagateClose(false) 052 * .get(); 053 * } 054 * </pre> 055 * <h2>Using IO</h2> 056 * <pre>{@code 057 * BoundedInputStream s = BoundedInputStream.builder() 058 * .setFile(new File("MyFile.xml")) 059 * .setMaxCount(1024) 060 * .setPropagateClose(false) 061 * .get(); 062 * } 063 * </pre> 064 * <h2>Counting Bytes</h2> 065 * <p>You can set the running count when building, which is most useful when starting from another stream: 066 * <pre>{@code 067 * InputStream in = ...; 068 * BoundedInputStream s = BoundedInputStream.builder() 069 * .setInputStream(in) 070 * .setCount(12) 071 * .setMaxCount(1024) 072 * .setPropagateClose(false) 073 * .get(); 074 * } 075 * </pre> 076 * <h2>Listening for the max count reached</h2> 077 * <pre>{@code 078 * BoundedInputStream s = BoundedInputStream.builder() 079 * .setPath(Paths.get("MyFile.xml")) 080 * .setMaxCount(1024) 081 * .setOnMaxCount((max, count) -> System.out.printf("Max count %,d reached with a last read count of %,d%n", max, count)) 082 * .get(); 083 * } 084 * </pre> 085 * @see Builder 086 * @since 2.0 087 */ 088//@formatter:on 089public class BoundedInputStream extends ProxyInputStream { 090 091 /** 092 * For subclassing builders from {@link BoundedInputStream} subclassses. 093 * 094 * @param <T> The subclass. 095 */ 096 abstract static class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<BoundedInputStream, T> { 097 098 /** The current count of bytes counted. */ 099 private long count; 100 101 /** The max count of bytes to read. */ 102 private long maxCount = EOF; 103 104 private IOBiConsumer<Long, Long> onMaxCount = IOBiConsumer.noop(); 105 106 /** Flag if {@link #close()} should be propagated, {@code true} by default. */ 107 private boolean propagateClose = true; 108 109 long getCount() { 110 return count; 111 } 112 113 long getMaxCount() { 114 return maxCount; 115 } 116 117 IOBiConsumer<Long, Long> getOnMaxCount() { 118 return onMaxCount; 119 } 120 121 boolean isPropagateClose() { 122 return propagateClose; 123 } 124 125 /** 126 * Sets the current number of bytes counted. 127 * <p> 128 * Useful when building from another stream to carry forward a read count. 129 * </p> 130 * <p> 131 * Default is {@code 0}, negative means 0. 132 * </p> 133 * 134 * @param count The current number of bytes counted. 135 * @return {@code this} instance. 136 */ 137 public T setCount(final long count) { 138 this.count = Math.max(0, count); 139 return asThis(); 140 } 141 142 /** 143 * Sets the maximum number of bytes to return. 144 * <p> 145 * Default is {@value IOUtils#EOF}, negative means unbound. 146 * </p> 147 * 148 * @param maxCount The maximum number of bytes to return, negative means unbound. 149 * @return {@code this} instance. 150 */ 151 public T setMaxCount(final long maxCount) { 152 this.maxCount = Math.max(EOF, maxCount); 153 return asThis(); 154 } 155 156 /** 157 * Sets the default {@link BoundedInputStream#onMaxLength(long, long)} behavior, {@code null} resets to a NOOP. 158 * <p> 159 * The first Long is the max count of bytes to read. The second Long is the count of bytes read. 160 * </p> 161 * <p> 162 * This does <em>not</em> override a {@code BoundedInputStream} subclass' implementation of the {@link BoundedInputStream#onMaxLength(long, long)} 163 * method. 164 * </p> 165 * 166 * @param onMaxCount the {@link ProxyInputStream#afterRead(int)} behavior. 167 * @return this instance. 168 * @since 2.18.0 169 */ 170 public T setOnMaxCount(final IOBiConsumer<Long, Long> onMaxCount) { 171 this.onMaxCount = onMaxCount != null ? onMaxCount : IOBiConsumer.noop(); 172 return asThis(); 173 } 174 175 /** 176 * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}. 177 * <p> 178 * Default is {@code true}. 179 * </p> 180 * 181 * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if 182 * it does not. 183 * @return {@code this} instance. 184 */ 185 public T setPropagateClose(final boolean propagateClose) { 186 this.propagateClose = propagateClose; 187 return asThis(); 188 } 189 190 } 191 192 //@formatter:off 193 /** 194 * Builds a new {@link BoundedInputStream}. 195 * <p> 196 * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}. 197 * </p> 198 * <p> 199 * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped. 200 * </p> 201 * <h2>Using a ServletInputStream</h2> 202 * <p> 203 * A {@code ServletInputStream} can block if you try to read content that isn't there 204 * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the 205 * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content 206 * length in the first place. 207 * </p> 208 * <h2>Using NIO</h2> 209 * <pre>{@code 210 * BoundedInputStream s = BoundedInputStream.builder() 211 * .setPath(Paths.get("MyFile.xml")) 212 * .setMaxCount(1024) 213 * .setPropagateClose(false) 214 * .get(); 215 * } 216 * </pre> 217 * <h2>Using IO</h2> 218 * <pre>{@code 219 * BoundedInputStream s = BoundedInputStream.builder() 220 * .setFile(new File("MyFile.xml")) 221 * .setMaxCount(1024) 222 * .setPropagateClose(false) 223 * .get(); 224 * } 225 * </pre> 226 * <h2>Counting Bytes</h2> 227 * <p>You can set the running count when building, which is most useful when starting from another stream: 228 * <pre>{@code 229 * InputStream in = ...; 230 * BoundedInputStream s = BoundedInputStream.builder() 231 * .setInputStream(in) 232 * .setCount(12) 233 * .setMaxCount(1024) 234 * .setPropagateClose(false) 235 * .get(); 236 * } 237 * </pre> 238 * 239 * @see #get() 240 * @since 2.16.0 241 */ 242 //@formatter:on 243 public static class Builder extends AbstractBuilder<Builder> { 244 245 /** 246 * Constructs a new builder of {@link BoundedInputStream}. 247 */ 248 public Builder() { 249 // empty 250 } 251 252 /** 253 * Builds a new {@link BoundedInputStream}. 254 * <p> 255 * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception. 256 * </p> 257 * <p> 258 * If you start from an input stream, an exception can't be thrown, and you can call {@link #getUnchecked()} instead. 259 * </p> 260 * <p> 261 * This builder uses the following aspects: 262 * </p> 263 * <ul> 264 * <li>{@link #getInputStream()} gets the target aspect.</li> 265 * <li>{@link #getAfterRead()}</li> 266 * <li>{@link #getCount()}</li> 267 * <li>{@link #getMaxCount()}</li> 268 * <li>{@link #getOnMaxCount()}</li> 269 * <li>{@link #isPropagateClose()}</li> 270 * </ul> 271 * 272 * @return a new instance. 273 * @throws IllegalStateException if the {@code origin} is {@code null}. 274 * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}. 275 * @throws IOException if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}. 276 * @see #getInputStream() 277 * @see #getUnchecked() 278 */ 279 @Override 280 public BoundedInputStream get() throws IOException { 281 return new BoundedInputStream(this); 282 } 283 284 } 285 286 /** 287 * Constructs a new {@link AbstractBuilder}. 288 * 289 * @return a new {@link AbstractBuilder}. 290 * @since 2.16.0 291 */ 292 public static Builder builder() { 293 return new Builder(); 294 } 295 296 /** The current count of bytes counted. */ 297 private long count; 298 299 /** The current mark. */ 300 private long mark; 301 302 /** The max count of bytes to read. */ 303 private final long maxCount; 304 305 private final IOBiConsumer<Long, Long> onMaxCount; 306 307 /** 308 * Flag if close should be propagated. 309 * 310 * TODO Make final in 3.0. 311 */ 312 private boolean propagateClose = true; 313 314 BoundedInputStream(final Builder builder) throws IOException { 315 super(builder); 316 this.count = builder.getCount(); 317 this.maxCount = builder.getMaxCount(); 318 this.propagateClose = builder.isPropagateClose(); 319 this.onMaxCount = builder.getOnMaxCount(); 320 } 321 322 /** 323 * Constructs a new {@link BoundedInputStream} that wraps the given input stream and is <em>unbounded</em>. 324 * <p> 325 * To build an instance: Use the {@link #builder()} to access all features. 326 * </p> 327 * 328 * @param in The wrapped input stream. 329 * @deprecated Use {@link AbstractBuilder#get()}. 330 */ 331 @Deprecated 332 public BoundedInputStream(final InputStream in) { 333 this(in, EOF); 334 } 335 336 BoundedInputStream(final InputStream inputStream, final Builder builder) { 337 super(inputStream, builder); 338 this.count = builder.getCount(); 339 this.maxCount = builder.getMaxCount(); 340 this.propagateClose = builder.isPropagateClose(); 341 this.onMaxCount = builder.getOnMaxCount(); 342 } 343 344 /** 345 * Constructs a new {@link BoundedInputStream} that wraps the given input stream and limits it to a certain size. 346 * 347 * @param inputStream The wrapped input stream. 348 * @param maxCount The maximum number of bytes to return, negative means unbound. 349 * @deprecated Use {@link AbstractBuilder#get()}. 350 */ 351 @Deprecated 352 public BoundedInputStream(final InputStream inputStream, final long maxCount) { 353 // Some badly designed methods - e.g. the Servlet API - overload length 354 // such that "-1" means stream finished 355 this(inputStream, builder().setMaxCount(maxCount)); 356 } 357 358 /** 359 * Adds the number of read bytes to the count. 360 * 361 * @param n number of bytes read, or -1 if no more bytes are available 362 * @throws IOException Not thrown here but subclasses may throw. 363 * @since 2.0 364 */ 365 @Override 366 protected synchronized void afterRead(final int n) throws IOException { 367 if (n != EOF) { 368 count += n; 369 } 370 super.afterRead(n); 371 } 372 373 /** 374 * {@inheritDoc} 375 */ 376 @Override 377 public int available() throws IOException { 378 if (isMaxCount()) { 379 onMaxLength(maxCount, getCount()); 380 return 0; 381 } 382 return in.available(); 383 } 384 385 /** 386 * Invokes the delegate's {@link InputStream#close()} method if {@link #isPropagateClose()} is {@code true}. 387 * 388 * @throws IOException if an I/O error occurs. 389 */ 390 @Override 391 public void close() throws IOException { 392 if (propagateClose) { 393 super.close(); 394 } 395 } 396 397 /** 398 * Gets the count of bytes read. 399 * 400 * @return The count of bytes read. 401 * @since 2.12.0 402 */ 403 public synchronized long getCount() { 404 return count; 405 } 406 407 /** 408 * Gets the max count of bytes to read. 409 * 410 * @return The max count of bytes to read. 411 * @since 2.16.0 412 */ 413 public long getMaxCount() { 414 return maxCount; 415 } 416 417 /** 418 * Gets the max count of bytes to read. 419 * 420 * @return The max count of bytes to read. 421 * @since 2.12.0 422 * @deprecated Use {@link #getMaxCount()}. 423 */ 424 @Deprecated 425 public long getMaxLength() { 426 return maxCount; 427 } 428 429 /** 430 * Gets how many bytes remain to read. 431 * 432 * @return bytes how many bytes remain to read. 433 * @since 2.16.0 434 */ 435 public long getRemaining() { 436 return Math.max(0, getMaxCount() - getCount()); 437 } 438 439 private boolean isMaxCount() { 440 return maxCount >= 0 && getCount() >= maxCount; 441 } 442 443 /** 444 * Tests whether the {@link #close()} method should propagate to the underling {@link InputStream}. 445 * 446 * @return {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it does not. 447 */ 448 public boolean isPropagateClose() { 449 return propagateClose; 450 } 451 452 /** 453 * Invokes the delegate's {@link InputStream#mark(int)} method. 454 * 455 * @param readLimit read ahead limit 456 */ 457 @Override 458 public synchronized void mark(final int readLimit) { 459 in.mark(readLimit); 460 mark = count; 461 } 462 463 /** 464 * Invokes the delegate's {@link InputStream#markSupported()} method. 465 * 466 * @return true if mark is supported, otherwise false 467 */ 468 @Override 469 public boolean markSupported() { 470 return in.markSupported(); 471 } 472 473 /** 474 * A caller has caused a request that would cross the {@code maxLength} boundary. 475 * <p> 476 * Delegates to the consumer set in {@link Builder#setOnMaxCount(IOBiConsumer)}. 477 * </p> 478 * 479 * @param max The max count of bytes to read. 480 * @param count The count of bytes read. 481 * @throws IOException Subclasses may throw. 482 * @since 2.12.0 483 */ 484 @SuppressWarnings("unused") 485 // TODO Rename to onMaxCount for 3.0 486 protected void onMaxLength(final long max, final long count) throws IOException { 487 onMaxCount.accept(max, count); 488 } 489 490 /** 491 * Invokes the delegate's {@link InputStream#read()} method if the current position is less than the limit. 492 * 493 * @return the byte read or -1 if the end of stream or the limit has been reached. 494 * @throws IOException if an I/O error occurs. 495 */ 496 @Override 497 public int read() throws IOException { 498 if (isMaxCount()) { 499 onMaxLength(maxCount, getCount()); 500 return EOF; 501 } 502 return super.read(); 503 } 504 505 /** 506 * Invokes the delegate's {@link InputStream#read(byte[])} method. 507 * 508 * @param b the buffer to read the bytes into 509 * @return the number of bytes read or -1 if the end of stream or the limit has been reached. 510 * @throws IOException if an I/O error occurs. 511 */ 512 @Override 513 public int read(final byte[] b) throws IOException { 514 return read(b, 0, b.length); 515 } 516 517 /** 518 * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method. 519 * 520 * @param b the buffer to read the bytes into 521 * @param off The start offset 522 * @param len The number of bytes to read 523 * @return the number of bytes read or -1 if the end of stream or the limit has been reached. 524 * @throws IOException if an I/O error occurs. 525 */ 526 @Override 527 public int read(final byte[] b, final int off, final int len) throws IOException { 528 if (isMaxCount()) { 529 onMaxLength(maxCount, getCount()); 530 return EOF; 531 } 532 return super.read(b, off, (int) toReadLen(len)); 533 } 534 535 /** 536 * Invokes the delegate's {@link InputStream#reset()} method. 537 * 538 * @throws IOException if an I/O error occurs. 539 */ 540 @Override 541 public synchronized void reset() throws IOException { 542 in.reset(); 543 count = mark; 544 } 545 546 /** 547 * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}. 548 * 549 * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it 550 * does not. 551 * @deprecated Use {@link AbstractBuilder#setPropagateClose(boolean)}. 552 */ 553 @Deprecated 554 public void setPropagateClose(final boolean propagateClose) { 555 this.propagateClose = propagateClose; 556 } 557 558 /** 559 * Invokes the delegate's {@link InputStream#skip(long)} method. 560 * 561 * @param n the number of bytes to skip 562 * @return the actual number of bytes skipped 563 * @throws IOException if an I/O error occurs. 564 */ 565 @Override 566 public synchronized long skip(final long n) throws IOException { 567 final long skip = super.skip(toReadLen(n)); 568 count += skip; 569 return skip; 570 } 571 572 private long toReadLen(final long len) { 573 return maxCount >= 0 ? Math.min(len, maxCount - getCount()) : len; 574 } 575 576 /** 577 * Invokes the delegate's {@link InputStream#toString()} method. 578 * 579 * @return the delegate's {@link InputStream#toString()} 580 */ 581 @Override 582 public String toString() { 583 return in.toString(); 584 } 585}