ReadAheadInputStream.java

  1. /*
  2.  * Licensed under the Apache License, Version 2.0 (the "License");
  3.  * you may not use this file except in compliance with the License.
  4.  * You may obtain a copy of the License at
  5.  *
  6.  *     http://www.apache.org/licenses/LICENSE-2.0
  7.  *
  8.  * Unless required by applicable law or agreed to in writing, software
  9.  * distributed under the License is distributed on an "AS IS" BASIS,
  10.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11.  * See the License for the specific language governing permissions and
  12.  * limitations under the License.
  13.  */
  14. package org.apache.commons.io.input;

  15. import static org.apache.commons.io.IOUtils.EOF;

  16. // import javax.annotation.concurrent.GuardedBy;
  17. import java.io.EOFException;
  18. import java.io.FilterInputStream;
  19. import java.io.IOException;
  20. import java.io.InputStream;
  21. import java.io.InterruptedIOException;
  22. import java.nio.ByteBuffer;
  23. import java.util.Objects;
  24. import java.util.concurrent.ExecutorService;
  25. import java.util.concurrent.Executors;
  26. import java.util.concurrent.TimeUnit;
  27. import java.util.concurrent.atomic.AtomicBoolean;
  28. import java.util.concurrent.locks.Condition;
  29. import java.util.concurrent.locks.ReentrantLock;

  30. import org.apache.commons.io.build.AbstractStreamBuilder;

  31. /**
  32.  * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current
  33.  * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a
  34.  * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we
  35.  * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O.
  36.  * <p>
  37.  * To build an instance, use {@link Builder}.
  38.  * </p>
  39.  * <p>
  40.  * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
  41.  * </p>
  42.  *
  43.  * @see Builder
  44.  * @since 2.9.0
  45.  */
  46. public class ReadAheadInputStream extends FilterInputStream {

  47.     // @formatter:off
  48.     /**
  49.      * Builds a new {@link ReadAheadInputStream}.
  50.      *
  51.      * <p>
  52.      * For example:
  53.      * </p>
  54.      * <pre>{@code
  55.      * ReadAheadInputStream s = ReadAheadInputStream.builder()
  56.      *   .setPath(path)
  57.      *   .setExecutorService(Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread))
  58.      *   .get();}
  59.      * </pre>
  60.      *
  61.      * @see #get()
  62.      * @since 2.12.0
  63.      */
  64.     // @formatter:on
  65.     public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {

  66.         private ExecutorService executorService;

  67.         /**
  68.          * Constructs a new builder of {@link ReadAheadInputStream}.
  69.          */
  70.         public Builder() {
  71.             // empty
  72.         }

  73.         /**
  74.          * Builds a new {@link ReadAheadInputStream}.
  75.          * <p>
  76.          * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
  77.          * </p>
  78.          * <p>
  79.          * This builder uses the following aspects:
  80.          * </p>
  81.          * <ul>
  82.          * <li>{@link #getInputStream()} gets the target aspect.</li>
  83.          * <li>{@link #getBufferSize()}</li>
  84.          * <li>{@link ExecutorService}</li>
  85.          * </ul>
  86.          *
  87.          * @return a new instance.
  88.          * @throws IllegalStateException         if the {@code origin} is {@code null}.
  89.          * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
  90.          * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
  91.          * @see #getInputStream()
  92.          * @see #getBufferSize()
  93.          * @see #getUnchecked()
  94.          */
  95.         @Override
  96.         public ReadAheadInputStream get() throws IOException {
  97.             return new ReadAheadInputStream(getInputStream(), getBufferSize(), executorService != null ? executorService : newExecutorService(),
  98.                     executorService == null);
  99.         }

  100.         /**
  101.          * Sets the executor service for the read-ahead thread.
  102.          *
  103.          * @param executorService the executor service for the read-ahead thread.
  104.          * @return {@code this} instance.
  105.          */
  106.         public Builder setExecutorService(final ExecutorService executorService) {
  107.             this.executorService = executorService;
  108.             return this;
  109.         }

  110.     }

    // Per-thread one-byte scratch array; read() uses it to delegate a single-byte read to read(byte[], int, int).
    private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);

    /**
     * Constructs a new {@link Builder}.
     * <p>
     * This is the entry point for creating a {@link ReadAheadInputStream}; configure the returned builder and call {@link Builder#get()}.
     * </p>
     *
     * @return a new {@link Builder}.
     * @since 2.12.0
     */
    public static Builder builder() {
        return new Builder();
    }

  121.     /**
  122.      * Constructs a new daemon thread.
  123.      *
  124.      * @param r the thread's runnable.
  125.      * @return a new daemon thread.
  126.      */
  127.     private static Thread newDaemonThread(final Runnable r) {
  128.         final Thread thread = new Thread(r, "commons-io-read-ahead");
  129.         thread.setDaemon(true);
  130.         return thread;
  131.     }

  132.     /**
  133.      * Constructs a new daemon executor service.
  134.      *
  135.      * @return a new daemon executor service.
  136.      */
  137.     private static ExecutorService newExecutorService() {
  138.         return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
  139.     }

    // Lock protecting the fields marked @GuardedBy("stateChangeLock") below.
    private final ReentrantLock stateChangeLock = new ReentrantLock();

    // @GuardedBy("stateChangeLock")
    // Buffer that read() and skip() consume from.
    private ByteBuffer activeBuffer;

    // @GuardedBy("stateChangeLock")
    // Buffer whose backing array the read-ahead task fills; swapped with activeBuffer once the active buffer is exhausted.
    private ByteBuffer readAheadBuffer;

    // @GuardedBy("stateChangeLock")
    // true once the underlying input stream reported end of stream (a negative read count or an EOFException).
    private boolean endOfStream;

    // @GuardedBy("stateChangeLock")
    // true if async read is in progress: set before the read-ahead task is submitted, cleared when the task finishes.
    private boolean readInProgress;

    // @GuardedBy("stateChangeLock")
    // true if read is aborted due to an exception in reading from underlying input stream.
    private boolean readAborted;

    // @GuardedBy("stateChangeLock")
    // The exception that aborted the read; reported to the reader by checkReadException().
    private Throwable readException;

    // @GuardedBy("stateChangeLock")
    // whether the close method is called.
    private boolean isClosed;

    // @GuardedBy("stateChangeLock")
    // true when the close method will close the underlying input stream. This is valid only if
    // `isClosed` is true.
    private boolean isUnderlyingInputStreamBeingClosed;

    // @GuardedBy("stateChangeLock")
    // whether there is a read ahead task currently running; while true, close() leaves closing the underlying stream to that task.
    private boolean isReading;

    // Whether there is a reader waiting for data. The read-ahead task checks this flag to stop filling the buffer early.
    private final AtomicBoolean isWaiting = new AtomicBoolean();

    // Executor that runs the read-ahead task.
    private final ExecutorService executorService;

    // Whether close() shuts down executorService.
    private final boolean shutdownExecutorService;

    // Signalled when a read-ahead task finishes; readers wait on it while readInProgress is true.
    private final Condition asyncReadComplete = stateChangeLock.newCondition();

    /**
     * Constructs an instance with the specified buffer size.
     * <p>
     * The instance creates its own single-thread daemon executor service for the read-ahead task and shuts it down when the stream is closed.
     * </p>
     *
     * @param inputStream       The underlying input stream.
     * @param bufferSizeInBytes The buffer size.
     * @throws NullPointerException     if {@code inputStream} is {@code null}.
     * @throws IllegalArgumentException if {@code bufferSizeInBytes} is not greater than 0.
     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
     */
    @Deprecated
    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
        this(inputStream, bufferSizeInBytes, newExecutorService(), true);
    }

    /**
     * Constructs an instance with the specified buffer size and executor service.
     * <p>
     * The given executor service is not shut down when the stream is closed.
     * </p>
     *
     * @param inputStream       The underlying input stream.
     * @param bufferSizeInBytes The buffer size.
     * @param executorService   An executor service for the read-ahead thread.
     * @throws NullPointerException     if {@code inputStream} or {@code executorService} is {@code null}.
     * @throws IllegalArgumentException if {@code bufferSizeInBytes} is not greater than 0.
     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
     */
    @Deprecated
    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
        this(inputStream, bufferSizeInBytes, executorService, false);
    }

  193.     /**
  194.      * Constructs an instance with the specified buffer size and read-ahead threshold
  195.      *
  196.      * @param inputStream             The underlying input stream.
  197.      * @param bufferSizeInBytes       The buffer size.
  198.      * @param executorService         An executor service for the read-ahead thread.
  199.      * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close.
  200.      */
  201.     private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
  202.             final boolean shutdownExecutorService) {
  203.         super(Objects.requireNonNull(inputStream, "inputStream"));
  204.         if (bufferSizeInBytes <= 0) {
  205.             throw new IllegalArgumentException("bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
  206.         }
  207.         this.executorService = Objects.requireNonNull(executorService, "executorService");
  208.         this.shutdownExecutorService = shutdownExecutorService;
  209.         this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
  210.         this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
  211.         this.activeBuffer.flip();
  212.         this.readAheadBuffer.flip();
  213.     }

  214.     @Override
  215.     public int available() throws IOException {
  216.         stateChangeLock.lock();
  217.         // Make sure we have no integer overflow.
  218.         try {
  219.             return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
  220.         } finally {
  221.             stateChangeLock.unlock();
  222.         }
  223.     }

  224.     private void checkReadException() throws IOException {
  225.         if (readAborted) {
  226.             if (readException instanceof IOException) {
  227.                 throw (IOException) readException;
  228.             }
  229.             throw new IOException(readException);
  230.         }
  231.     }

  232.     @Override
  233.     public void close() throws IOException {
  234.         boolean isSafeToCloseUnderlyingInputStream = false;
  235.         stateChangeLock.lock();
  236.         try {
  237.             if (isClosed) {
  238.                 return;
  239.             }
  240.             isClosed = true;
  241.             if (!isReading) {
  242.                 // Nobody is reading, so we can close the underlying input stream in this method.
  243.                 isSafeToCloseUnderlyingInputStream = true;
  244.                 // Flip this to make sure the read ahead task will not close the underlying input stream.
  245.                 isUnderlyingInputStreamBeingClosed = true;
  246.             }
  247.         } finally {
  248.             stateChangeLock.unlock();
  249.         }

  250.         if (shutdownExecutorService) {
  251.             try {
  252.                 executorService.shutdownNow();
  253.                 executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
  254.             } catch (final InterruptedException e) {
  255.                 final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
  256.                 iio.initCause(e);
  257.                 throw iio;
  258.             } finally {
  259.                 if (isSafeToCloseUnderlyingInputStream) {
  260.                     super.close();
  261.                 }
  262.             }
  263.         }
  264.     }

  265.     private void closeUnderlyingInputStreamIfNecessary() {
  266.         boolean needToCloseUnderlyingInputStream = false;
  267.         stateChangeLock.lock();
  268.         try {
  269.             isReading = false;
  270.             if (isClosed && !isUnderlyingInputStreamBeingClosed) {
  271.                 // close method cannot close underlyingInputStream because we were reading.
  272.                 needToCloseUnderlyingInputStream = true;
  273.             }
  274.         } finally {
  275.             stateChangeLock.unlock();
  276.         }
  277.         if (needToCloseUnderlyingInputStream) {
  278.             try {
  279.                 super.close();
  280.             } catch (final IOException ignored) {
  281.                 // TODO Rethrow as UncheckedIOException?
  282.             }
  283.         }
  284.     }

  285.     private boolean isEndOfStream() {
  286.         return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
  287.     }

  288.     @Override
  289.     public int read() throws IOException {
  290.         if (activeBuffer.hasRemaining()) {
  291.             // short path - just get one byte.
  292.             return activeBuffer.get() & 0xFF;
  293.         }
  294.         final byte[] oneByteArray = BYTE_ARRAY_1.get();
  295.         oneByteArray[0] = 0;
  296.         return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
  297.     }

  298.     @Override
  299.     public int read(final byte[] b, final int offset, int len) throws IOException {
  300.         if (offset < 0 || len < 0 || len > b.length - offset) {
  301.             throw new IndexOutOfBoundsException();
  302.         }
  303.         if (len == 0) {
  304.             return 0;
  305.         }

  306.         if (!activeBuffer.hasRemaining()) {
  307.             // No remaining in active buffer - lock and switch to write ahead buffer.
  308.             stateChangeLock.lock();
  309.             try {
  310.                 waitForAsyncReadComplete();
  311.                 if (!readAheadBuffer.hasRemaining()) {
  312.                     // The first read.
  313.                     readAsync();
  314.                     waitForAsyncReadComplete();
  315.                     if (isEndOfStream()) {
  316.                         return EOF;
  317.                     }
  318.                 }
  319.                 // Swap the newly read ahead buffer in place of empty active buffer.
  320.                 swapBuffers();
  321.                 // After swapping buffers, trigger another async read for read ahead buffer.
  322.                 readAsync();
  323.             } finally {
  324.                 stateChangeLock.unlock();
  325.             }
  326.         }
  327.         len = Math.min(len, activeBuffer.remaining());
  328.         activeBuffer.get(b, offset, len);

  329.         return len;
  330.     }

    /**
     * Reads data from the underlying input stream into the read ahead buffer asynchronously.
     * <p>
     * Does nothing if the end of the stream was already reached or an asynchronous read is already in progress. Otherwise the read ahead buffer is emptied,
     * {@code readInProgress} is set, and a fill task is submitted to the executor service. When the task finishes it records the outcome (bytes read, end
     * of stream, or the exception that aborted the read), clears {@code readInProgress}, and signals {@code asyncReadComplete}.
     * </p>
     *
     * @throws IOException if a previous asynchronous read was aborted by an exception.
     */
    private void readAsync() throws IOException {
        stateChangeLock.lock();
        final byte[] arr;
        try {
            // Backing array of the read ahead buffer; the task below writes into it directly.
            arr = readAheadBuffer.array();
            if (endOfStream || readInProgress) {
                return;
            }
            checkReadException();
            // position(0) followed by flip() leaves position == limit == 0, i.e. an empty buffer.
            readAheadBuffer.position(0);
            readAheadBuffer.flip();
            readInProgress = true;
        } finally {
            stateChangeLock.unlock();
        }
        executorService.execute(() -> {
            stateChangeLock.lock();
            try {
                if (isClosed) {
                    // NOTE(review): this path clears readInProgress without signalling asyncReadComplete - confirm no reader can be waiting here.
                    readInProgress = false;
                    return;
                }
                // Flip this so that the close method will not close the underlying input stream when we
                // are reading.
                isReading = true;
            } finally {
                stateChangeLock.unlock();
            }

            // Please note that it is safe to release the lock and read into the read ahead buffer
            // because either of following two conditions will hold:
            //
            // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
            //
            // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
            // for this async read to complete.
            //
            // So there is no race condition in both the situations.
            int read = 0;
            int off = 0;
            int len = arr.length;
            Throwable exception = null;
            try {
                // try to fill the read ahead buffer.
                // if a reader is waiting, possibly return early.
                do {
                    read = in.read(arr, off, len);
                    if (read <= 0) {
                        break;
                    }
                    off += read;
                    len -= read;
                } while (len > 0 && !isWaiting.get());
            } catch (final Throwable ex) {
                exception = ex;
                if (ex instanceof Error) {
                    // `readException` may not be reported to the user. Rethrow Error to make sure at least
                    // The user can see Error in UncaughtExceptionHandler.
                    throw (Error) ex;
                }
            } finally {
                stateChangeLock.lock();
                try {
                    // Expose exactly the bytes read so far: the position is still 0 and the limit becomes the fill count.
                    readAheadBuffer.limit(off);
                    if (read < 0 || exception instanceof EOFException) {
                        endOfStream = true;
                    } else if (exception != null) {
                        // Recorded here and rethrown to the reader by checkReadException().
                        readAborted = true;
                        readException = exception;
                    }
                    readInProgress = false;
                    signalAsyncReadComplete();
                } finally {
                    stateChangeLock.unlock();
                }
                // Clears isReading and closes the underlying stream if close() was called while this task was reading.
                closeUnderlyingInputStreamIfNecessary();
            }
        });
    }

    /**
     * Wakes up all threads waiting on {@code asyncReadComplete}.
     * <p>
     * The lock is acquired here because a condition may only be signalled while its lock is held; the lock is reentrant, so callers that already hold it
     * are fine.
     * </p>
     */
    private void signalAsyncReadComplete() {
        stateChangeLock.lock();
        try {
            asyncReadComplete.signalAll();
        } finally {
            stateChangeLock.unlock();
        }
    }

  422.     @Override
  423.     public long skip(final long n) throws IOException {
  424.         if (n <= 0L) {
  425.             return 0L;
  426.         }
  427.         if (n <= activeBuffer.remaining()) {
  428.             // Only skipping from active buffer is sufficient
  429.             activeBuffer.position((int) n + activeBuffer.position());
  430.             return n;
  431.         }
  432.         stateChangeLock.lock();
  433.         final long skipped;
  434.         try {
  435.             skipped = skipInternal(n);
  436.         } finally {
  437.             stateChangeLock.unlock();
  438.         }
  439.         return skipped;
  440.     }

  441.     /**
  442.      * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before
  443.      * calling this function.
  444.      *
  445.      * @param n the number of bytes to be skipped.
  446.      * @return the actual number of bytes skipped.
  447.      * @throws IOException if an I/O error occurs.
  448.      */
  449.     private long skipInternal(final long n) throws IOException {
  450.         if (!stateChangeLock.isLocked()) {
  451.             throw new IllegalStateException("Expected stateChangeLock to be locked");
  452.         }
  453.         waitForAsyncReadComplete();
  454.         if (isEndOfStream()) {
  455.             return 0;
  456.         }
  457.         if (available() >= n) {
  458.             // we can skip from the internal buffers
  459.             int toSkip = (int) n;
  460.             // We need to skip from both active buffer and read ahead buffer
  461.             toSkip -= activeBuffer.remaining();
  462.             if (toSkip <= 0) { // skipping from activeBuffer already handled.
  463.                 throw new IllegalStateException("Expected toSkip > 0, actual: " + toSkip);
  464.             }
  465.             activeBuffer.position(0);
  466.             activeBuffer.flip();
  467.             readAheadBuffer.position(toSkip + readAheadBuffer.position());
  468.             swapBuffers();
  469.             // Trigger async read to emptied read ahead buffer.
  470.             readAsync();
  471.             return n;
  472.         }
  473.         final int skippedBytes = available();
  474.         final long toSkip = n - skippedBytes;
  475.         activeBuffer.position(0);
  476.         activeBuffer.flip();
  477.         readAheadBuffer.position(0);
  478.         readAheadBuffer.flip();
  479.         final long skippedFromInputStream = in.skip(toSkip);
  480.         readAsync();
  481.         return skippedBytes + skippedFromInputStream;
  482.     }

  483.     /**
  484.      * Flips the active and read ahead buffer
  485.      */
  486.     private void swapBuffers() {
  487.         final ByteBuffer temp = activeBuffer;
  488.         activeBuffer = readAheadBuffer;
  489.         readAheadBuffer = temp;
  490.     }

    /**
     * Blocks until no asynchronous read is in progress, then reports any failure recorded by the read-ahead task.
     * <p>
     * While waiting, {@code isWaiting} is set so that the read-ahead task can stop filling the buffer early; it is cleared again before this method
     * returns or throws.
     * </p>
     *
     * @throws InterruptedIOException if the calling thread is interrupted while waiting; the {@link InterruptedException} is set as its cause.
     * @throws IOException            if an asynchronous read was aborted by an exception.
     */
    private void waitForAsyncReadComplete() throws IOException {
        stateChangeLock.lock();
        try {
            isWaiting.set(true);
            // There is only one reader, and one writer, so the writer should signal only once,
            // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups.
            while (readInProgress) {
                asyncReadComplete.await();
            }
        } catch (final InterruptedException e) {
            // NOTE(review): the thread's interrupt status is not restored before throwing - confirm this is intended.
            final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
            iio.initCause(e);
            throw iio;
        } finally {
            try {
                isWaiting.set(false);
            } finally {
                stateChangeLock.unlock();
            }
        }
        // Checked after the lock is released by this method; rethrows the exception that aborted the read, if any.
        checkReadException();
    }
  513. }