001/*
002 * Licensed under the Apache License, Version 2.0 (the "License");
003 * you may not use this file except in compliance with the License.
004 * You may obtain a copy of the License at
005 *
006 *     http://www.apache.org/licenses/LICENSE-2.0
007 *
008 * Unless required by applicable law or agreed to in writing, software
009 * distributed under the License is distributed on an "AS IS" BASIS,
010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011 * See the License for the specific language governing permissions and
012 * limitations under the License.
013 */
014package org.apache.commons.io.input;
015
016import static org.apache.commons.io.IOUtils.EOF;
017
018// import javax.annotation.concurrent.GuardedBy;
019import java.io.EOFException;
020import java.io.FilterInputStream;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.InterruptedIOException;
024import java.nio.ByteBuffer;
025import java.util.Objects;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.concurrent.locks.Condition;
031import java.util.concurrent.locks.ReentrantLock;
032
033import org.apache.commons.io.build.AbstractStreamBuilder;
034
035/**
036 * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current
037 * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a
038 * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we
039 * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O.
040 * <p>
041 * To build an instance, use {@link Builder}.
042 * </p>
043 * <p>
044 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
045 * </p>
046 *
047 * @see Builder
048 * @since 2.9.0
049 */
050public class ReadAheadInputStream extends FilterInputStream {
051
052    // @formatter:off
053    /**
054     * Builds a new {@link ReadAheadInputStream}.
055     *
056     * <p>
057     * For example:
058     * </p>
059     * <pre>{@code
060     * ReadAheadInputStream s = ReadAheadInputStream.builder()
061     *   .setPath(path)
062     *   .setExecutorService(Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread))
063     *   .get();}
064     * </pre>
065     *
066     * @see #get()
067     * @since 2.12.0
068     */
069    // @formatter:on
070    public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {
071
072        private ExecutorService executorService;
073
074        /**
075         * Constructs a new builder of {@link ReadAheadInputStream}.
076         */
077        public Builder() {
078            // empty
079        }
080
081        /**
082         * Builds a new {@link ReadAheadInputStream}.
083         * <p>
084         * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
085         * </p>
086         * <p>
087         * This builder uses the following aspects:
088         * </p>
089         * <ul>
090         * <li>{@link #getInputStream()} gets the target aspect.</li>
091         * <li>{@link #getBufferSize()}</li>
092         * <li>{@link ExecutorService}</li>
093         * </ul>
094         *
095         * @return a new instance.
096         * @throws IllegalStateException         if the {@code origin} is {@code null}.
097         * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
098         * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
099         * @see #getInputStream()
100         * @see #getBufferSize()
101         * @see #getUnchecked()
102         */
103        @Override
104        public ReadAheadInputStream get() throws IOException {
105            return new ReadAheadInputStream(getInputStream(), getBufferSize(), executorService != null ? executorService : newExecutorService(),
106                    executorService == null);
107        }
108
109        /**
110         * Sets the executor service for the read-ahead thread.
111         *
112         * @param executorService the executor service for the read-ahead thread.
113         * @return {@code this} instance.
114         */
115        public Builder setExecutorService(final ExecutorService executorService) {
116            this.executorService = executorService;
117            return this;
118        }
119
120    }
121
122    private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);
123
124    /**
125     * Constructs a new {@link Builder}.
126     *
127     * @return a new {@link Builder}.
128     * @since 2.12.0
129     */
130    public static Builder builder() {
131        return new Builder();
132    }
133
134    /**
135     * Constructs a new daemon thread.
136     *
137     * @param r the thread's runnable.
138     * @return a new daemon thread.
139     */
140    private static Thread newDaemonThread(final Runnable r) {
141        final Thread thread = new Thread(r, "commons-io-read-ahead");
142        thread.setDaemon(true);
143        return thread;
144    }
145
146    /**
147     * Constructs a new daemon executor service.
148     *
149     * @return a new daemon executor service.
150     */
151    private static ExecutorService newExecutorService() {
152        return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
153    }
154
155    private final ReentrantLock stateChangeLock = new ReentrantLock();
156
157    // @GuardedBy("stateChangeLock")
158    private ByteBuffer activeBuffer;
159
160    // @GuardedBy("stateChangeLock")
161    private ByteBuffer readAheadBuffer;
162
163    // @GuardedBy("stateChangeLock")
164    private boolean endOfStream;
165
166    // @GuardedBy("stateChangeLock")
167    // true if async read is in progress
168    private boolean readInProgress;
169
170    // @GuardedBy("stateChangeLock")
171    // true if read is aborted due to an exception in reading from underlying input stream.
172    private boolean readAborted;
173
174    // @GuardedBy("stateChangeLock")
175    private Throwable readException;
176
177    // @GuardedBy("stateChangeLock")
178    // whether the close method is called.
179    private boolean isClosed;
180
181    // @GuardedBy("stateChangeLock")
182    // true when the close method will close the underlying input stream. This is valid only if
183    // `isClosed` is true.
184    private boolean isUnderlyingInputStreamBeingClosed;
185
186    // @GuardedBy("stateChangeLock")
187    // whether there is a read ahead task running,
188    private boolean isReading;
189
190    // Whether there is a reader waiting for data.
191    private final AtomicBoolean isWaiting = new AtomicBoolean();
192
193    private final ExecutorService executorService;
194
195    private final boolean shutdownExecutorService;
196
197    private final Condition asyncReadComplete = stateChangeLock.newCondition();
198
199    /**
200     * Constructs an instance with the specified buffer size and read-ahead threshold
201     *
202     * @param inputStream       The underlying input stream.
203     * @param bufferSizeInBytes The buffer size.
204     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
205     */
206    @Deprecated
207    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
208        this(inputStream, bufferSizeInBytes, newExecutorService(), true);
209    }
210
211    /**
212     * Constructs an instance with the specified buffer size and read-ahead threshold
213     *
214     * @param inputStream       The underlying input stream.
215     * @param bufferSizeInBytes The buffer size.
216     * @param executorService   An executor service for the read-ahead thread.
217     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
218     */
219    @Deprecated
220    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
221        this(inputStream, bufferSizeInBytes, executorService, false);
222    }
223
224    /**
225     * Constructs an instance with the specified buffer size and read-ahead threshold
226     *
227     * @param inputStream             The underlying input stream.
228     * @param bufferSizeInBytes       The buffer size.
229     * @param executorService         An executor service for the read-ahead thread.
230     * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close.
231     */
232    private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
233            final boolean shutdownExecutorService) {
234        super(Objects.requireNonNull(inputStream, "inputStream"));
235        if (bufferSizeInBytes <= 0) {
236            throw new IllegalArgumentException("bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
237        }
238        this.executorService = Objects.requireNonNull(executorService, "executorService");
239        this.shutdownExecutorService = shutdownExecutorService;
240        this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
241        this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
242        this.activeBuffer.flip();
243        this.readAheadBuffer.flip();
244    }
245
246    @Override
247    public int available() throws IOException {
248        stateChangeLock.lock();
249        // Make sure we have no integer overflow.
250        try {
251            return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
252        } finally {
253            stateChangeLock.unlock();
254        }
255    }
256
257    private void checkReadException() throws IOException {
258        if (readAborted) {
259            if (readException instanceof IOException) {
260                throw (IOException) readException;
261            }
262            throw new IOException(readException);
263        }
264    }
265
266    @Override
267    public void close() throws IOException {
268        boolean isSafeToCloseUnderlyingInputStream = false;
269        stateChangeLock.lock();
270        try {
271            if (isClosed) {
272                return;
273            }
274            isClosed = true;
275            if (!isReading) {
276                // Nobody is reading, so we can close the underlying input stream in this method.
277                isSafeToCloseUnderlyingInputStream = true;
278                // Flip this to make sure the read ahead task will not close the underlying input stream.
279                isUnderlyingInputStreamBeingClosed = true;
280            }
281        } finally {
282            stateChangeLock.unlock();
283        }
284
285        if (shutdownExecutorService) {
286            try {
287                executorService.shutdownNow();
288                executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
289            } catch (final InterruptedException e) {
290                final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
291                iio.initCause(e);
292                throw iio;
293            } finally {
294                if (isSafeToCloseUnderlyingInputStream) {
295                    super.close();
296                }
297            }
298        }
299    }
300
301    private void closeUnderlyingInputStreamIfNecessary() {
302        boolean needToCloseUnderlyingInputStream = false;
303        stateChangeLock.lock();
304        try {
305            isReading = false;
306            if (isClosed && !isUnderlyingInputStreamBeingClosed) {
307                // close method cannot close underlyingInputStream because we were reading.
308                needToCloseUnderlyingInputStream = true;
309            }
310        } finally {
311            stateChangeLock.unlock();
312        }
313        if (needToCloseUnderlyingInputStream) {
314            try {
315                super.close();
316            } catch (final IOException ignored) {
317                // TODO Rethrow as UncheckedIOException?
318            }
319        }
320    }
321
322    private boolean isEndOfStream() {
323        return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
324    }
325
326    @Override
327    public int read() throws IOException {
328        if (activeBuffer.hasRemaining()) {
329            // short path - just get one byte.
330            return activeBuffer.get() & 0xFF;
331        }
332        final byte[] oneByteArray = BYTE_ARRAY_1.get();
333        oneByteArray[0] = 0;
334        return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
335    }
336
337    @Override
338    public int read(final byte[] b, final int offset, int len) throws IOException {
339        if (offset < 0 || len < 0 || len > b.length - offset) {
340            throw new IndexOutOfBoundsException();
341        }
342        if (len == 0) {
343            return 0;
344        }
345
346        if (!activeBuffer.hasRemaining()) {
347            // No remaining in active buffer - lock and switch to write ahead buffer.
348            stateChangeLock.lock();
349            try {
350                waitForAsyncReadComplete();
351                if (!readAheadBuffer.hasRemaining()) {
352                    // The first read.
353                    readAsync();
354                    waitForAsyncReadComplete();
355                    if (isEndOfStream()) {
356                        return EOF;
357                    }
358                }
359                // Swap the newly read ahead buffer in place of empty active buffer.
360                swapBuffers();
361                // After swapping buffers, trigger another async read for read ahead buffer.
362                readAsync();
363            } finally {
364                stateChangeLock.unlock();
365            }
366        }
367        len = Math.min(len, activeBuffer.remaining());
368        activeBuffer.get(b, offset, len);
369
370        return len;
371    }
372
373    /**
374     * Read data from underlyingInputStream to readAheadBuffer asynchronously.
375     *
376     * @throws IOException if an I/O error occurs.
377     */
378    private void readAsync() throws IOException {
379        stateChangeLock.lock();
380        final byte[] arr;
381        try {
382            arr = readAheadBuffer.array();
383            if (endOfStream || readInProgress) {
384                return;
385            }
386            checkReadException();
387            readAheadBuffer.position(0);
388            readAheadBuffer.flip();
389            readInProgress = true;
390        } finally {
391            stateChangeLock.unlock();
392        }
393        executorService.execute(() -> {
394            stateChangeLock.lock();
395            try {
396                if (isClosed) {
397                    readInProgress = false;
398                    return;
399                }
400                // Flip this so that the close method will not close the underlying input stream when we
401                // are reading.
402                isReading = true;
403            } finally {
404                stateChangeLock.unlock();
405            }
406
407            // Please note that it is safe to release the lock and read into the read ahead buffer
408            // because either of following two conditions will hold:
409            //
410            // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
411            //
412            // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
413            // for this async read to complete.
414            //
415            // So there is no race condition in both the situations.
416            int read = 0;
417            int off = 0;
418            int len = arr.length;
419            Throwable exception = null;
420            try {
421                // try to fill the read ahead buffer.
422                // if a reader is waiting, possibly return early.
423                do {
424                    read = in.read(arr, off, len);
425                    if (read <= 0) {
426                        break;
427                    }
428                    off += read;
429                    len -= read;
430                } while (len > 0 && !isWaiting.get());
431            } catch (final Throwable ex) {
432                exception = ex;
433                if (ex instanceof Error) {
434                    // `readException` may not be reported to the user. Rethrow Error to make sure at least
435                    // The user can see Error in UncaughtExceptionHandler.
436                    throw (Error) ex;
437                }
438            } finally {
439                stateChangeLock.lock();
440                try {
441                    readAheadBuffer.limit(off);
442                    if (read < 0 || exception instanceof EOFException) {
443                        endOfStream = true;
444                    } else if (exception != null) {
445                        readAborted = true;
446                        readException = exception;
447                    }
448                    readInProgress = false;
449                    signalAsyncReadComplete();
450                } finally {
451                    stateChangeLock.unlock();
452                }
453                closeUnderlyingInputStreamIfNecessary();
454            }
455        });
456    }
457
458    private void signalAsyncReadComplete() {
459        stateChangeLock.lock();
460        try {
461            asyncReadComplete.signalAll();
462        } finally {
463            stateChangeLock.unlock();
464        }
465    }
466
467    @Override
468    public long skip(final long n) throws IOException {
469        if (n <= 0L) {
470            return 0L;
471        }
472        if (n <= activeBuffer.remaining()) {
473            // Only skipping from active buffer is sufficient
474            activeBuffer.position((int) n + activeBuffer.position());
475            return n;
476        }
477        stateChangeLock.lock();
478        final long skipped;
479        try {
480            skipped = skipInternal(n);
481        } finally {
482            stateChangeLock.unlock();
483        }
484        return skipped;
485    }
486
487    /**
488     * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before
489     * calling this function.
490     *
491     * @param n the number of bytes to be skipped.
492     * @return the actual number of bytes skipped.
493     * @throws IOException if an I/O error occurs.
494     */
495    private long skipInternal(final long n) throws IOException {
496        if (!stateChangeLock.isLocked()) {
497            throw new IllegalStateException("Expected stateChangeLock to be locked");
498        }
499        waitForAsyncReadComplete();
500        if (isEndOfStream()) {
501            return 0;
502        }
503        if (available() >= n) {
504            // we can skip from the internal buffers
505            int toSkip = (int) n;
506            // We need to skip from both active buffer and read ahead buffer
507            toSkip -= activeBuffer.remaining();
508            if (toSkip <= 0) { // skipping from activeBuffer already handled.
509                throw new IllegalStateException("Expected toSkip > 0, actual: " + toSkip);
510            }
511            activeBuffer.position(0);
512            activeBuffer.flip();
513            readAheadBuffer.position(toSkip + readAheadBuffer.position());
514            swapBuffers();
515            // Trigger async read to emptied read ahead buffer.
516            readAsync();
517            return n;
518        }
519        final int skippedBytes = available();
520        final long toSkip = n - skippedBytes;
521        activeBuffer.position(0);
522        activeBuffer.flip();
523        readAheadBuffer.position(0);
524        readAheadBuffer.flip();
525        final long skippedFromInputStream = in.skip(toSkip);
526        readAsync();
527        return skippedBytes + skippedFromInputStream;
528    }
529
530    /**
531     * Flips the active and read ahead buffer
532     */
533    private void swapBuffers() {
534        final ByteBuffer temp = activeBuffer;
535        activeBuffer = readAheadBuffer;
536        readAheadBuffer = temp;
537    }
538
539    private void waitForAsyncReadComplete() throws IOException {
540        stateChangeLock.lock();
541        try {
542            isWaiting.set(true);
543            // There is only one reader, and one writer, so the writer should signal only once,
544            // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups.
545            while (readInProgress) {
546                asyncReadComplete.await();
547            }
548        } catch (final InterruptedException e) {
549            final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
550            iio.initCause(e);
551            throw iio;
552        } finally {
553            try {
554                isWaiting.set(false);
555            } finally {
556                stateChangeLock.unlock();
557            }
558        }
559        checkReadException();
560    }
561}