001/*
002 * Licensed under the Apache License, Version 2.0 (the "License");
003 * you may not use this file except in compliance with the License.
004 * You may obtain a copy of the License at
005 *
006 *     https://www.apache.org/licenses/LICENSE-2.0
007 *
008 * Unless required by applicable law or agreed to in writing, software
009 * distributed under the License is distributed on an "AS IS" BASIS,
010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011 * See the License for the specific language governing permissions and
012 * limitations under the License.
013 */
014package org.apache.commons.io.input;
015
016import static org.apache.commons.io.IOUtils.EOF;
017
018// import javax.annotation.concurrent.GuardedBy;
019import java.io.EOFException;
020import java.io.FilterInputStream;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.InterruptedIOException;
024import java.nio.ByteBuffer;
025import java.util.Objects;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.concurrent.locks.Condition;
031import java.util.concurrent.locks.ReentrantLock;
032
033import org.apache.commons.io.IOUtils;
034import org.apache.commons.io.build.AbstractStreamBuilder;
035
036/**
 * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream. It does so by maintaining two buffers: an active buffer and a
 * read-ahead buffer. The active buffer contains data which should be returned when a read() call is issued. The read-ahead buffer is used to asynchronously
 * read from the underlying input stream. When the active buffer is exhausted, the two buffers are swapped so that reading can continue from the read-ahead
 * buffer without being blocked by disk I/O, and a new asynchronous read is started to refill the other buffer.
041 * <p>
042 * To build an instance, use {@link Builder}.
043 * </p>
044 * <p>
045 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
046 * </p>
047 *
048 * @see Builder
049 * @since 2.9.0
050 */
051public class ReadAheadInputStream extends FilterInputStream {
052
053    // @formatter:off
054    /**
055     * Builds a new {@link ReadAheadInputStream}.
056     *
057     * <p>
058     * For example:
059     * </p>
060     * <pre>{@code
061     * ReadAheadInputStream s = ReadAheadInputStream.builder()
062     *   .setPath(path)
063     *   .setExecutorService(Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread))
064     *   .get();}
065     * </pre>
066     *
067     * @see #get()
068     * @since 2.12.0
069     */
070    // @formatter:on
071    public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {
072
073        private ExecutorService executorService;
074
075        /**
076         * Constructs a new builder of {@link ReadAheadInputStream}.
077         */
078        public Builder() {
079            // empty
080        }
081
082        /**
083         * Builds a new {@link ReadAheadInputStream}.
084         * <p>
085         * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
086         * </p>
087         * <p>
088         * This builder uses the following aspects:
089         * </p>
090         * <ul>
091         * <li>{@link #getInputStream()} gets the target aspect.</li>
092         * <li>{@link #getBufferSize()}</li>
093         * <li>{@link ExecutorService}</li>
094         * </ul>
095         *
096         * @return a new instance.
097         * @throws IllegalStateException         if the {@code origin} is {@code null}.
098         * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
099         * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
100         * @see #getInputStream()
101         * @see #getBufferSize()
102         * @see #getUnchecked()
103         */
104        @Override
105        public ReadAheadInputStream get() throws IOException {
106            return new ReadAheadInputStream(this);
107        }
108
109        /**
110         * Sets the executor service for the read-ahead thread.
111         *
112         * @param executorService the executor service for the read-ahead thread.
113         * @return {@code this} instance.
114         */
115        public Builder setExecutorService(final ExecutorService executorService) {
116            this.executorService = executorService;
117            return this;
118        }
119
120    }
121
    /**
     * Per-thread one-byte scratch array that {@link #read()} passes to {@link #read(byte[], int, int)} so the single-byte slow path does not allocate
     * on every call.
     */
    private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);
123
124    /**
125     * Constructs a new {@link Builder}.
126     *
127     * @return a new {@link Builder}.
128     * @since 2.12.0
129     */
130    public static Builder builder() {
131        return new Builder();
132    }
133
134    /**
135     * Constructs a new daemon thread.
136     *
137     * @param r the thread's runnable.
138     * @return a new daemon thread.
139     */
140    private static Thread newDaemonThread(final Runnable r) {
141        final Thread thread = new Thread(r, "commons-io-read-ahead");
142        thread.setDaemon(true);
143        return thread;
144    }
145
146    /**
147     * Constructs a new daemon executor service.
148     *
149     * @return a new daemon executor service.
150     */
151    private static ExecutorService newExecutorService() {
152        return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
153    }
154
155    private final ReentrantLock stateChangeLock = new ReentrantLock();
156
157    // @GuardedBy("stateChangeLock")
158    private ByteBuffer activeBuffer;
159
160    // @GuardedBy("stateChangeLock")
161    private ByteBuffer readAheadBuffer;
162
163    // @GuardedBy("stateChangeLock")
164    private boolean endOfStream;
165
166    // @GuardedBy("stateChangeLock")
167    // true if async read is in progress
168    private boolean readInProgress;
169
170    // @GuardedBy("stateChangeLock")
171    // true if read is aborted due to an exception in reading from underlying input stream.
172    private boolean readAborted;
173
174    // @GuardedBy("stateChangeLock")
175    private Throwable readException;
176
177    // @GuardedBy("stateChangeLock")
178    // whether the close method is called.
179    private boolean isClosed;
180
181    // @GuardedBy("stateChangeLock")
182    // true when the close method will close the underlying input stream. This is valid only if
183    // `isClosed` is true.
184    private boolean isUnderlyingInputStreamBeingClosed;
185
186    // @GuardedBy("stateChangeLock")
187    // whether there is a read ahead task running,
188    private boolean isReading;
189
190    // Whether there is a reader waiting for data.
191    private final AtomicBoolean isWaiting = new AtomicBoolean();
192
193    private final ExecutorService executorService;
194
195    private final boolean shutdownExecutorService;
196
197    private final Condition asyncReadComplete = stateChangeLock.newCondition();
198
199    @SuppressWarnings("resource")
200    private ReadAheadInputStream(final Builder builder) throws IOException {
201        this(builder.getInputStream(), builder.getBufferSize(), builder.executorService != null ? builder.executorService : newExecutorService(),
202                builder.executorService == null);
203    }
204
205    /**
206     * Constructs an instance with the specified buffer size and read-ahead threshold
207     *
208     * @param inputStream       The underlying input stream.
209     * @param bufferSizeInBytes The buffer size.
210     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
211     */
212    @Deprecated
213    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
214        this(inputStream, bufferSizeInBytes, newExecutorService(), true);
215    }
216
217    /**
218     * Constructs an instance with the specified buffer size and read-ahead threshold
219     *
220     * @param inputStream       The underlying input stream.
221     * @param bufferSizeInBytes The buffer size.
222     * @param executorService   An executor service for the read-ahead thread.
223     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
224     */
225    @Deprecated
226    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
227        this(inputStream, bufferSizeInBytes, executorService, false);
228    }
229
230    /**
231     * Constructs an instance with the specified buffer size and read-ahead threshold
232     *
233     * @param inputStream             The underlying input stream.
234     * @param bufferSizeInBytes       The buffer size.
235     * @param executorService         An executor service for the read-ahead thread.
236     * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close.
237     */
238    private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
239            final boolean shutdownExecutorService) {
240        super(Objects.requireNonNull(inputStream, "inputStream"));
241        if (bufferSizeInBytes <= 0) {
242            throw new IllegalArgumentException(String.format("bufferSizeInBytes <= 0, bufferSizeInBytes = %,d", bufferSizeInBytes));
243        }
244        this.executorService = Objects.requireNonNull(executorService, "executorService");
245        this.shutdownExecutorService = shutdownExecutorService;
246        this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
247        this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
248        this.activeBuffer.flip();
249        this.readAheadBuffer.flip();
250    }
251
252    @Override
253    public int available() throws IOException {
254        stateChangeLock.lock();
255        // Make sure we have no integer overflow.
256        try {
257            return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
258        } finally {
259            stateChangeLock.unlock();
260        }
261    }
262
263    private void checkReadException() throws IOException {
264        if (readAborted) {
265            if (readException instanceof IOException) {
266                throw (IOException) readException;
267            }
268            throw new IOException(readException);
269        }
270    }
271
272    @Override
273    public void close() throws IOException {
274        boolean isSafeToCloseUnderlyingInputStream = false;
275        stateChangeLock.lock();
276        try {
277            if (isClosed) {
278                return;
279            }
280            isClosed = true;
281            if (!isReading) {
282                // Nobody is reading, so we can close the underlying input stream in this method.
283                isSafeToCloseUnderlyingInputStream = true;
284                // Flip this to make sure the read ahead task will not close the underlying input stream.
285                isUnderlyingInputStreamBeingClosed = true;
286            }
287        } finally {
288            stateChangeLock.unlock();
289        }
290
291        if (shutdownExecutorService) {
292            try {
293                executorService.shutdownNow();
294                executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
295            } catch (final InterruptedException e) {
296                final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
297                iio.initCause(e);
298                throw iio;
299            } finally {
300                if (isSafeToCloseUnderlyingInputStream) {
301                    super.close();
302                }
303            }
304        }
305    }
306
307    private void closeUnderlyingInputStreamIfNecessary() {
308        boolean needToCloseUnderlyingInputStream = false;
309        stateChangeLock.lock();
310        try {
311            isReading = false;
312            if (isClosed && !isUnderlyingInputStreamBeingClosed) {
313                // close method cannot close underlyingInputStream because we were reading.
314                needToCloseUnderlyingInputStream = true;
315            }
316        } finally {
317            stateChangeLock.unlock();
318        }
319        if (needToCloseUnderlyingInputStream) {
320            try {
321                super.close();
322            } catch (final IOException ignored) {
323                // TODO Rethrow as UncheckedIOException?
324            }
325        }
326    }
327
328    private boolean isEndOfStream() {
329        return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
330    }
331
332    @Override
333    public int read() throws IOException {
334        if (activeBuffer.hasRemaining()) {
335            // short path - just get one byte.
336            return activeBuffer.get() & 0xFF;
337        }
338        final byte[] oneByteArray = BYTE_ARRAY_1.get();
339        oneByteArray[0] = 0;
340        return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
341    }
342
343    @Override
344    public int read(final byte[] b, final int offset, int len) throws IOException {
345        IOUtils.checkFromIndexSize(b, offset, len);
346        if (len == 0) {
347            return 0;
348        }
349
350        if (!activeBuffer.hasRemaining()) {
351            // No remaining in active buffer - lock and switch to write ahead buffer.
352            stateChangeLock.lock();
353            try {
354                waitForAsyncReadComplete();
355                if (!readAheadBuffer.hasRemaining()) {
356                    // The first read.
357                    readAsync();
358                    waitForAsyncReadComplete();
359                    if (isEndOfStream()) {
360                        return EOF;
361                    }
362                }
363                // Swap the newly read ahead buffer in place of empty active buffer.
364                swapBuffers();
365                // After swapping buffers, trigger another async read for read ahead buffer.
366                readAsync();
367            } finally {
368                stateChangeLock.unlock();
369            }
370        }
371        len = Math.min(len, activeBuffer.remaining());
372        activeBuffer.get(b, offset, len);
373
374        return len;
375    }
376
377    /**
378     * Reads data from underlyingInputStream to readAheadBuffer asynchronously.
379     *
380     * @throws IOException if an I/O error occurs.
381     */
382    private void readAsync() throws IOException {
383        stateChangeLock.lock();
384        final byte[] arr;
385        try {
386            arr = readAheadBuffer.array();
387            if (endOfStream || readInProgress) {
388                return;
389            }
390            checkReadException();
391            readAheadBuffer.position(0);
392            readAheadBuffer.flip();
393            readInProgress = true;
394        } finally {
395            stateChangeLock.unlock();
396        }
397        executorService.execute(() -> {
398            stateChangeLock.lock();
399            try {
400                if (isClosed) {
401                    readInProgress = false;
402                    return;
403                }
404                // Flip this so that the close method will not close the underlying input stream when we
405                // are reading.
406                isReading = true;
407            } finally {
408                stateChangeLock.unlock();
409            }
410
411            // Please note that it is safe to release the lock and read into the read ahead buffer
412            // because either of following two conditions will hold:
413            //
414            // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
415            //
416            // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
417            // for this async read to complete.
418            //
419            // So there is no race condition in both the situations.
420            int read = 0;
421            int off = 0;
422            int len = arr.length;
423            Throwable exception = null;
424            try {
425                // try to fill the read ahead buffer.
426                // if a reader is waiting, possibly return early.
427                do {
428                    read = in.read(arr, off, len);
429                    if (read <= 0) {
430                        break;
431                    }
432                    off += read;
433                    len -= read;
434                } while (len > 0 && !isWaiting.get());
435            } catch (final Throwable ex) {
436                exception = ex;
437                if (ex instanceof Error) {
438                    // `readException` may not be reported to the user. Rethrow Error to make sure at least
439                    // The user can see Error in UncaughtExceptionHandler.
440                    throw (Error) ex;
441                }
442            } finally {
443                stateChangeLock.lock();
444                try {
445                    readAheadBuffer.limit(off);
446                    if (read < 0 || exception instanceof EOFException) {
447                        endOfStream = true;
448                    } else if (exception != null) {
449                        readAborted = true;
450                        readException = exception;
451                    }
452                    readInProgress = false;
453                    signalAsyncReadComplete();
454                } finally {
455                    stateChangeLock.unlock();
456                }
457                closeUnderlyingInputStreamIfNecessary();
458            }
459        });
460    }
461
462    private void signalAsyncReadComplete() {
463        stateChangeLock.lock();
464        try {
465            asyncReadComplete.signalAll();
466        } finally {
467            stateChangeLock.unlock();
468        }
469    }
470
471    @Override
472    public long skip(final long n) throws IOException {
473        if (n <= 0L) {
474            return 0L;
475        }
476        if (n <= activeBuffer.remaining()) {
477            // Only skipping from active buffer is sufficient
478            activeBuffer.position((int) n + activeBuffer.position());
479            return n;
480        }
481        stateChangeLock.lock();
482        final long skipped;
483        try {
484            skipped = skipInternal(n);
485        } finally {
486            stateChangeLock.unlock();
487        }
488        return skipped;
489    }
490
491    /**
492     * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before
493     * calling this function.
494     *
495     * @param n the number of bytes to be skipped.
496     * @return the actual number of bytes skipped.
497     * @throws IOException if an I/O error occurs.
498     */
499    private long skipInternal(final long n) throws IOException {
500        if (!stateChangeLock.isLocked()) {
501            throw new IllegalStateException("Expected stateChangeLock to be locked");
502        }
503        waitForAsyncReadComplete();
504        if (isEndOfStream()) {
505            return 0;
506        }
507        if (available() >= n) {
508            // we can skip from the internal buffers
509            int toSkip = (int) n;
510            // We need to skip from both active buffer and read ahead buffer
511            toSkip -= activeBuffer.remaining();
512            if (toSkip <= 0) { // skipping from activeBuffer already handled.
513                throw new IllegalStateException("Expected toSkip > 0, actual: " + toSkip);
514            }
515            activeBuffer.position(0);
516            activeBuffer.flip();
517            readAheadBuffer.position(toSkip + readAheadBuffer.position());
518            swapBuffers();
519            // Trigger async read to emptied read ahead buffer.
520            readAsync();
521            return n;
522        }
523        final int skippedBytes = available();
524        final long toSkip = n - skippedBytes;
525        activeBuffer.position(0);
526        activeBuffer.flip();
527        readAheadBuffer.position(0);
528        readAheadBuffer.flip();
529        final long skippedFromInputStream = in.skip(toSkip);
530        readAsync();
531        return skippedBytes + skippedFromInputStream;
532    }
533
534    /**
535     * Flips the active and read ahead buffer
536     */
537    private void swapBuffers() {
538        final ByteBuffer temp = activeBuffer;
539        activeBuffer = readAheadBuffer;
540        readAheadBuffer = temp;
541    }
542
543    private void waitForAsyncReadComplete() throws IOException {
544        stateChangeLock.lock();
545        try {
546            isWaiting.set(true);
547            // There is only one reader, and one writer, so the writer should signal only once,
548            // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups.
549            while (readInProgress) {
550                asyncReadComplete.await();
551            }
552        } catch (final InterruptedException e) {
553            final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
554            iio.initCause(e);
555            throw iio;
556        } finally {
557            try {
558                isWaiting.set(false);
559            } finally {
560                stateChangeLock.unlock();
561            }
562        }
563        checkReadException();
564    }
565}