001/*
002 * Licensed under the Apache License, Version 2.0 (the "License");
003 * you may not use this file except in compliance with the License.
004 * You may obtain a copy of the License at
005 *
006 *     https://www.apache.org/licenses/LICENSE-2.0
007 *
008 * Unless required by applicable law or agreed to in writing, software
009 * distributed under the License is distributed on an "AS IS" BASIS,
010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011 * See the License for the specific language governing permissions and
012 * limitations under the License.
013 */
014package org.apache.commons.io.input;
015
016import static org.apache.commons.io.IOUtils.EOF;
017
018// import javax.annotation.concurrent.GuardedBy;
019import java.io.EOFException;
020import java.io.FilterInputStream;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.InterruptedIOException;
024import java.nio.ByteBuffer;
025import java.util.Objects;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.concurrent.locks.Condition;
031import java.util.concurrent.locks.ReentrantLock;
032
033import org.apache.commons.io.build.AbstractStreamBuilder;
034
035/**
036 * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current
037 * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a
038 * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we
039 * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O.
040 * <p>
041 * To build an instance, use {@link Builder}.
042 * </p>
043 * <p>
044 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
045 * </p>
046 *
047 * @see Builder
048 * @since 2.9.0
049 */
050public class ReadAheadInputStream extends FilterInputStream {
051
052    // @formatter:off
053    /**
054     * Builds a new {@link ReadAheadInputStream}.
055     *
056     * <p>
057     * For example:
058     * </p>
059     * <pre>{@code
060     * ReadAheadInputStream s = ReadAheadInputStream.builder()
061     *   .setPath(path)
062     *   .setExecutorService(Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread))
063     *   .get();}
064     * </pre>
065     *
066     * @see #get()
067     * @since 2.12.0
068     */
069    // @formatter:on
070    public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {
071
072        private ExecutorService executorService;
073
074        /**
075         * Constructs a new builder of {@link ReadAheadInputStream}.
076         */
077        public Builder() {
078            // empty
079        }
080
081        /**
082         * Builds a new {@link ReadAheadInputStream}.
083         * <p>
084         * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
085         * </p>
086         * <p>
087         * This builder uses the following aspects:
088         * </p>
089         * <ul>
090         * <li>{@link #getInputStream()} gets the target aspect.</li>
091         * <li>{@link #getBufferSize()}</li>
092         * <li>{@link ExecutorService}</li>
093         * </ul>
094         *
095         * @return a new instance.
096         * @throws IllegalStateException         if the {@code origin} is {@code null}.
097         * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
098         * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
099         * @see #getInputStream()
100         * @see #getBufferSize()
101         * @see #getUnchecked()
102         */
103        @Override
104        public ReadAheadInputStream get() throws IOException {
105            return new ReadAheadInputStream(this);
106        }
107
108        /**
109         * Sets the executor service for the read-ahead thread.
110         *
111         * @param executorService the executor service for the read-ahead thread.
112         * @return {@code this} instance.
113         */
114        public Builder setExecutorService(final ExecutorService executorService) {
115            this.executorService = executorService;
116            return this;
117        }
118
119    }
120
121    private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);
122
123    /**
124     * Constructs a new {@link Builder}.
125     *
126     * @return a new {@link Builder}.
127     * @since 2.12.0
128     */
129    public static Builder builder() {
130        return new Builder();
131    }
132
133    /**
134     * Constructs a new daemon thread.
135     *
136     * @param r the thread's runnable.
137     * @return a new daemon thread.
138     */
139    private static Thread newDaemonThread(final Runnable r) {
140        final Thread thread = new Thread(r, "commons-io-read-ahead");
141        thread.setDaemon(true);
142        return thread;
143    }
144
145    /**
146     * Constructs a new daemon executor service.
147     *
148     * @return a new daemon executor service.
149     */
150    private static ExecutorService newExecutorService() {
151        return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
152    }
153
154    private final ReentrantLock stateChangeLock = new ReentrantLock();
155
156    // @GuardedBy("stateChangeLock")
157    private ByteBuffer activeBuffer;
158
159    // @GuardedBy("stateChangeLock")
160    private ByteBuffer readAheadBuffer;
161
162    // @GuardedBy("stateChangeLock")
163    private boolean endOfStream;
164
165    // @GuardedBy("stateChangeLock")
166    // true if async read is in progress
167    private boolean readInProgress;
168
169    // @GuardedBy("stateChangeLock")
170    // true if read is aborted due to an exception in reading from underlying input stream.
171    private boolean readAborted;
172
173    // @GuardedBy("stateChangeLock")
174    private Throwable readException;
175
176    // @GuardedBy("stateChangeLock")
177    // whether the close method is called.
178    private boolean isClosed;
179
180    // @GuardedBy("stateChangeLock")
181    // true when the close method will close the underlying input stream. This is valid only if
182    // `isClosed` is true.
183    private boolean isUnderlyingInputStreamBeingClosed;
184
185    // @GuardedBy("stateChangeLock")
186    // whether there is a read ahead task running,
187    private boolean isReading;
188
189    // Whether there is a reader waiting for data.
190    private final AtomicBoolean isWaiting = new AtomicBoolean();
191
192    private final ExecutorService executorService;
193
194    private final boolean shutdownExecutorService;
195
196    private final Condition asyncReadComplete = stateChangeLock.newCondition();
197
198    @SuppressWarnings("resource")
199    private ReadAheadInputStream(final Builder builder) throws IOException {
200        this(builder.getInputStream(), builder.getBufferSize(), builder.executorService != null ? builder.executorService : newExecutorService(),
201                builder.executorService == null);
202    }
203
204    /**
205     * Constructs an instance with the specified buffer size and read-ahead threshold
206     *
207     * @param inputStream       The underlying input stream.
208     * @param bufferSizeInBytes The buffer size.
209     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
210     */
211    @Deprecated
212    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
213        this(inputStream, bufferSizeInBytes, newExecutorService(), true);
214    }
215
216    /**
217     * Constructs an instance with the specified buffer size and read-ahead threshold
218     *
219     * @param inputStream       The underlying input stream.
220     * @param bufferSizeInBytes The buffer size.
221     * @param executorService   An executor service for the read-ahead thread.
222     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
223     */
224    @Deprecated
225    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
226        this(inputStream, bufferSizeInBytes, executorService, false);
227    }
228
229    /**
230     * Constructs an instance with the specified buffer size and read-ahead threshold
231     *
232     * @param inputStream             The underlying input stream.
233     * @param bufferSizeInBytes       The buffer size.
234     * @param executorService         An executor service for the read-ahead thread.
235     * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close.
236     */
237    private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
238            final boolean shutdownExecutorService) {
239        super(Objects.requireNonNull(inputStream, "inputStream"));
240        if (bufferSizeInBytes <= 0) {
241            throw new IllegalArgumentException("bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
242        }
243        this.executorService = Objects.requireNonNull(executorService, "executorService");
244        this.shutdownExecutorService = shutdownExecutorService;
245        this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
246        this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
247        this.activeBuffer.flip();
248        this.readAheadBuffer.flip();
249    }
250
251    @Override
252    public int available() throws IOException {
253        stateChangeLock.lock();
254        // Make sure we have no integer overflow.
255        try {
256            return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
257        } finally {
258            stateChangeLock.unlock();
259        }
260    }
261
262    private void checkReadException() throws IOException {
263        if (readAborted) {
264            if (readException instanceof IOException) {
265                throw (IOException) readException;
266            }
267            throw new IOException(readException);
268        }
269    }
270
271    @Override
272    public void close() throws IOException {
273        boolean isSafeToCloseUnderlyingInputStream = false;
274        stateChangeLock.lock();
275        try {
276            if (isClosed) {
277                return;
278            }
279            isClosed = true;
280            if (!isReading) {
281                // Nobody is reading, so we can close the underlying input stream in this method.
282                isSafeToCloseUnderlyingInputStream = true;
283                // Flip this to make sure the read ahead task will not close the underlying input stream.
284                isUnderlyingInputStreamBeingClosed = true;
285            }
286        } finally {
287            stateChangeLock.unlock();
288        }
289
290        if (shutdownExecutorService) {
291            try {
292                executorService.shutdownNow();
293                executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
294            } catch (final InterruptedException e) {
295                final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
296                iio.initCause(e);
297                throw iio;
298            } finally {
299                if (isSafeToCloseUnderlyingInputStream) {
300                    super.close();
301                }
302            }
303        }
304    }
305
306    private void closeUnderlyingInputStreamIfNecessary() {
307        boolean needToCloseUnderlyingInputStream = false;
308        stateChangeLock.lock();
309        try {
310            isReading = false;
311            if (isClosed && !isUnderlyingInputStreamBeingClosed) {
312                // close method cannot close underlyingInputStream because we were reading.
313                needToCloseUnderlyingInputStream = true;
314            }
315        } finally {
316            stateChangeLock.unlock();
317        }
318        if (needToCloseUnderlyingInputStream) {
319            try {
320                super.close();
321            } catch (final IOException ignored) {
322                // TODO Rethrow as UncheckedIOException?
323            }
324        }
325    }
326
327    private boolean isEndOfStream() {
328        return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
329    }
330
331    @Override
332    public int read() throws IOException {
333        if (activeBuffer.hasRemaining()) {
334            // short path - just get one byte.
335            return activeBuffer.get() & 0xFF;
336        }
337        final byte[] oneByteArray = BYTE_ARRAY_1.get();
338        oneByteArray[0] = 0;
339        return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
340    }
341
342    @Override
343    public int read(final byte[] b, final int offset, int len) throws IOException {
344        if (offset < 0 || len < 0 || len > b.length - offset) {
345            throw new IndexOutOfBoundsException();
346        }
347        if (len == 0) {
348            return 0;
349        }
350
351        if (!activeBuffer.hasRemaining()) {
352            // No remaining in active buffer - lock and switch to write ahead buffer.
353            stateChangeLock.lock();
354            try {
355                waitForAsyncReadComplete();
356                if (!readAheadBuffer.hasRemaining()) {
357                    // The first read.
358                    readAsync();
359                    waitForAsyncReadComplete();
360                    if (isEndOfStream()) {
361                        return EOF;
362                    }
363                }
364                // Swap the newly read ahead buffer in place of empty active buffer.
365                swapBuffers();
366                // After swapping buffers, trigger another async read for read ahead buffer.
367                readAsync();
368            } finally {
369                stateChangeLock.unlock();
370            }
371        }
372        len = Math.min(len, activeBuffer.remaining());
373        activeBuffer.get(b, offset, len);
374
375        return len;
376    }
377
378    /**
379     * Reads data from underlyingInputStream to readAheadBuffer asynchronously.
380     *
381     * @throws IOException if an I/O error occurs.
382     */
383    private void readAsync() throws IOException {
384        stateChangeLock.lock();
385        final byte[] arr;
386        try {
387            arr = readAheadBuffer.array();
388            if (endOfStream || readInProgress) {
389                return;
390            }
391            checkReadException();
392            readAheadBuffer.position(0);
393            readAheadBuffer.flip();
394            readInProgress = true;
395        } finally {
396            stateChangeLock.unlock();
397        }
398        executorService.execute(() -> {
399            stateChangeLock.lock();
400            try {
401                if (isClosed) {
402                    readInProgress = false;
403                    return;
404                }
405                // Flip this so that the close method will not close the underlying input stream when we
406                // are reading.
407                isReading = true;
408            } finally {
409                stateChangeLock.unlock();
410            }
411
412            // Please note that it is safe to release the lock and read into the read ahead buffer
413            // because either of following two conditions will hold:
414            //
415            // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
416            //
417            // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
418            // for this async read to complete.
419            //
420            // So there is no race condition in both the situations.
421            int read = 0;
422            int off = 0;
423            int len = arr.length;
424            Throwable exception = null;
425            try {
426                // try to fill the read ahead buffer.
427                // if a reader is waiting, possibly return early.
428                do {
429                    read = in.read(arr, off, len);
430                    if (read <= 0) {
431                        break;
432                    }
433                    off += read;
434                    len -= read;
435                } while (len > 0 && !isWaiting.get());
436            } catch (final Throwable ex) {
437                exception = ex;
438                if (ex instanceof Error) {
439                    // `readException` may not be reported to the user. Rethrow Error to make sure at least
440                    // The user can see Error in UncaughtExceptionHandler.
441                    throw (Error) ex;
442                }
443            } finally {
444                stateChangeLock.lock();
445                try {
446                    readAheadBuffer.limit(off);
447                    if (read < 0 || exception instanceof EOFException) {
448                        endOfStream = true;
449                    } else if (exception != null) {
450                        readAborted = true;
451                        readException = exception;
452                    }
453                    readInProgress = false;
454                    signalAsyncReadComplete();
455                } finally {
456                    stateChangeLock.unlock();
457                }
458                closeUnderlyingInputStreamIfNecessary();
459            }
460        });
461    }
462
463    private void signalAsyncReadComplete() {
464        stateChangeLock.lock();
465        try {
466            asyncReadComplete.signalAll();
467        } finally {
468            stateChangeLock.unlock();
469        }
470    }
471
472    @Override
473    public long skip(final long n) throws IOException {
474        if (n <= 0L) {
475            return 0L;
476        }
477        if (n <= activeBuffer.remaining()) {
478            // Only skipping from active buffer is sufficient
479            activeBuffer.position((int) n + activeBuffer.position());
480            return n;
481        }
482        stateChangeLock.lock();
483        final long skipped;
484        try {
485            skipped = skipInternal(n);
486        } finally {
487            stateChangeLock.unlock();
488        }
489        return skipped;
490    }
491
492    /**
493     * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before
494     * calling this function.
495     *
496     * @param n the number of bytes to be skipped.
497     * @return the actual number of bytes skipped.
498     * @throws IOException if an I/O error occurs.
499     */
500    private long skipInternal(final long n) throws IOException {
501        if (!stateChangeLock.isLocked()) {
502            throw new IllegalStateException("Expected stateChangeLock to be locked");
503        }
504        waitForAsyncReadComplete();
505        if (isEndOfStream()) {
506            return 0;
507        }
508        if (available() >= n) {
509            // we can skip from the internal buffers
510            int toSkip = (int) n;
511            // We need to skip from both active buffer and read ahead buffer
512            toSkip -= activeBuffer.remaining();
513            if (toSkip <= 0) { // skipping from activeBuffer already handled.
514                throw new IllegalStateException("Expected toSkip > 0, actual: " + toSkip);
515            }
516            activeBuffer.position(0);
517            activeBuffer.flip();
518            readAheadBuffer.position(toSkip + readAheadBuffer.position());
519            swapBuffers();
520            // Trigger async read to emptied read ahead buffer.
521            readAsync();
522            return n;
523        }
524        final int skippedBytes = available();
525        final long toSkip = n - skippedBytes;
526        activeBuffer.position(0);
527        activeBuffer.flip();
528        readAheadBuffer.position(0);
529        readAheadBuffer.flip();
530        final long skippedFromInputStream = in.skip(toSkip);
531        readAsync();
532        return skippedBytes + skippedFromInputStream;
533    }
534
535    /**
536     * Flips the active and read ahead buffer
537     */
538    private void swapBuffers() {
539        final ByteBuffer temp = activeBuffer;
540        activeBuffer = readAheadBuffer;
541        readAheadBuffer = temp;
542    }
543
544    private void waitForAsyncReadComplete() throws IOException {
545        stateChangeLock.lock();
546        try {
547            isWaiting.set(true);
548            // There is only one reader, and one writer, so the writer should signal only once,
549            // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups.
550            while (readInProgress) {
551                asyncReadComplete.await();
552            }
553        } catch (final InterruptedException e) {
554            final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
555            iio.initCause(e);
556            throw iio;
557        } finally {
558            try {
559                isWaiting.set(false);
560            } finally {
561                stateChangeLock.unlock();
562            }
563        }
564        checkReadException();
565    }
566}