/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.io.input;

import static org.apache.commons.io.IOUtils.EOF;

// import javax.annotation.concurrent.GuardedBy;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.io.build.AbstractStreamBuilder;

/**
 * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current
 * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a
 * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we
 * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O.
 * <p>
 * To build an instance, use {@link Builder}.
 * </p>
 * <p>
 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
 * </p>
 *
 * @see Builder
 * @since 2.9.0
 */
public class ReadAheadInputStream extends FilterInputStream {

    // @formatter:off
    /**
     * Builds a new {@link ReadAheadInputStream}.
     *
     * <p>
     * For example:
     * </p>
     * <pre>{@code
     * ReadAheadInputStream s = ReadAheadInputStream.builder()
     *   .setPath(path)
     *   .setExecutorService(Executors.newSingleThreadExecutor())
     *   .get();}
     * </pre>
     * <p>
     * An executor service set on the builder is not shut down when the stream is closed; when none is set, the stream creates its own single daemon thread
     * executor and shuts it down on close.
     * </p>
     *
     * @see #get()
     * @since 2.12.0
     */
    // @formatter:on
    public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {

        /** The executor service for the read-ahead task, or {@code null} to let the stream create (and own) one. */
        private ExecutorService executorService;

        /**
         * Constructs a new builder of {@link ReadAheadInputStream}.
         */
        public Builder() {
            // empty
        }

        /**
         * Builds a new {@link ReadAheadInputStream}.
         * <p>
         * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
         * </p>
         * <p>
         * This builder uses the following aspects:
         * </p>
         * <ul>
         * <li>{@link #getInputStream()} gets the target aspect.</li>
         * <li>{@link #getBufferSize()}</li>
         * <li>{@link ExecutorService}</li>
         * </ul>
         *
         * @return a new instance.
         * @throws IllegalStateException         if the {@code origin} is {@code null}.
         * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
         * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
         * @see #getInputStream()
         * @see #getBufferSize()
         * @see #getUnchecked()
         */
        @Override
        public ReadAheadInputStream get() throws IOException {
            return new ReadAheadInputStream(this);
        }

        /**
         * Sets the executor service for the read-ahead thread.
         *
         * @param executorService the executor service for the read-ahead thread.
         * @return {@code this} instance.
         */
        public Builder setExecutorService(final ExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

    }

    /** Per-thread scratch array used by {@link #read()} to delegate to {@link #read(byte[], int, int)}. */
    private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);

    /**
     * Constructs a new {@link Builder}.
     *
     * @return a new {@link Builder}.
     * @since 2.12.0
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Constructs a new daemon thread.
     *
     * @param r the thread's runnable.
     * @return a new daemon thread.
     */
    private static Thread newDaemonThread(final Runnable r) {
        final Thread thread = new Thread(r, "commons-io-read-ahead");
        thread.setDaemon(true);
        return thread;
    }

    /**
     * Constructs a new daemon executor service.
     *
     * @return a new daemon executor service.
     */
    private static ExecutorService newExecutorService() {
        return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
    }

    /**
     * Converts an {@link InterruptedException} into an {@link InterruptedIOException} that carries the same message and has the original as its cause.
     * <p>
     * The current thread's interrupt status is restored before returning so that code further up the stack can still observe the interruption.
     * </p>
     *
     * @param e the interruption to convert.
     * @return a new {@link InterruptedIOException} for the caller to throw.
     */
    private static InterruptedIOException newInterruptedIOException(final InterruptedException e) {
        Thread.currentThread().interrupt();
        final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
        iio.initCause(e);
        return iio;
    }

    private final ReentrantLock stateChangeLock = new ReentrantLock();

    // @GuardedBy("stateChangeLock")
    private ByteBuffer activeBuffer;

    // @GuardedBy("stateChangeLock")
    private ByteBuffer readAheadBuffer;

    // @GuardedBy("stateChangeLock")
    private boolean endOfStream;

    // @GuardedBy("stateChangeLock")
    // true if async read is in progress
    private boolean readInProgress;

    // @GuardedBy("stateChangeLock")
    // true if read is aborted due to an exception in reading from underlying input stream.
    private boolean readAborted;

    // @GuardedBy("stateChangeLock")
    private Throwable readException;

    // @GuardedBy("stateChangeLock")
    // whether the close method is called.
    private boolean isClosed;

    // @GuardedBy("stateChangeLock")
    // true when the close method will close the underlying input stream. This is valid only if
    // `isClosed` is true.
    private boolean isUnderlyingInputStreamBeingClosed;

    // @GuardedBy("stateChangeLock")
    // whether there is a read ahead task running,
    private boolean isReading;

    // Whether there is a reader waiting for data.
    private final AtomicBoolean isWaiting = new AtomicBoolean();

    private final ExecutorService executorService;

    // Whether close() shuts down executorService (true only when this class created it).
    private final boolean shutdownExecutorService;

    private final Condition asyncReadComplete = stateChangeLock.newCondition();

    /**
     * Constructs an instance from a builder. When the builder has no executor service, a new daemon executor service is created and shut down on close.
     *
     * @param builder the builder providing the input stream, buffer size, and optional executor service.
     * @throws IOException if an I/O error occurs converting the builder's origin to an {@link InputStream}.
     */
    @SuppressWarnings("resource")
    private ReadAheadInputStream(final Builder builder) throws IOException {
        this(builder.getInputStream(), builder.getBufferSize(), builder.executorService != null ? builder.executorService : newExecutorService(),
                builder.executorService == null);
    }

    /**
     * Constructs an instance with the specified buffer size. A new daemon executor service is created for the read-ahead task and shut down on close.
     *
     * @param inputStream       The underlying input stream.
     * @param bufferSizeInBytes The buffer size.
     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
     */
    @Deprecated
    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
        this(inputStream, bufferSizeInBytes, newExecutorService(), true);
    }

    /**
     * Constructs an instance with the specified buffer size and executor service. The given executor service is not shut down on close.
     *
     * @param inputStream       The underlying input stream.
     * @param bufferSizeInBytes The buffer size.
     * @param executorService   An executor service for the read-ahead thread.
     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
     */
    @Deprecated
    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
        this(inputStream, bufferSizeInBytes, executorService, false);
    }

    /**
     * Constructs an instance with the specified buffer size and executor service.
     *
     * @param inputStream             The underlying input stream.
     * @param bufferSizeInBytes       The buffer size.
     * @param executorService         An executor service for the read-ahead thread.
     * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close.
     * @throws NullPointerException     if {@code inputStream} or {@code executorService} is {@code null}.
     * @throws IllegalArgumentException if {@code bufferSizeInBytes} is not greater than 0.
     */
    private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
            final boolean shutdownExecutorService) {
        super(Objects.requireNonNull(inputStream, "inputStream"));
        if (bufferSizeInBytes <= 0) {
            throw new IllegalArgumentException("bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
        }
        this.executorService = Objects.requireNonNull(executorService, "executorService");
        this.shutdownExecutorService = shutdownExecutorService;
        this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
        this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
        // Flip the freshly allocated buffers so that both start out with nothing remaining to read.
        this.activeBuffer.flip();
        this.readAheadBuffer.flip();
    }

    /**
     * Returns the number of bytes currently held in the active and read ahead buffers, capped at {@link Integer#MAX_VALUE}.
     *
     * @return the number of buffered bytes.
     * @throws IOException declared by the overridden method; not thrown by this implementation.
     */
    @Override
    public int available() throws IOException {
        stateChangeLock.lock();
        // Make sure we have no integer overflow.
        try {
            return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
        } finally {
            stateChangeLock.unlock();
        }
    }

    /**
     * Throws the failure recorded by the read ahead task, if any.
     *
     * @throws IOException the recorded exception itself when it is an {@link IOException}, otherwise a new {@link IOException} wrapping it.
     */
    private void checkReadException() throws IOException {
        if (readAborted) {
            if (readException instanceof IOException) {
                throw (IOException) readException;
            }
            throw new IOException(readException);
        }
    }

    /**
     * Closes this stream. Calling this method more than once has no further effect.
     * <p>
     * When this instance created its own executor service, that service is shut down and awaited. The underlying input stream is closed here when no read
     * ahead task is running; otherwise the running task closes it when it completes. This applies whether or not this instance owns the executor service.
     * </p>
     *
     * @throws InterruptedIOException if the calling thread is interrupted while awaiting termination of the executor service.
     * @throws IOException            if closing the underlying input stream fails.
     */
    @Override
    public void close() throws IOException {
        boolean isSafeToCloseUnderlyingInputStream = false;
        stateChangeLock.lock();
        try {
            if (isClosed) {
                return;
            }
            isClosed = true;
            if (!isReading) {
                // Nobody is reading, so we can close the underlying input stream in this method.
                isSafeToCloseUnderlyingInputStream = true;
                // Flip this to make sure the read ahead task will not close the underlying input stream.
                isUnderlyingInputStreamBeingClosed = true;
            }
        } finally {
            stateChangeLock.unlock();
        }

        try {
            if (shutdownExecutorService) {
                executorService.shutdownNow();
                executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
            }
        } catch (final InterruptedException e) {
            throw newInterruptedIOException(e);
        } finally {
            // Close the underlying stream even when the executor service is not ours to shut down;
            // otherwise a stream built with a caller-supplied executor would never release it.
            if (isSafeToCloseUnderlyingInputStream) {
                super.close();
            }
        }
    }

    /**
     * Marks the read ahead task as finished and closes the underlying input stream when {@link #close()} was called while the task was running and therefore
     * could not close it itself. An {@link IOException} from that close is ignored.
     */
    private void closeUnderlyingInputStreamIfNecessary() {
        boolean needToCloseUnderlyingInputStream = false;
        stateChangeLock.lock();
        try {
            isReading = false;
            if (isClosed && !isUnderlyingInputStreamBeingClosed) {
                // close method cannot close underlyingInputStream because we were reading.
                needToCloseUnderlyingInputStream = true;
            }
        } finally {
            stateChangeLock.unlock();
        }
        if (needToCloseUnderlyingInputStream) {
            try {
                super.close();
            } catch (final IOException ignored) {
                // TODO Rethrow as UncheckedIOException?
            }
        }
    }

    /**
     * Tests whether both buffers are drained and the underlying input stream has reported its end.
     *
     * @return {@code true} if there is nothing left to read.
     */
    private boolean isEndOfStream() {
        return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
    }

    /**
     * Reads a single byte.
     *
     * @return the byte read as an unsigned value, or {@code EOF} at the end of the stream.
     * @throws IOException if an I/O error occurs.
     */
    @Override
    public int read() throws IOException {
        if (activeBuffer.hasRemaining()) {
            // short path - just get one byte.
            return activeBuffer.get() & 0xFF;
        }
        final byte[] oneByteArray = BYTE_ARRAY_1.get();
        oneByteArray[0] = 0;
        return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
    }

    /**
     * Reads up to {@code len} bytes into {@code b}. At most the content of the active buffer is returned by one call.
     *
     * @param b      the destination array.
     * @param offset the index in {@code b} of the first byte written.
     * @param len    the maximum number of bytes to read.
     * @return the number of bytes read, {@code 0} if {@code len} is 0, or {@code EOF} at the end of the stream.
     * @throws IndexOutOfBoundsException if {@code offset} or {@code len} is negative, or {@code len} is greater than {@code b.length - offset}.
     * @throws IOException               if an I/O error occurs, including a failure recorded by the read ahead task.
     */
    @Override
    public int read(final byte[] b, final int offset, int len) throws IOException {
        if (offset < 0 || len < 0 || len > b.length - offset) {
            throw new IndexOutOfBoundsException();
        }
        if (len == 0) {
            return 0;
        }

        if (!activeBuffer.hasRemaining()) {
            // No remaining in active buffer - lock and switch to read ahead buffer.
            stateChangeLock.lock();
            try {
                waitForAsyncReadComplete();
                if (!readAheadBuffer.hasRemaining()) {
                    // The first read.
                    readAsync();
                    waitForAsyncReadComplete();
                    if (isEndOfStream()) {
                        return EOF;
                    }
                }
                // Swap the newly read ahead buffer in place of empty active buffer.
                swapBuffers();
                // After swapping buffers, trigger another async read for read ahead buffer.
                readAsync();
            } finally {
                stateChangeLock.unlock();
            }
        }
        len = Math.min(len, activeBuffer.remaining());
        activeBuffer.get(b, offset, len);

        return len;
    }

    /**
     * Reads data from underlyingInputStream to readAheadBuffer asynchronously.
     * <p>
     * Does nothing when the end of the stream was already reached or an asynchronous read is already in progress.
     * </p>
     *
     * @throws IOException if an I/O error occurs.
     */
    private void readAsync() throws IOException {
        stateChangeLock.lock();
        final byte[] arr;
        try {
            arr = readAheadBuffer.array();
            if (endOfStream || readInProgress) {
                return;
            }
            checkReadException();
            // Empty the read ahead buffer (position 0, limit 0); the task sets the limit once it knows how much it read.
            readAheadBuffer.position(0);
            readAheadBuffer.flip();
            readInProgress = true;
        } finally {
            stateChangeLock.unlock();
        }
        executorService.execute(() -> {
            stateChangeLock.lock();
            try {
                if (isClosed) {
                    readInProgress = false;
                    return;
                }
                // Flip this so that the close method will not close the underlying input stream when we
                // are reading.
                isReading = true;
            } finally {
                stateChangeLock.unlock();
            }

            // Please note that it is safe to release the lock and read into the read ahead buffer
            // because either of following two conditions will hold:
            //
            // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
            //
            // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
            // for this async read to complete.
            //
            // So there is no race condition in both the situations.
            int read = 0;
            int off = 0;
            int len = arr.length;
            Throwable exception = null;
            try {
                // try to fill the read ahead buffer.
                // if a reader is waiting, possibly return early.
                do {
                    read = in.read(arr, off, len);
                    if (read <= 0) {
                        break;
                    }
                    off += read;
                    len -= read;
                } while (len > 0 && !isWaiting.get());
            } catch (final Throwable ex) {
                exception = ex;
                if (ex instanceof Error) {
                    // `readException` may not be reported to the user. Rethrow Error to make sure at least
                    // The user can see Error in UncaughtExceptionHandler.
                    throw (Error) ex;
                }
            } finally {
                stateChangeLock.lock();
                try {
                    readAheadBuffer.limit(off);
                    if (read < 0 || exception instanceof EOFException) {
                        endOfStream = true;
                    } else if (exception != null) {
                        readAborted = true;
                        readException = exception;
                    }
                    readInProgress = false;
                    signalAsyncReadComplete();
                } finally {
                    stateChangeLock.unlock();
                }
                closeUnderlyingInputStreamIfNecessary();
            }
        });
    }

    /**
     * Wakes up all threads waiting for the asynchronous read to complete.
     */
    private void signalAsyncReadComplete() {
        stateChangeLock.lock();
        try {
            asyncReadComplete.signalAll();
        } finally {
            stateChangeLock.unlock();
        }
    }

    /**
     * Skips up to {@code n} bytes, first from the internal buffers and then from the underlying input stream.
     *
     * @param n the number of bytes to skip.
     * @return the number of bytes actually skipped; {@code 0} if {@code n} is not positive.
     * @throws IOException if an I/O error occurs.
     */
    @Override
    public long skip(final long n) throws IOException {
        if (n <= 0L) {
            return 0L;
        }
        if (n <= activeBuffer.remaining()) {
            // Only skipping from active buffer is sufficient
            activeBuffer.position((int) n + activeBuffer.position());
            return n;
        }
        stateChangeLock.lock();
        final long skipped;
        try {
            skipped = skipInternal(n);
        } finally {
            stateChangeLock.unlock();
        }
        return skipped;
    }

    /**
     * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before
     * calling this function.
     *
     * @param n the number of bytes to be skipped.
     * @return the actual number of bytes skipped.
     * @throws IOException           if an I/O error occurs.
     * @throws IllegalStateException if the calling thread does not hold the stateChangeLock.
     */
    private long skipInternal(final long n) throws IOException {
        // The precondition is that *this* thread holds the lock; isLocked() would also be true when another thread holds it.
        if (!stateChangeLock.isHeldByCurrentThread()) {
            throw new IllegalStateException("Expected stateChangeLock to be locked");
        }
        waitForAsyncReadComplete();
        if (isEndOfStream()) {
            return 0;
        }
        if (available() >= n) {
            // we can skip from the internal buffers
            int toSkip = (int) n;
            // We need to skip from both active buffer and read ahead buffer
            toSkip -= activeBuffer.remaining();
            if (toSkip <= 0) { // skipping from activeBuffer already handled.
                throw new IllegalStateException("Expected toSkip > 0, actual: " + toSkip);
            }
            activeBuffer.position(0);
            activeBuffer.flip();
            readAheadBuffer.position(toSkip + readAheadBuffer.position());
            swapBuffers();
            // Trigger async read to emptied read ahead buffer.
            readAsync();
            return n;
        }
        // Not enough buffered data: discard both buffers and skip the remainder in the underlying input stream.
        final int skippedBytes = available();
        final long toSkip = n - skippedBytes;
        activeBuffer.position(0);
        activeBuffer.flip();
        readAheadBuffer.position(0);
        readAheadBuffer.flip();
        final long skippedFromInputStream = in.skip(toSkip);
        readAsync();
        return skippedBytes + skippedFromInputStream;
    }

    /**
     * Flips the active and read ahead buffer
     */
    private void swapBuffers() {
        final ByteBuffer temp = activeBuffer;
        activeBuffer = readAheadBuffer;
        readAheadBuffer = temp;
    }

    /**
     * Blocks until no asynchronous read is in progress, then rethrows any failure recorded by the read ahead task.
     *
     * @throws InterruptedIOException if the calling thread is interrupted while waiting.
     * @throws IOException            if the read ahead task recorded a failure.
     */
    private void waitForAsyncReadComplete() throws IOException {
        stateChangeLock.lock();
        try {
            isWaiting.set(true);
            // There is only one reader, and one writer, so the writer should signal only once,
            // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups.
            while (readInProgress) {
                asyncReadComplete.await();
            }
        } catch (final InterruptedException e) {
            throw newInterruptedIOException(e);
        } finally {
            try {
                isWaiting.set(false);
            } finally {
                stateChangeLock.unlock();
            }
        }
        checkReadException();
    }
}