001/* 002 * Licensed under the Apache License, Version 2.0 (the "License"); 003 * you may not use this file except in compliance with the License. 004 * You may obtain a copy of the License at 005 * 006 * http://www.apache.org/licenses/LICENSE-2.0 007 * 008 * Unless required by applicable law or agreed to in writing, software 009 * distributed under the License is distributed on an "AS IS" BASIS, 010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 011 * See the License for the specific language governing permissions and 012 * limitations under the License. 013 */ 014package org.apache.commons.io.input; 015 016import static org.apache.commons.io.IOUtils.EOF; 017 018// import javax.annotation.concurrent.GuardedBy; 019import java.io.EOFException; 020import java.io.FilterInputStream; 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.InterruptedIOException; 024import java.nio.ByteBuffer; 025import java.util.Objects; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.TimeUnit; 029import java.util.concurrent.atomic.AtomicBoolean; 030import java.util.concurrent.locks.Condition; 031import java.util.concurrent.locks.ReentrantLock; 032 033import org.apache.commons.io.build.AbstractStreamBuilder; 034 035/** 036 * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current 037 * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a 038 * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we 039 * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O. 040 * <p> 041 * To build an instance, use {@link Builder}. 042 * </p> 043 * <p> 044 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19. 045 * </p> 046 * 047 * @see Builder 048 * @since 2.9.0 049 */ 050public class ReadAheadInputStream extends FilterInputStream { 051 052 // @formatter:off 053 /** 054 * Builds a new {@link ReadAheadInputStream}. 055 * 056 * <p> 057 * For example: 058 * </p> 059 * <pre>{@code 060 * ReadAheadInputStream s = ReadAheadInputStream.builder() 061 * .setPath(path) 062 * .setExecutorService(Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread)) 063 * .get();} 064 * </pre> 065 * 066 * @see #get() 067 * @since 2.12.0 068 */ 069 // @formatter:on 070 public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> { 071 072 private ExecutorService executorService; 073 074 /** 075 * Constructs a new builder of {@link ReadAheadInputStream}. 076 */ 077 public Builder() { 078 // empty 079 } 080 081 /** 082 * Builds a new {@link ReadAheadInputStream}. 083 * <p> 084 * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception. 085 * </p> 086 * <p> 087 * This builder uses the following aspects: 088 * </p> 089 * <ul> 090 * <li>{@link #getInputStream()} gets the target aspect.</li> 091 * <li>{@link #getBufferSize()}</li> 092 * <li>{@link ExecutorService}</li> 093 * </ul> 094 * 095 * @return a new instance. 096 * @throws IllegalStateException if the {@code origin} is {@code null}. 097 * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}. 098 * @throws IOException if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}. 099 * @see #getInputStream() 100 * @see #getBufferSize() 101 * @see #getUnchecked() 102 */ 103 @Override 104 public ReadAheadInputStream get() throws IOException { 105 return new ReadAheadInputStream(getInputStream(), getBufferSize(), executorService != null ? executorService : newExecutorService(), 106 executorService == null); 107 } 108 109 /** 110 * Sets the executor service for the read-ahead thread. 111 * 112 * @param executorService the executor service for the read-ahead thread. 113 * @return {@code this} instance. 114 */ 115 public Builder setExecutorService(final ExecutorService executorService) { 116 this.executorService = executorService; 117 return this; 118 } 119 120 } 121 122 private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]); 123 124 /** 125 * Constructs a new {@link Builder}. 126 * 127 * @return a new {@link Builder}. 128 * @since 2.12.0 129 */ 130 public static Builder builder() { 131 return new Builder(); 132 } 133 134 /** 135 * Constructs a new daemon thread. 136 * 137 * @param r the thread's runnable. 138 * @return a new daemon thread. 139 */ 140 private static Thread newDaemonThread(final Runnable r) { 141 final Thread thread = new Thread(r, "commons-io-read-ahead"); 142 thread.setDaemon(true); 143 return thread; 144 } 145 146 /** 147 * Constructs a new daemon executor service. 148 * 149 * @return a new daemon executor service. 150 */ 151 private static ExecutorService newExecutorService() { 152 return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread); 153 } 154 155 private final ReentrantLock stateChangeLock = new ReentrantLock(); 156 157 // @GuardedBy("stateChangeLock") 158 private ByteBuffer activeBuffer; 159 160 // @GuardedBy("stateChangeLock") 161 private ByteBuffer readAheadBuffer; 162 163 // @GuardedBy("stateChangeLock") 164 private boolean endOfStream; 165 166 // @GuardedBy("stateChangeLock") 167 // true if async read is in progress 168 private boolean readInProgress; 169 170 // @GuardedBy("stateChangeLock") 171 // true if read is aborted due to an exception in reading from underlying input stream. 172 private boolean readAborted; 173 174 // @GuardedBy("stateChangeLock") 175 private Throwable readException; 176 177 // @GuardedBy("stateChangeLock") 178 // whether the close method is called. 179 private boolean isClosed; 180 181 // @GuardedBy("stateChangeLock") 182 // true when the close method will close the underlying input stream. This is valid only if 183 // `isClosed` is true. 184 private boolean isUnderlyingInputStreamBeingClosed; 185 186 // @GuardedBy("stateChangeLock") 187 // whether there is a read ahead task running, 188 private boolean isReading; 189 190 // Whether there is a reader waiting for data. 191 private final AtomicBoolean isWaiting = new AtomicBoolean(); 192 193 private final ExecutorService executorService; 194 195 private final boolean shutdownExecutorService; 196 197 private final Condition asyncReadComplete = stateChangeLock.newCondition(); 198 199 /** 200 * Constructs an instance with the specified buffer size and read-ahead threshold 201 * 202 * @param inputStream The underlying input stream. 203 * @param bufferSizeInBytes The buffer size. 204 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()} 205 */ 206 @Deprecated 207 public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) { 208 this(inputStream, bufferSizeInBytes, newExecutorService(), true); 209 } 210 211 /** 212 * Constructs an instance with the specified buffer size and read-ahead threshold 213 * 214 * @param inputStream The underlying input stream. 215 * @param bufferSizeInBytes The buffer size. 216 * @param executorService An executor service for the read-ahead thread. 217 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()} 218 */ 219 @Deprecated 220 public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) { 221 this(inputStream, bufferSizeInBytes, executorService, false); 222 } 223 224 /** 225 * Constructs an instance with the specified buffer size and read-ahead threshold 226 * 227 * @param inputStream The underlying input stream. 228 * @param bufferSizeInBytes The buffer size. 229 * @param executorService An executor service for the read-ahead thread. 230 * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close. 231 */ 232 private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService, 233 final boolean shutdownExecutorService) { 234 super(Objects.requireNonNull(inputStream, "inputStream")); 235 if (bufferSizeInBytes <= 0) { 236 throw new IllegalArgumentException("bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes); 237 } 238 this.executorService = Objects.requireNonNull(executorService, "executorService"); 239 this.shutdownExecutorService = shutdownExecutorService; 240 this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); 241 this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); 242 this.activeBuffer.flip(); 243 this.readAheadBuffer.flip(); 244 } 245 246 @Override 247 public int available() throws IOException { 248 stateChangeLock.lock(); 249 // Make sure we have no integer overflow. 250 try { 251 return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining()); 252 } finally { 253 stateChangeLock.unlock(); 254 } 255 } 256 257 private void checkReadException() throws IOException { 258 if (readAborted) { 259 if (readException instanceof IOException) { 260 throw (IOException) readException; 261 } 262 throw new IOException(readException); 263 } 264 } 265 266 @Override 267 public void close() throws IOException { 268 boolean isSafeToCloseUnderlyingInputStream = false; 269 stateChangeLock.lock(); 270 try { 271 if (isClosed) { 272 return; 273 } 274 isClosed = true; 275 if (!isReading) { 276 // Nobody is reading, so we can close the underlying input stream in this method. 277 isSafeToCloseUnderlyingInputStream = true; 278 // Flip this to make sure the read ahead task will not close the underlying input stream. 279 isUnderlyingInputStreamBeingClosed = true; 280 } 281 } finally { 282 stateChangeLock.unlock(); 283 } 284 285 if (shutdownExecutorService) { 286 try { 287 executorService.shutdownNow(); 288 executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); 289 } catch (final InterruptedException e) { 290 final InterruptedIOException iio = new InterruptedIOException(e.getMessage()); 291 iio.initCause(e); 292 throw iio; 293 } finally { 294 if (isSafeToCloseUnderlyingInputStream) { 295 super.close(); 296 } 297 } 298 } 299 } 300 301 private void closeUnderlyingInputStreamIfNecessary() { 302 boolean needToCloseUnderlyingInputStream = false; 303 stateChangeLock.lock(); 304 try { 305 isReading = false; 306 if (isClosed && !isUnderlyingInputStreamBeingClosed) { 307 // close method cannot close underlyingInputStream because we were reading. 308 needToCloseUnderlyingInputStream = true; 309 } 310 } finally { 311 stateChangeLock.unlock(); 312 } 313 if (needToCloseUnderlyingInputStream) { 314 try { 315 super.close(); 316 } catch (final IOException ignored) { 317 // TODO Rethrow as UncheckedIOException? 318 } 319 } 320 } 321 322 private boolean isEndOfStream() { 323 return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream; 324 } 325 326 @Override 327 public int read() throws IOException { 328 if (activeBuffer.hasRemaining()) { 329 // short path - just get one byte. 330 return activeBuffer.get() & 0xFF; 331 } 332 final byte[] oneByteArray = BYTE_ARRAY_1.get(); 333 oneByteArray[0] = 0; 334 return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF; 335 } 336 337 @Override 338 public int read(final byte[] b, final int offset, int len) throws IOException { 339 if (offset < 0 || len < 0 || len > b.length - offset) { 340 throw new IndexOutOfBoundsException(); 341 } 342 if (len == 0) { 343 return 0; 344 } 345 346 if (!activeBuffer.hasRemaining()) { 347 // No remaining in active buffer - lock and switch to write ahead buffer. 348 stateChangeLock.lock(); 349 try { 350 waitForAsyncReadComplete(); 351 if (!readAheadBuffer.hasRemaining()) { 352 // The first read. 353 readAsync(); 354 waitForAsyncReadComplete(); 355 if (isEndOfStream()) { 356 return EOF; 357 } 358 } 359 // Swap the newly read ahead buffer in place of empty active buffer. 360 swapBuffers(); 361 // After swapping buffers, trigger another async read for read ahead buffer. 362 readAsync(); 363 } finally { 364 stateChangeLock.unlock(); 365 } 366 } 367 len = Math.min(len, activeBuffer.remaining()); 368 activeBuffer.get(b, offset, len); 369 370 return len; 371 } 372 373 /** 374 * Read data from underlyingInputStream to readAheadBuffer asynchronously. 375 * 376 * @throws IOException if an I/O error occurs. 377 */ 378 private void readAsync() throws IOException { 379 stateChangeLock.lock(); 380 final byte[] arr; 381 try { 382 arr = readAheadBuffer.array(); 383 if (endOfStream || readInProgress) { 384 return; 385 } 386 checkReadException(); 387 readAheadBuffer.position(0); 388 readAheadBuffer.flip(); 389 readInProgress = true; 390 } finally { 391 stateChangeLock.unlock(); 392 } 393 executorService.execute(() -> { 394 stateChangeLock.lock(); 395 try { 396 if (isClosed) { 397 readInProgress = false; 398 return; 399 } 400 // Flip this so that the close method will not close the underlying input stream when we 401 // are reading. 402 isReading = true; 403 } finally { 404 stateChangeLock.unlock(); 405 } 406 407 // Please note that it is safe to release the lock and read into the read ahead buffer 408 // because either of following two conditions will hold: 409 // 410 // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer. 411 // 412 // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits 413 // for this async read to complete. 414 // 415 // So there is no race condition in both the situations. 416 int read = 0; 417 int off = 0; 418 int len = arr.length; 419 Throwable exception = null; 420 try { 421 // try to fill the read ahead buffer. 422 // if a reader is waiting, possibly return early. 423 do { 424 read = in.read(arr, off, len); 425 if (read <= 0) { 426 break; 427 } 428 off += read; 429 len -= read; 430 } while (len > 0 && !isWaiting.get()); 431 } catch (final Throwable ex) { 432 exception = ex; 433 if (ex instanceof Error) { 434 // `readException` may not be reported to the user. Rethrow Error to make sure at least 435 // The user can see Error in UncaughtExceptionHandler. 436 throw (Error) ex; 437 } 438 } finally { 439 stateChangeLock.lock(); 440 try { 441 readAheadBuffer.limit(off); 442 if (read < 0 || exception instanceof EOFException) { 443 endOfStream = true; 444 } else if (exception != null) { 445 readAborted = true; 446 readException = exception; 447 } 448 readInProgress = false; 449 signalAsyncReadComplete(); 450 } finally { 451 stateChangeLock.unlock(); 452 } 453 closeUnderlyingInputStreamIfNecessary(); 454 } 455 }); 456 } 457 458 private void signalAsyncReadComplete() { 459 stateChangeLock.lock(); 460 try { 461 asyncReadComplete.signalAll(); 462 } finally { 463 stateChangeLock.unlock(); 464 } 465 } 466 467 @Override 468 public long skip(final long n) throws IOException { 469 if (n <= 0L) { 470 return 0L; 471 } 472 if (n <= activeBuffer.remaining()) { 473 // Only skipping from active buffer is sufficient 474 activeBuffer.position((int) n + activeBuffer.position()); 475 return n; 476 } 477 stateChangeLock.lock(); 478 final long skipped; 479 try { 480 skipped = skipInternal(n); 481 } finally { 482 stateChangeLock.unlock(); 483 } 484 return skipped; 485 } 486 487 /** 488 * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before 489 * calling this function. 490 * 491 * @param n the number of bytes to be skipped. 492 * @return the actual number of bytes skipped. 493 * @throws IOException if an I/O error occurs. 494 */ 495 private long skipInternal(final long n) throws IOException { 496 if (!stateChangeLock.isLocked()) { 497 throw new IllegalStateException("Expected stateChangeLock to be locked"); 498 } 499 waitForAsyncReadComplete(); 500 if (isEndOfStream()) { 501 return 0; 502 } 503 if (available() >= n) { 504 // we can skip from the internal buffers 505 int toSkip = (int) n; 506 // We need to skip from both active buffer and read ahead buffer 507 toSkip -= activeBuffer.remaining(); 508 if (toSkip <= 0) { // skipping from activeBuffer already handled. 509 throw new IllegalStateException("Expected toSkip > 0, actual: " + toSkip); 510 } 511 activeBuffer.position(0); 512 activeBuffer.flip(); 513 readAheadBuffer.position(toSkip + readAheadBuffer.position()); 514 swapBuffers(); 515 // Trigger async read to emptied read ahead buffer. 516 readAsync(); 517 return n; 518 } 519 final int skippedBytes = available(); 520 final long toSkip = n - skippedBytes; 521 activeBuffer.position(0); 522 activeBuffer.flip(); 523 readAheadBuffer.position(0); 524 readAheadBuffer.flip(); 525 final long skippedFromInputStream = in.skip(toSkip); 526 readAsync(); 527 return skippedBytes + skippedFromInputStream; 528 } 529 530 /** 531 * Flips the active and read ahead buffer 532 */ 533 private void swapBuffers() { 534 final ByteBuffer temp = activeBuffer; 535 activeBuffer = readAheadBuffer; 536 readAheadBuffer = temp; 537 } 538 539 private void waitForAsyncReadComplete() throws IOException { 540 stateChangeLock.lock(); 541 try { 542 isWaiting.set(true); 543 // There is only one reader, and one writer, so the writer should signal only once, 544 // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups. 545 while (readInProgress) { 546 asyncReadComplete.await(); 547 } 548 } catch (final InterruptedException e) { 549 final InterruptedIOException iio = new InterruptedIOException(e.getMessage()); 550 iio.initCause(e); 551 throw iio; 552 } finally { 553 try { 554 isWaiting.set(false); 555 } finally { 556 stateChangeLock.unlock(); 557 } 558 } 559 checkReadException(); 560 } 561}