ReadAheadInputStream.java

  1. /*
  2.  * Licensed under the Apache License, Version 2.0 (the "License");
  3.  * you may not use this file except in compliance with the License.
  4.  * You may obtain a copy of the License at
  5.  *
  6.  *     https://www.apache.org/licenses/LICENSE-2.0
  7.  *
  8.  * Unless required by applicable law or agreed to in writing, software
  9.  * distributed under the License is distributed on an "AS IS" BASIS,
  10.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11.  * See the License for the specific language governing permissions and
  12.  * limitations under the License.
  13.  */
  14. package org.apache.commons.io.input;

  15. import static org.apache.commons.io.IOUtils.EOF;

  16. // import javax.annotation.concurrent.GuardedBy;
  17. import java.io.EOFException;
  18. import java.io.FilterInputStream;
  19. import java.io.IOException;
  20. import java.io.InputStream;
  21. import java.io.InterruptedIOException;
  22. import java.nio.ByteBuffer;
  23. import java.util.Objects;
  24. import java.util.concurrent.ExecutorService;
  25. import java.util.concurrent.Executors;
  26. import java.util.concurrent.TimeUnit;
  27. import java.util.concurrent.atomic.AtomicBoolean;
  28. import java.util.concurrent.locks.Condition;
  29. import java.util.concurrent.locks.ReentrantLock;

  30. import org.apache.commons.io.build.AbstractStreamBuilder;

  31. /**
  32.  * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current
  33.  * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a
  34.  * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we
  35.  * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O.
  36.  * <p>
  37.  * To build an instance, use {@link Builder}.
  38.  * </p>
  39.  * <p>
  40.  * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
  41.  * </p>
  42.  *
  43.  * @see Builder
  44.  * @since 2.9.0
  45.  */
  46. public class ReadAheadInputStream extends FilterInputStream {

  47.     // @formatter:off
  48.     /**
  49.      * Builds a new {@link ReadAheadInputStream}.
  50.      *
  51.      * <p>
  52.      * For example:
  53.      * </p>
  54.      * <pre>{@code
  55.      * ReadAheadInputStream s = ReadAheadInputStream.builder()
  56.      *   .setPath(path)
  57.      *   .setExecutorService(Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread))
  58.      *   .get();}
  59.      * </pre>
  60.      *
  61.      * @see #get()
  62.      * @since 2.12.0
  63.      */
  64.     // @formatter:on
  65.     public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {

  66.         private ExecutorService executorService;

  67.         /**
  68.          * Constructs a new builder of {@link ReadAheadInputStream}.
  69.          */
  70.         public Builder() {
  71.             // empty
  72.         }

  73.         /**
  74.          * Builds a new {@link ReadAheadInputStream}.
  75.          * <p>
  76.          * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
  77.          * </p>
  78.          * <p>
  79.          * This builder uses the following aspects:
  80.          * </p>
  81.          * <ul>
  82.          * <li>{@link #getInputStream()} gets the target aspect.</li>
  83.          * <li>{@link #getBufferSize()}</li>
  84.          * <li>{@link ExecutorService}</li>
  85.          * </ul>
  86.          *
  87.          * @return a new instance.
  88.          * @throws IllegalStateException         if the {@code origin} is {@code null}.
  89.          * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
  90.          * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
  91.          * @see #getInputStream()
  92.          * @see #getBufferSize()
  93.          * @see #getUnchecked()
  94.          */
  95.         @Override
  96.         public ReadAheadInputStream get() throws IOException {
  97.             return new ReadAheadInputStream(this);
  98.         }

  99.         /**
  100.          * Sets the executor service for the read-ahead thread.
  101.          *
  102.          * @param executorService the executor service for the read-ahead thread.
  103.          * @return {@code this} instance.
  104.          */
  105.         public Builder setExecutorService(final ExecutorService executorService) {
  106.             this.executorService = executorService;
  107.             return this;
  108.         }

  109.     }

  110.     private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);

  111.     /**
  112.      * Constructs a new {@link Builder}.
  113.      *
  114.      * @return a new {@link Builder}.
  115.      * @since 2.12.0
  116.      */
  117.     public static Builder builder() {
  118.         return new Builder();
  119.     }

  120.     /**
  121.      * Constructs a new daemon thread.
  122.      *
  123.      * @param r the thread's runnable.
  124.      * @return a new daemon thread.
  125.      */
  126.     private static Thread newDaemonThread(final Runnable r) {
  127.         final Thread thread = new Thread(r, "commons-io-read-ahead");
  128.         thread.setDaemon(true);
  129.         return thread;
  130.     }

  131.     /**
  132.      * Constructs a new daemon executor service.
  133.      *
  134.      * @return a new daemon executor service.
  135.      */
  136.     private static ExecutorService newExecutorService() {
  137.         return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
  138.     }

  139.     private final ReentrantLock stateChangeLock = new ReentrantLock();

  140.     // @GuardedBy("stateChangeLock")
  141.     private ByteBuffer activeBuffer;

  142.     // @GuardedBy("stateChangeLock")
  143.     private ByteBuffer readAheadBuffer;

  144.     // @GuardedBy("stateChangeLock")
  145.     private boolean endOfStream;

  146.     // @GuardedBy("stateChangeLock")
  147.     // true if async read is in progress
  148.     private boolean readInProgress;

  149.     // @GuardedBy("stateChangeLock")
  150.     // true if read is aborted due to an exception in reading from underlying input stream.
  151.     private boolean readAborted;

  152.     // @GuardedBy("stateChangeLock")
  153.     private Throwable readException;

  154.     // @GuardedBy("stateChangeLock")
  155.     // whether the close method is called.
  156.     private boolean isClosed;

  157.     // @GuardedBy("stateChangeLock")
  158.     // true when the close method will close the underlying input stream. This is valid only if
  159.     // `isClosed` is true.
  160.     private boolean isUnderlyingInputStreamBeingClosed;

  161.     // @GuardedBy("stateChangeLock")
  162.     // whether there is a read ahead task running,
  163.     private boolean isReading;

  164.     // Whether there is a reader waiting for data.
  165.     private final AtomicBoolean isWaiting = new AtomicBoolean();

  166.     private final ExecutorService executorService;

  167.     private final boolean shutdownExecutorService;

  168.     private final Condition asyncReadComplete = stateChangeLock.newCondition();

  169.     @SuppressWarnings("resource")
  170.     private ReadAheadInputStream(final Builder builder) throws IOException {
  171.         this(builder.getInputStream(), builder.getBufferSize(), builder.executorService != null ? builder.executorService : newExecutorService(),
  172.                 builder.executorService == null);
  173.     }

  174.     /**
  175.      * Constructs an instance with the specified buffer size and read-ahead threshold
  176.      *
  177.      * @param inputStream       The underlying input stream.
  178.      * @param bufferSizeInBytes The buffer size.
  179.      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
  180.      */
  181.     @Deprecated
  182.     public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
  183.         this(inputStream, bufferSizeInBytes, newExecutorService(), true);
  184.     }

  185.     /**
  186.      * Constructs an instance with the specified buffer size and read-ahead threshold
  187.      *
  188.      * @param inputStream       The underlying input stream.
  189.      * @param bufferSizeInBytes The buffer size.
  190.      * @param executorService   An executor service for the read-ahead thread.
  191.      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
  192.      */
  193.     @Deprecated
  194.     public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
  195.         this(inputStream, bufferSizeInBytes, executorService, false);
  196.     }

  197.     /**
  198.      * Constructs an instance with the specified buffer size and read-ahead threshold
  199.      *
  200.      * @param inputStream             The underlying input stream.
  201.      * @param bufferSizeInBytes       The buffer size.
  202.      * @param executorService         An executor service for the read-ahead thread.
  203.      * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close.
  204.      */
  205.     private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
  206.             final boolean shutdownExecutorService) {
  207.         super(Objects.requireNonNull(inputStream, "inputStream"));
  208.         if (bufferSizeInBytes <= 0) {
  209.             throw new IllegalArgumentException("bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
  210.         }
  211.         this.executorService = Objects.requireNonNull(executorService, "executorService");
  212.         this.shutdownExecutorService = shutdownExecutorService;
  213.         this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
  214.         this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
  215.         this.activeBuffer.flip();
  216.         this.readAheadBuffer.flip();
  217.     }

  218.     @Override
  219.     public int available() throws IOException {
  220.         stateChangeLock.lock();
  221.         // Make sure we have no integer overflow.
  222.         try {
  223.             return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
  224.         } finally {
  225.             stateChangeLock.unlock();
  226.         }
  227.     }

  228.     private void checkReadException() throws IOException {
  229.         if (readAborted) {
  230.             if (readException instanceof IOException) {
  231.                 throw (IOException) readException;
  232.             }
  233.             throw new IOException(readException);
  234.         }
  235.     }

  236.     @Override
  237.     public void close() throws IOException {
  238.         boolean isSafeToCloseUnderlyingInputStream = false;
  239.         stateChangeLock.lock();
  240.         try {
  241.             if (isClosed) {
  242.                 return;
  243.             }
  244.             isClosed = true;
  245.             if (!isReading) {
  246.                 // Nobody is reading, so we can close the underlying input stream in this method.
  247.                 isSafeToCloseUnderlyingInputStream = true;
  248.                 // Flip this to make sure the read ahead task will not close the underlying input stream.
  249.                 isUnderlyingInputStreamBeingClosed = true;
  250.             }
  251.         } finally {
  252.             stateChangeLock.unlock();
  253.         }

  254.         if (shutdownExecutorService) {
  255.             try {
  256.                 executorService.shutdownNow();
  257.                 executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
  258.             } catch (final InterruptedException e) {
  259.                 final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
  260.                 iio.initCause(e);
  261.                 throw iio;
  262.             } finally {
  263.                 if (isSafeToCloseUnderlyingInputStream) {
  264.                     super.close();
  265.                 }
  266.             }
  267.         }
  268.     }

  269.     private void closeUnderlyingInputStreamIfNecessary() {
  270.         boolean needToCloseUnderlyingInputStream = false;
  271.         stateChangeLock.lock();
  272.         try {
  273.             isReading = false;
  274.             if (isClosed && !isUnderlyingInputStreamBeingClosed) {
  275.                 // close method cannot close underlyingInputStream because we were reading.
  276.                 needToCloseUnderlyingInputStream = true;
  277.             }
  278.         } finally {
  279.             stateChangeLock.unlock();
  280.         }
  281.         if (needToCloseUnderlyingInputStream) {
  282.             try {
  283.                 super.close();
  284.             } catch (final IOException ignored) {
  285.                 // TODO Rethrow as UncheckedIOException?
  286.             }
  287.         }
  288.     }

  289.     private boolean isEndOfStream() {
  290.         return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
  291.     }

  292.     @Override
  293.     public int read() throws IOException {
  294.         if (activeBuffer.hasRemaining()) {
  295.             // short path - just get one byte.
  296.             return activeBuffer.get() & 0xFF;
  297.         }
  298.         final byte[] oneByteArray = BYTE_ARRAY_1.get();
  299.         oneByteArray[0] = 0;
  300.         return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
  301.     }

  302.     @Override
  303.     public int read(final byte[] b, final int offset, int len) throws IOException {
  304.         if (offset < 0 || len < 0 || len > b.length - offset) {
  305.             throw new IndexOutOfBoundsException();
  306.         }
  307.         if (len == 0) {
  308.             return 0;
  309.         }

  310.         if (!activeBuffer.hasRemaining()) {
  311.             // No remaining in active buffer - lock and switch to write ahead buffer.
  312.             stateChangeLock.lock();
  313.             try {
  314.                 waitForAsyncReadComplete();
  315.                 if (!readAheadBuffer.hasRemaining()) {
  316.                     // The first read.
  317.                     readAsync();
  318.                     waitForAsyncReadComplete();
  319.                     if (isEndOfStream()) {
  320.                         return EOF;
  321.                     }
  322.                 }
  323.                 // Swap the newly read ahead buffer in place of empty active buffer.
  324.                 swapBuffers();
  325.                 // After swapping buffers, trigger another async read for read ahead buffer.
  326.                 readAsync();
  327.             } finally {
  328.                 stateChangeLock.unlock();
  329.             }
  330.         }
  331.         len = Math.min(len, activeBuffer.remaining());
  332.         activeBuffer.get(b, offset, len);

  333.         return len;
  334.     }

  335.     /**
  336.      * Reads data from underlyingInputStream to readAheadBuffer asynchronously.
  337.      *
  338.      * @throws IOException if an I/O error occurs.
  339.      */
  340.     private void readAsync() throws IOException {
  341.         stateChangeLock.lock();
  342.         final byte[] arr;
  343.         try {
  344.             arr = readAheadBuffer.array();
  345.             if (endOfStream || readInProgress) {
  346.                 return;
  347.             }
  348.             checkReadException();
  349.             readAheadBuffer.position(0);
  350.             readAheadBuffer.flip();
  351.             readInProgress = true;
  352.         } finally {
  353.             stateChangeLock.unlock();
  354.         }
  355.         executorService.execute(() -> {
  356.             stateChangeLock.lock();
  357.             try {
  358.                 if (isClosed) {
  359.                     readInProgress = false;
  360.                     return;
  361.                 }
  362.                 // Flip this so that the close method will not close the underlying input stream when we
  363.                 // are reading.
  364.                 isReading = true;
  365.             } finally {
  366.                 stateChangeLock.unlock();
  367.             }

  368.             // Please note that it is safe to release the lock and read into the read ahead buffer
  369.             // because either of following two conditions will hold:
  370.             //
  371.             // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
  372.             //
  373.             // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
  374.             // for this async read to complete.
  375.             //
  376.             // So there is no race condition in both the situations.
  377.             int read = 0;
  378.             int off = 0;
  379.             int len = arr.length;
  380.             Throwable exception = null;
  381.             try {
  382.                 // try to fill the read ahead buffer.
  383.                 // if a reader is waiting, possibly return early.
  384.                 do {
  385.                     read = in.read(arr, off, len);
  386.                     if (read <= 0) {
  387.                         break;
  388.                     }
  389.                     off += read;
  390.                     len -= read;
  391.                 } while (len > 0 && !isWaiting.get());
  392.             } catch (final Throwable ex) {
  393.                 exception = ex;
  394.                 if (ex instanceof Error) {
  395.                     // `readException` may not be reported to the user. Rethrow Error to make sure at least
  396.                     // The user can see Error in UncaughtExceptionHandler.
  397.                     throw (Error) ex;
  398.                 }
  399.             } finally {
  400.                 stateChangeLock.lock();
  401.                 try {
  402.                     readAheadBuffer.limit(off);
  403.                     if (read < 0 || exception instanceof EOFException) {
  404.                         endOfStream = true;
  405.                     } else if (exception != null) {
  406.                         readAborted = true;
  407.                         readException = exception;
  408.                     }
  409.                     readInProgress = false;
  410.                     signalAsyncReadComplete();
  411.                 } finally {
  412.                     stateChangeLock.unlock();
  413.                 }
  414.                 closeUnderlyingInputStreamIfNecessary();
  415.             }
  416.         });
  417.     }

  418.     private void signalAsyncReadComplete() {
  419.         stateChangeLock.lock();
  420.         try {
  421.             asyncReadComplete.signalAll();
  422.         } finally {
  423.             stateChangeLock.unlock();
  424.         }
  425.     }

  426.     @Override
  427.     public long skip(final long n) throws IOException {
  428.         if (n <= 0L) {
  429.             return 0L;
  430.         }
  431.         if (n <= activeBuffer.remaining()) {
  432.             // Only skipping from active buffer is sufficient
  433.             activeBuffer.position((int) n + activeBuffer.position());
  434.             return n;
  435.         }
  436.         stateChangeLock.lock();
  437.         final long skipped;
  438.         try {
  439.             skipped = skipInternal(n);
  440.         } finally {
  441.             stateChangeLock.unlock();
  442.         }
  443.         return skipped;
  444.     }

  445.     /**
  446.      * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before
  447.      * calling this function.
  448.      *
  449.      * @param n the number of bytes to be skipped.
  450.      * @return the actual number of bytes skipped.
  451.      * @throws IOException if an I/O error occurs.
  452.      */
  453.     private long skipInternal(final long n) throws IOException {
  454.         if (!stateChangeLock.isLocked()) {
  455.             throw new IllegalStateException("Expected stateChangeLock to be locked");
  456.         }
  457.         waitForAsyncReadComplete();
  458.         if (isEndOfStream()) {
  459.             return 0;
  460.         }
  461.         if (available() >= n) {
  462.             // we can skip from the internal buffers
  463.             int toSkip = (int) n;
  464.             // We need to skip from both active buffer and read ahead buffer
  465.             toSkip -= activeBuffer.remaining();
  466.             if (toSkip <= 0) { // skipping from activeBuffer already handled.
  467.                 throw new IllegalStateException("Expected toSkip > 0, actual: " + toSkip);
  468.             }
  469.             activeBuffer.position(0);
  470.             activeBuffer.flip();
  471.             readAheadBuffer.position(toSkip + readAheadBuffer.position());
  472.             swapBuffers();
  473.             // Trigger async read to emptied read ahead buffer.
  474.             readAsync();
  475.             return n;
  476.         }
  477.         final int skippedBytes = available();
  478.         final long toSkip = n - skippedBytes;
  479.         activeBuffer.position(0);
  480.         activeBuffer.flip();
  481.         readAheadBuffer.position(0);
  482.         readAheadBuffer.flip();
  483.         final long skippedFromInputStream = in.skip(toSkip);
  484.         readAsync();
  485.         return skippedBytes + skippedFromInputStream;
  486.     }

  487.     /**
  488.      * Flips the active and read ahead buffer
  489.      */
  490.     private void swapBuffers() {
  491.         final ByteBuffer temp = activeBuffer;
  492.         activeBuffer = readAheadBuffer;
  493.         readAheadBuffer = temp;
  494.     }

  495.     private void waitForAsyncReadComplete() throws IOException {
  496.         stateChangeLock.lock();
  497.         try {
  498.             isWaiting.set(true);
  499.             // There is only one reader, and one writer, so the writer should signal only once,
  500.             // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups.
  501.             while (readInProgress) {
  502.                 asyncReadComplete.await();
  503.             }
  504.         } catch (final InterruptedException e) {
  505.             final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
  506.             iio.initCause(e);
  507.             throw iio;
  508.         } finally {
  509.             try {
  510.                 isWaiting.set(false);
  511.             } finally {
  512.                 stateChangeLock.unlock();
  513.             }
  514.         }
  515.         checkReadException();
  516.     }
  517. }