ReadAheadInputStream.java
- /*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.commons.io.input;
- import static org.apache.commons.io.IOUtils.EOF;
- // import javax.annotation.concurrent.GuardedBy;
- import java.io.EOFException;
- import java.io.FilterInputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.InterruptedIOException;
- import java.nio.ByteBuffer;
- import java.util.Objects;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicBoolean;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.ReentrantLock;
- import org.apache.commons.io.build.AbstractStreamBuilder;
- /**
- * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current
- * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a
- * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we
- * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O.
- * <p>
- * To build an instance, use {@link Builder}.
- * </p>
- * <p>
- * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
- * </p>
- *
- * @see Builder
- * @since 2.9.0
- */
- public class ReadAheadInputStream extends FilterInputStream {
- // @formatter:off
- /**
- * Builds a new {@link ReadAheadInputStream}.
- *
- * <p>
- * For example:
- * </p>
- * <pre>{@code
- * ReadAheadInputStream s = ReadAheadInputStream.builder()
- * .setPath(path)
- * .setExecutorService(Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread))
- * .get();}
- * </pre>
- *
- * @see #get()
- * @since 2.12.0
- */
- // @formatter:on
- public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {
- private ExecutorService executorService;
- /**
- * Constructs a new builder of {@link ReadAheadInputStream}.
- */
- public Builder() {
- // empty
- }
- /**
- * Builds a new {@link ReadAheadInputStream}.
- * <p>
- * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
- * </p>
- * <p>
- * This builder uses the following aspects:
- * </p>
- * <ul>
- * <li>{@link #getInputStream()} gets the target aspect.</li>
- * <li>{@link #getBufferSize()}</li>
- * <li>{@link ExecutorService}</li>
- * </ul>
- *
- * @return a new instance.
- * @throws IllegalStateException if the {@code origin} is {@code null}.
- * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
- * @throws IOException if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
- * @see #getInputStream()
- * @see #getBufferSize()
- * @see #getUnchecked()
- */
- @Override
- public ReadAheadInputStream get() throws IOException {
- return new ReadAheadInputStream(getInputStream(), getBufferSize(), executorService != null ? executorService : newExecutorService(),
- executorService == null);
- }
- /**
- * Sets the executor service for the read-ahead thread.
- *
- * @param executorService the executor service for the read-ahead thread.
- * @return {@code this} instance.
- */
- public Builder setExecutorService(final ExecutorService executorService) {
- this.executorService = executorService;
- return this;
- }
- }
- private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);
- /**
- * Constructs a new {@link Builder}.
- *
- * @return a new {@link Builder}.
- * @since 2.12.0
- */
- public static Builder builder() {
- return new Builder();
- }
- /**
- * Constructs a new daemon thread.
- *
- * @param r the thread's runnable.
- * @return a new daemon thread.
- */
- private static Thread newDaemonThread(final Runnable r) {
- final Thread thread = new Thread(r, "commons-io-read-ahead");
- thread.setDaemon(true);
- return thread;
- }
- /**
- * Constructs a new daemon executor service.
- *
- * @return a new daemon executor service.
- */
- private static ExecutorService newExecutorService() {
- return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
- }
- private final ReentrantLock stateChangeLock = new ReentrantLock();
- // @GuardedBy("stateChangeLock")
- private ByteBuffer activeBuffer;
- // @GuardedBy("stateChangeLock")
- private ByteBuffer readAheadBuffer;
- // @GuardedBy("stateChangeLock")
- private boolean endOfStream;
- // @GuardedBy("stateChangeLock")
- // true if async read is in progress
- private boolean readInProgress;
- // @GuardedBy("stateChangeLock")
- // true if read is aborted due to an exception in reading from underlying input stream.
- private boolean readAborted;
- // @GuardedBy("stateChangeLock")
- private Throwable readException;
- // @GuardedBy("stateChangeLock")
- // whether the close method is called.
- private boolean isClosed;
- // @GuardedBy("stateChangeLock")
- // true when the close method will close the underlying input stream. This is valid only if
- // `isClosed` is true.
- private boolean isUnderlyingInputStreamBeingClosed;
- // @GuardedBy("stateChangeLock")
- // whether there is a read ahead task running,
- private boolean isReading;
- // Whether there is a reader waiting for data.
- private final AtomicBoolean isWaiting = new AtomicBoolean();
- private final ExecutorService executorService;
- private final boolean shutdownExecutorService;
- private final Condition asyncReadComplete = stateChangeLock.newCondition();
- /**
- * Constructs an instance with the specified buffer size and read-ahead threshold
- *
- * @param inputStream The underlying input stream.
- * @param bufferSizeInBytes The buffer size.
- * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
- */
- @Deprecated
- public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
- this(inputStream, bufferSizeInBytes, newExecutorService(), true);
- }
- /**
- * Constructs an instance with the specified buffer size and read-ahead threshold
- *
- * @param inputStream The underlying input stream.
- * @param bufferSizeInBytes The buffer size.
- * @param executorService An executor service for the read-ahead thread.
- * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
- */
- @Deprecated
- public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
- this(inputStream, bufferSizeInBytes, executorService, false);
- }
- /**
- * Constructs an instance with the specified buffer size and read-ahead threshold
- *
- * @param inputStream The underlying input stream.
- * @param bufferSizeInBytes The buffer size.
- * @param executorService An executor service for the read-ahead thread.
- * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close.
- */
- private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
- final boolean shutdownExecutorService) {
- super(Objects.requireNonNull(inputStream, "inputStream"));
- if (bufferSizeInBytes <= 0) {
- throw new IllegalArgumentException("bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
- }
- this.executorService = Objects.requireNonNull(executorService, "executorService");
- this.shutdownExecutorService = shutdownExecutorService;
- this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
- this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
- this.activeBuffer.flip();
- this.readAheadBuffer.flip();
- }
- @Override
- public int available() throws IOException {
- stateChangeLock.lock();
- // Make sure we have no integer overflow.
- try {
- return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
- } finally {
- stateChangeLock.unlock();
- }
- }
- private void checkReadException() throws IOException {
- if (readAborted) {
- if (readException instanceof IOException) {
- throw (IOException) readException;
- }
- throw new IOException(readException);
- }
- }
- @Override
- public void close() throws IOException {
- boolean isSafeToCloseUnderlyingInputStream = false;
- stateChangeLock.lock();
- try {
- if (isClosed) {
- return;
- }
- isClosed = true;
- if (!isReading) {
- // Nobody is reading, so we can close the underlying input stream in this method.
- isSafeToCloseUnderlyingInputStream = true;
- // Flip this to make sure the read ahead task will not close the underlying input stream.
- isUnderlyingInputStreamBeingClosed = true;
- }
- } finally {
- stateChangeLock.unlock();
- }
- if (shutdownExecutorService) {
- try {
- executorService.shutdownNow();
- executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
- } catch (final InterruptedException e) {
- final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
- iio.initCause(e);
- throw iio;
- } finally {
- if (isSafeToCloseUnderlyingInputStream) {
- super.close();
- }
- }
- }
- }
- private void closeUnderlyingInputStreamIfNecessary() {
- boolean needToCloseUnderlyingInputStream = false;
- stateChangeLock.lock();
- try {
- isReading = false;
- if (isClosed && !isUnderlyingInputStreamBeingClosed) {
- // close method cannot close underlyingInputStream because we were reading.
- needToCloseUnderlyingInputStream = true;
- }
- } finally {
- stateChangeLock.unlock();
- }
- if (needToCloseUnderlyingInputStream) {
- try {
- super.close();
- } catch (final IOException ignored) {
- // TODO Rethrow as UncheckedIOException?
- }
- }
- }
- private boolean isEndOfStream() {
- return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
- }
- @Override
- public int read() throws IOException {
- if (activeBuffer.hasRemaining()) {
- // short path - just get one byte.
- return activeBuffer.get() & 0xFF;
- }
- final byte[] oneByteArray = BYTE_ARRAY_1.get();
- oneByteArray[0] = 0;
- return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
- }
- @Override
- public int read(final byte[] b, final int offset, int len) throws IOException {
- if (offset < 0 || len < 0 || len > b.length - offset) {
- throw new IndexOutOfBoundsException();
- }
- if (len == 0) {
- return 0;
- }
- if (!activeBuffer.hasRemaining()) {
- // No remaining in active buffer - lock and switch to write ahead buffer.
- stateChangeLock.lock();
- try {
- waitForAsyncReadComplete();
- if (!readAheadBuffer.hasRemaining()) {
- // The first read.
- readAsync();
- waitForAsyncReadComplete();
- if (isEndOfStream()) {
- return EOF;
- }
- }
- // Swap the newly read ahead buffer in place of empty active buffer.
- swapBuffers();
- // After swapping buffers, trigger another async read for read ahead buffer.
- readAsync();
- } finally {
- stateChangeLock.unlock();
- }
- }
- len = Math.min(len, activeBuffer.remaining());
- activeBuffer.get(b, offset, len);
- return len;
- }
- /**
- * Read data from underlyingInputStream to readAheadBuffer asynchronously.
- *
- * @throws IOException if an I/O error occurs.
- */
- private void readAsync() throws IOException {
- stateChangeLock.lock();
- final byte[] arr;
- try {
- arr = readAheadBuffer.array();
- if (endOfStream || readInProgress) {
- return;
- }
- checkReadException();
- readAheadBuffer.position(0);
- readAheadBuffer.flip();
- readInProgress = true;
- } finally {
- stateChangeLock.unlock();
- }
- executorService.execute(() -> {
- stateChangeLock.lock();
- try {
- if (isClosed) {
- readInProgress = false;
- return;
- }
- // Flip this so that the close method will not close the underlying input stream when we
- // are reading.
- isReading = true;
- } finally {
- stateChangeLock.unlock();
- }
- // Please note that it is safe to release the lock and read into the read ahead buffer
- // because either of following two conditions will hold:
- //
- // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
- //
- // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
- // for this async read to complete.
- //
- // So there is no race condition in both the situations.
- int read = 0;
- int off = 0;
- int len = arr.length;
- Throwable exception = null;
- try {
- // try to fill the read ahead buffer.
- // if a reader is waiting, possibly return early.
- do {
- read = in.read(arr, off, len);
- if (read <= 0) {
- break;
- }
- off += read;
- len -= read;
- } while (len > 0 && !isWaiting.get());
- } catch (final Throwable ex) {
- exception = ex;
- if (ex instanceof Error) {
- // `readException` may not be reported to the user. Rethrow Error to make sure at least
- // The user can see Error in UncaughtExceptionHandler.
- throw (Error) ex;
- }
- } finally {
- stateChangeLock.lock();
- try {
- readAheadBuffer.limit(off);
- if (read < 0 || exception instanceof EOFException) {
- endOfStream = true;
- } else if (exception != null) {
- readAborted = true;
- readException = exception;
- }
- readInProgress = false;
- signalAsyncReadComplete();
- } finally {
- stateChangeLock.unlock();
- }
- closeUnderlyingInputStreamIfNecessary();
- }
- });
- }
- private void signalAsyncReadComplete() {
- stateChangeLock.lock();
- try {
- asyncReadComplete.signalAll();
- } finally {
- stateChangeLock.unlock();
- }
- }
- @Override
- public long skip(final long n) throws IOException {
- if (n <= 0L) {
- return 0L;
- }
- if (n <= activeBuffer.remaining()) {
- // Only skipping from active buffer is sufficient
- activeBuffer.position((int) n + activeBuffer.position());
- return n;
- }
- stateChangeLock.lock();
- final long skipped;
- try {
- skipped = skipInternal(n);
- } finally {
- stateChangeLock.unlock();
- }
- return skipped;
- }
- /**
- * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before
- * calling this function.
- *
- * @param n the number of bytes to be skipped.
- * @return the actual number of bytes skipped.
- * @throws IOException if an I/O error occurs.
- */
- private long skipInternal(final long n) throws IOException {
- if (!stateChangeLock.isLocked()) {
- throw new IllegalStateException("Expected stateChangeLock to be locked");
- }
- waitForAsyncReadComplete();
- if (isEndOfStream()) {
- return 0;
- }
- if (available() >= n) {
- // we can skip from the internal buffers
- int toSkip = (int) n;
- // We need to skip from both active buffer and read ahead buffer
- toSkip -= activeBuffer.remaining();
- if (toSkip <= 0) { // skipping from activeBuffer already handled.
- throw new IllegalStateException("Expected toSkip > 0, actual: " + toSkip);
- }
- activeBuffer.position(0);
- activeBuffer.flip();
- readAheadBuffer.position(toSkip + readAheadBuffer.position());
- swapBuffers();
- // Trigger async read to emptied read ahead buffer.
- readAsync();
- return n;
- }
- final int skippedBytes = available();
- final long toSkip = n - skippedBytes;
- activeBuffer.position(0);
- activeBuffer.flip();
- readAheadBuffer.position(0);
- readAheadBuffer.flip();
- final long skippedFromInputStream = in.skip(toSkip);
- readAsync();
- return skippedBytes + skippedFromInputStream;
- }
- /**
- * Flips the active and read ahead buffer
- */
- private void swapBuffers() {
- final ByteBuffer temp = activeBuffer;
- activeBuffer = readAheadBuffer;
- readAheadBuffer = temp;
- }
- private void waitForAsyncReadComplete() throws IOException {
- stateChangeLock.lock();
- try {
- isWaiting.set(true);
- // There is only one reader, and one writer, so the writer should signal only once,
- // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups.
- while (readInProgress) {
- asyncReadComplete.await();
- }
- } catch (final InterruptedException e) {
- final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
- iio.initCause(e);
- throw iio;
- } finally {
- try {
- isWaiting.set(false);
- } finally {
- stateChangeLock.unlock();
- }
- }
- checkReadException();
- }
- }