001/* 002 * Licensed under the Apache License, Version 2.0 (the "License"); 003 * you may not use this file except in compliance with the License. 004 * You may obtain a copy of the License at 005 * 006 * https://www.apache.org/licenses/LICENSE-2.0 007 * 008 * Unless required by applicable law or agreed to in writing, software 009 * distributed under the License is distributed on an "AS IS" BASIS, 010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 011 * See the License for the specific language governing permissions and 012 * limitations under the License. 013 */ 014package org.apache.commons.io.input; 015 016import static org.apache.commons.io.IOUtils.EOF; 017 018// import javax.annotation.concurrent.GuardedBy; 019import java.io.EOFException; 020import java.io.FilterInputStream; 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.InterruptedIOException; 024import java.nio.ByteBuffer; 025import java.util.Objects; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.TimeUnit; 029import java.util.concurrent.atomic.AtomicBoolean; 030import java.util.concurrent.locks.Condition; 031import java.util.concurrent.locks.ReentrantLock; 032 033import org.apache.commons.io.IOUtils; 034import org.apache.commons.io.build.AbstractStreamBuilder; 035 036/** 037 * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current 038 * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a 039 * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we 040 * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O. 041 * <p> 042 * To build an instance, use {@link Builder}. 043 * </p> 044 * <p> 045 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19. 046 * </p> 047 * 048 * @see Builder 049 * @since 2.9.0 050 */ 051public class ReadAheadInputStream extends FilterInputStream { 052 053 // @formatter:off 054 /** 055 * Builds a new {@link ReadAheadInputStream}. 056 * 057 * <p> 058 * For example: 059 * </p> 060 * <pre>{@code 061 * ReadAheadInputStream s = ReadAheadInputStream.builder() 062 * .setPath(path) 063 * .setExecutorService(Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread)) 064 * .get();} 065 * </pre> 066 * 067 * @see #get() 068 * @since 2.12.0 069 */ 070 // @formatter:on 071 public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> { 072 073 private ExecutorService executorService; 074 075 /** 076 * Constructs a new builder of {@link ReadAheadInputStream}. 077 */ 078 public Builder() { 079 // empty 080 } 081 082 /** 083 * Builds a new {@link ReadAheadInputStream}. 084 * <p> 085 * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception. 086 * </p> 087 * <p> 088 * This builder uses the following aspects: 089 * </p> 090 * <ul> 091 * <li>{@link #getInputStream()} gets the target aspect.</li> 092 * <li>{@link #getBufferSize()}</li> 093 * <li>{@link ExecutorService}</li> 094 * </ul> 095 * 096 * @return a new instance. 097 * @throws IllegalStateException if the {@code origin} is {@code null}. 098 * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}. 099 * @throws IOException if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}. 100 * @see #getInputStream() 101 * @see #getBufferSize() 102 * @see #getUnchecked() 103 */ 104 @Override 105 public ReadAheadInputStream get() throws IOException { 106 return new ReadAheadInputStream(this); 107 } 108 109 /** 110 * Sets the executor service for the read-ahead thread. 111 * 112 * @param executorService the executor service for the read-ahead thread. 113 * @return {@code this} instance. 114 */ 115 public Builder setExecutorService(final ExecutorService executorService) { 116 this.executorService = executorService; 117 return this; 118 } 119 120 } 121 122 private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]); 123 124 /** 125 * Constructs a new {@link Builder}. 126 * 127 * @return a new {@link Builder}. 128 * @since 2.12.0 129 */ 130 public static Builder builder() { 131 return new Builder(); 132 } 133 134 /** 135 * Constructs a new daemon thread. 136 * 137 * @param r the thread's runnable. 138 * @return a new daemon thread. 139 */ 140 private static Thread newDaemonThread(final Runnable r) { 141 final Thread thread = new Thread(r, "commons-io-read-ahead"); 142 thread.setDaemon(true); 143 return thread; 144 } 145 146 /** 147 * Constructs a new daemon executor service. 148 * 149 * @return a new daemon executor service. 150 */ 151 private static ExecutorService newExecutorService() { 152 return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread); 153 } 154 155 private final ReentrantLock stateChangeLock = new ReentrantLock(); 156 157 // @GuardedBy("stateChangeLock") 158 private ByteBuffer activeBuffer; 159 160 // @GuardedBy("stateChangeLock") 161 private ByteBuffer readAheadBuffer; 162 163 // @GuardedBy("stateChangeLock") 164 private boolean endOfStream; 165 166 // @GuardedBy("stateChangeLock") 167 // true if async read is in progress 168 private boolean readInProgress; 169 170 // @GuardedBy("stateChangeLock") 171 // true if read is aborted due to an exception in reading from underlying input stream. 172 private boolean readAborted; 173 174 // @GuardedBy("stateChangeLock") 175 private Throwable readException; 176 177 // @GuardedBy("stateChangeLock") 178 // whether the close method is called. 179 private boolean isClosed; 180 181 // @GuardedBy("stateChangeLock") 182 // true when the close method will close the underlying input stream. This is valid only if 183 // `isClosed` is true. 184 private boolean isUnderlyingInputStreamBeingClosed; 185 186 // @GuardedBy("stateChangeLock") 187 // whether there is a read ahead task running, 188 private boolean isReading; 189 190 // Whether there is a reader waiting for data. 191 private final AtomicBoolean isWaiting = new AtomicBoolean(); 192 193 private final ExecutorService executorService; 194 195 private final boolean shutdownExecutorService; 196 197 private final Condition asyncReadComplete = stateChangeLock.newCondition(); 198 199 @SuppressWarnings("resource") 200 private ReadAheadInputStream(final Builder builder) throws IOException { 201 this(builder.getInputStream(), builder.getBufferSize(), builder.executorService != null ? builder.executorService : newExecutorService(), 202 builder.executorService == null); 203 } 204 205 /** 206 * Constructs an instance with the specified buffer size and read-ahead threshold 207 * 208 * @param inputStream The underlying input stream. 209 * @param bufferSizeInBytes The buffer size. 210 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()} 211 */ 212 @Deprecated 213 public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) { 214 this(inputStream, bufferSizeInBytes, newExecutorService(), true); 215 } 216 217 /** 218 * Constructs an instance with the specified buffer size and read-ahead threshold 219 * 220 * @param inputStream The underlying input stream. 221 * @param bufferSizeInBytes The buffer size. 222 * @param executorService An executor service for the read-ahead thread. 223 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()} 224 */ 225 @Deprecated 226 public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) { 227 this(inputStream, bufferSizeInBytes, executorService, false); 228 } 229 230 /** 231 * Constructs an instance with the specified buffer size and read-ahead threshold 232 * 233 * @param inputStream The underlying input stream. 234 * @param bufferSizeInBytes The buffer size. 235 * @param executorService An executor service for the read-ahead thread. 236 * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close. 237 */ 238 private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService, 239 final boolean shutdownExecutorService) { 240 super(Objects.requireNonNull(inputStream, "inputStream")); 241 if (bufferSizeInBytes <= 0) { 242 throw new IllegalArgumentException(String.format("bufferSizeInBytes <= 0, bufferSizeInBytes = %,d", bufferSizeInBytes)); 243 } 244 this.executorService = Objects.requireNonNull(executorService, "executorService"); 245 this.shutdownExecutorService = shutdownExecutorService; 246 this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); 247 this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); 248 this.activeBuffer.flip(); 249 this.readAheadBuffer.flip(); 250 } 251 252 @Override 253 public int available() throws IOException { 254 stateChangeLock.lock(); 255 // Make sure we have no integer overflow. 256 try { 257 return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining()); 258 } finally { 259 stateChangeLock.unlock(); 260 } 261 } 262 263 private void checkReadException() throws IOException { 264 if (readAborted) { 265 if (readException instanceof IOException) { 266 throw (IOException) readException; 267 } 268 throw new IOException(readException); 269 } 270 } 271 272 @Override 273 public void close() throws IOException { 274 boolean isSafeToCloseUnderlyingInputStream = false; 275 stateChangeLock.lock(); 276 try { 277 if (isClosed) { 278 return; 279 } 280 isClosed = true; 281 if (!isReading) { 282 // Nobody is reading, so we can close the underlying input stream in this method. 283 isSafeToCloseUnderlyingInputStream = true; 284 // Flip this to make sure the read ahead task will not close the underlying input stream. 285 isUnderlyingInputStreamBeingClosed = true; 286 } 287 } finally { 288 stateChangeLock.unlock(); 289 } 290 291 if (shutdownExecutorService) { 292 try { 293 executorService.shutdownNow(); 294 executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); 295 } catch (final InterruptedException e) { 296 final InterruptedIOException iio = new InterruptedIOException(e.getMessage()); 297 iio.initCause(e); 298 throw iio; 299 } finally { 300 if (isSafeToCloseUnderlyingInputStream) { 301 super.close(); 302 } 303 } 304 } 305 } 306 307 private void closeUnderlyingInputStreamIfNecessary() { 308 boolean needToCloseUnderlyingInputStream = false; 309 stateChangeLock.lock(); 310 try { 311 isReading = false; 312 if (isClosed && !isUnderlyingInputStreamBeingClosed) { 313 // close method cannot close underlyingInputStream because we were reading. 314 needToCloseUnderlyingInputStream = true; 315 } 316 } finally { 317 stateChangeLock.unlock(); 318 } 319 if (needToCloseUnderlyingInputStream) { 320 try { 321 super.close(); 322 } catch (final IOException ignored) { 323 // TODO Rethrow as UncheckedIOException? 324 } 325 } 326 } 327 328 private boolean isEndOfStream() { 329 return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream; 330 } 331 332 @Override 333 public int read() throws IOException { 334 if (activeBuffer.hasRemaining()) { 335 // short path - just get one byte. 336 return activeBuffer.get() & 0xFF; 337 } 338 final byte[] oneByteArray = BYTE_ARRAY_1.get(); 339 oneByteArray[0] = 0; 340 return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF; 341 } 342 343 @Override 344 public int read(final byte[] b, final int offset, int len) throws IOException { 345 IOUtils.checkFromIndexSize(b, offset, len); 346 if (len == 0) { 347 return 0; 348 } 349 350 if (!activeBuffer.hasRemaining()) { 351 // No remaining in active buffer - lock and switch to write ahead buffer. 352 stateChangeLock.lock(); 353 try { 354 waitForAsyncReadComplete(); 355 if (!readAheadBuffer.hasRemaining()) { 356 // The first read. 357 readAsync(); 358 waitForAsyncReadComplete(); 359 if (isEndOfStream()) { 360 return EOF; 361 } 362 } 363 // Swap the newly read ahead buffer in place of empty active buffer. 364 swapBuffers(); 365 // After swapping buffers, trigger another async read for read ahead buffer. 366 readAsync(); 367 } finally { 368 stateChangeLock.unlock(); 369 } 370 } 371 len = Math.min(len, activeBuffer.remaining()); 372 activeBuffer.get(b, offset, len); 373 374 return len; 375 } 376 377 /** 378 * Reads data from underlyingInputStream to readAheadBuffer asynchronously. 379 * 380 * @throws IOException if an I/O error occurs. 381 */ 382 private void readAsync() throws IOException { 383 stateChangeLock.lock(); 384 final byte[] arr; 385 try { 386 arr = readAheadBuffer.array(); 387 if (endOfStream || readInProgress) { 388 return; 389 } 390 checkReadException(); 391 readAheadBuffer.position(0); 392 readAheadBuffer.flip(); 393 readInProgress = true; 394 } finally { 395 stateChangeLock.unlock(); 396 } 397 executorService.execute(() -> { 398 stateChangeLock.lock(); 399 try { 400 if (isClosed) { 401 readInProgress = false; 402 return; 403 } 404 // Flip this so that the close method will not close the underlying input stream when we 405 // are reading. 406 isReading = true; 407 } finally { 408 stateChangeLock.unlock(); 409 } 410 411 // Please note that it is safe to release the lock and read into the read ahead buffer 412 // because either of following two conditions will hold: 413 // 414 // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer. 415 // 416 // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits 417 // for this async read to complete. 418 // 419 // So there is no race condition in both the situations. 420 int read = 0; 421 int off = 0; 422 int len = arr.length; 423 Throwable exception = null; 424 try { 425 // try to fill the read ahead buffer. 426 // if a reader is waiting, possibly return early. 427 do { 428 read = in.read(arr, off, len); 429 if (read <= 0) { 430 break; 431 } 432 off += read; 433 len -= read; 434 } while (len > 0 && !isWaiting.get()); 435 } catch (final Throwable ex) { 436 exception = ex; 437 if (ex instanceof Error) { 438 // `readException` may not be reported to the user. Rethrow Error to make sure at least 439 // The user can see Error in UncaughtExceptionHandler. 440 throw (Error) ex; 441 } 442 } finally { 443 stateChangeLock.lock(); 444 try { 445 readAheadBuffer.limit(off); 446 if (read < 0 || exception instanceof EOFException) { 447 endOfStream = true; 448 } else if (exception != null) { 449 readAborted = true; 450 readException = exception; 451 } 452 readInProgress = false; 453 signalAsyncReadComplete(); 454 } finally { 455 stateChangeLock.unlock(); 456 } 457 closeUnderlyingInputStreamIfNecessary(); 458 } 459 }); 460 } 461 462 private void signalAsyncReadComplete() { 463 stateChangeLock.lock(); 464 try { 465 asyncReadComplete.signalAll(); 466 } finally { 467 stateChangeLock.unlock(); 468 } 469 } 470 471 @Override 472 public long skip(final long n) throws IOException { 473 if (n <= 0L) { 474 return 0L; 475 } 476 if (n <= activeBuffer.remaining()) { 477 // Only skipping from active buffer is sufficient 478 activeBuffer.position((int) n + activeBuffer.position()); 479 return n; 480 } 481 stateChangeLock.lock(); 482 final long skipped; 483 try { 484 skipped = skipInternal(n); 485 } finally { 486 stateChangeLock.unlock(); 487 } 488 return skipped; 489 } 490 491 /** 492 * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before 493 * calling this function. 494 * 495 * @param n the number of bytes to be skipped. 496 * @return the actual number of bytes skipped. 497 * @throws IOException if an I/O error occurs. 498 */ 499 private long skipInternal(final long n) throws IOException { 500 if (!stateChangeLock.isLocked()) { 501 throw new IllegalStateException("Expected stateChangeLock to be locked"); 502 } 503 waitForAsyncReadComplete(); 504 if (isEndOfStream()) { 505 return 0; 506 } 507 if (available() >= n) { 508 // we can skip from the internal buffers 509 int toSkip = (int) n; 510 // We need to skip from both active buffer and read ahead buffer 511 toSkip -= activeBuffer.remaining(); 512 if (toSkip <= 0) { // skipping from activeBuffer already handled. 513 throw new IllegalStateException("Expected toSkip > 0, actual: " + toSkip); 514 } 515 activeBuffer.position(0); 516 activeBuffer.flip(); 517 readAheadBuffer.position(toSkip + readAheadBuffer.position()); 518 swapBuffers(); 519 // Trigger async read to emptied read ahead buffer. 520 readAsync(); 521 return n; 522 } 523 final int skippedBytes = available(); 524 final long toSkip = n - skippedBytes; 525 activeBuffer.position(0); 526 activeBuffer.flip(); 527 readAheadBuffer.position(0); 528 readAheadBuffer.flip(); 529 final long skippedFromInputStream = in.skip(toSkip); 530 readAsync(); 531 return skippedBytes + skippedFromInputStream; 532 } 533 534 /** 535 * Flips the active and read ahead buffer 536 */ 537 private void swapBuffers() { 538 final ByteBuffer temp = activeBuffer; 539 activeBuffer = readAheadBuffer; 540 readAheadBuffer = temp; 541 } 542 543 private void waitForAsyncReadComplete() throws IOException { 544 stateChangeLock.lock(); 545 try { 546 isWaiting.set(true); 547 // There is only one reader, and one writer, so the writer should signal only once, 548 // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups. 549 while (readInProgress) { 550 asyncReadComplete.await(); 551 } 552 } catch (final InterruptedException e) { 553 final InterruptedIOException iio = new InterruptedIOException(e.getMessage()); 554 iio.initCause(e); 555 throw iio; 556 } finally { 557 try { 558 isWaiting.set(false); 559 } finally { 560 stateChangeLock.unlock(); 561 } 562 } 563 checkReadException(); 564 } 565}