001/*
002 * Licensed under the Apache License, Version 2.0 (the "License");
003 * you may not use this file except in compliance with the License.
004 * You may obtain a copy of the License at
005 *
006 *     http://www.apache.org/licenses/LICENSE-2.0
007 *
008 * Unless required by applicable law or agreed to in writing, software
009 * distributed under the License is distributed on an "AS IS" BASIS,
010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011 * See the License for the specific language governing permissions and
012 * limitations under the License.
013 */
014package org.apache.commons.io.input;
015
016import static org.apache.commons.io.IOUtils.EOF;
017
018// import javax.annotation.concurrent.GuardedBy;
019import java.io.EOFException;
020import java.io.FilterInputStream;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.InterruptedIOException;
024import java.nio.ByteBuffer;
025import java.util.Objects;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.concurrent.locks.Condition;
031import java.util.concurrent.locks.ReentrantLock;
032
033import org.apache.commons.io.build.AbstractStreamBuilder;
034
035/**
036 * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current
037 * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a
038 * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we
039 * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O.
040 * <p>
041 * To build an instance, use {@link Builder}.
042 * </p>
043 * <p>
044 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
045 * </p>
046 *
047 * @see Builder
048 * @since 2.9.0
049 */
050public class ReadAheadInputStream extends FilterInputStream {
051
052    // @formatter:off
053    /**
054     * Builds a new {@link ReadAheadInputStream}.
055     *
056     * <p>
057     * For example:
058     * </p>
059     * <pre>{@code
060     * ReadAheadInputStream s = ReadAheadInputStream.builder()
061     *   .setPath(path)
062     *   .setExecutorService(Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread))
063     *   .get();}
064     * </pre>
065     *
066     * @see #get()
067     * @since 2.12.0
068     */
069    // @formatter:on
070    public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {
071
072        private ExecutorService executorService;
073
074        /**
075         * Builds a new {@link ReadAheadInputStream}.
076         * <p>
077         * You must set input that supports {@link #getInputStream()}, otherwise, this method throws an exception.
078         * </p>
079         * <p>
080         * This builder use the following aspects:
081         * </p>
082         * <ul>
083         * <li>{@link #getInputStream()}</li>
084         * <li>{@link #getBufferSize()}</li>
085         * <li>{@link ExecutorService}</li>
086         * </ul>
087         *
088         * @return a new instance.
089         * @throws IllegalStateException         if the {@code origin} is {@code null}.
090         * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
091         * @throws IOException                   if an I/O error occurs.
092         * @see #getInputStream()
093         * @see #getBufferSize()
094         */
095        @SuppressWarnings("resource")
096        @Override
097        public ReadAheadInputStream get() throws IOException {
098            return new ReadAheadInputStream(getInputStream(), getBufferSize(), executorService != null ? executorService : newExecutorService(),
099                    executorService == null);
100        }
101
102        /**
103         * Sets the executor service for the read-ahead thread.
104         *
105         * @param executorService the executor service for the read-ahead thread.
106         * @return this
107         */
108        public Builder setExecutorService(final ExecutorService executorService) {
109            this.executorService = executorService;
110            return this;
111        }
112
113    }
114
    /** Per-thread one-byte scratch array used by {@link #read()} when it has to delegate to the array-based read. */
    private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);
116
    /**
     * Constructs a new {@link Builder}.
     * <p>
     * Configure the returned builder and call {@link Builder#get()} to create the stream.
     * </p>
     *
     * @return a new {@link Builder}.
     * @since 2.12.0
     */
    public static Builder builder() {
        return new Builder();
    }
126
127    /**
128     * Constructs a new daemon thread.
129     *
130     * @param r the thread's runnable.
131     * @return a new daemon thread.
132     */
133    private static Thread newDaemonThread(final Runnable r) {
134        final Thread thread = new Thread(r, "commons-io-read-ahead");
135        thread.setDaemon(true);
136        return thread;
137    }
138
139    /**
140     * Constructs a new daemon executor service.
141     *
142     * @return a new daemon executor service.
143     */
144    private static ExecutorService newExecutorService() {
145        return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
146    }
147
    // Lock protecting the mutable state below; asyncReadComplete is a condition of this lock.
    private final ReentrantLock stateChangeLock = new ReentrantLock();

    // @GuardedBy("stateChangeLock")
    // Buffer that read() and skip() consume from.
    private ByteBuffer activeBuffer;

    // @GuardedBy("stateChangeLock")
    // Buffer whose backing array the asynchronous read-ahead task fills.
    private ByteBuffer readAheadBuffer;

    // @GuardedBy("stateChangeLock")
    // true once the underlying stream reported end of stream (negative read count or EOFException).
    private boolean endOfStream;

    // @GuardedBy("stateChangeLock")
    // true if an async read is in progress: set when a read-ahead task is scheduled, cleared when the task finishes
    // or finds the stream already closed.
    private boolean readInProgress;

    // @GuardedBy("stateChangeLock")
    // true if read is aborted due to an exception in reading from underlying input stream.
    private boolean readAborted;

    // @GuardedBy("stateChangeLock")
    // The exception that aborted the read; rethrown to the reader by checkReadException().
    private Throwable readException;

    // @GuardedBy("stateChangeLock")
    // whether the close method is called.
    private boolean isClosed;

    // @GuardedBy("stateChangeLock")
    // true when the close method will close the underlying input stream. This is valid only if
    // `isClosed` is true.
    private boolean isUnderlyingInputStreamBeingClosed;

    // @GuardedBy("stateChangeLock")
    // whether there is a read ahead task running: set by the task once it has started, cleared by
    // closeUnderlyingInputStreamIfNecessary() when the task ends.
    private boolean isReading;

    // Whether there is a reader waiting for data. The read-ahead task checks this flag to stop filling its buffer early.
    private final AtomicBoolean isWaiting = new AtomicBoolean();

    // Executor that runs the read-ahead tasks.
    private final ExecutorService executorService;

    // Whether close() shuts down executorService.
    private final boolean shutdownExecutorService;

    // Signalled when an asynchronous read completes; awaited in waitForAsyncReadComplete().
    private final Condition asyncReadComplete = stateChangeLock.newCondition();
191
    /**
     * Constructs an instance with the specified buffer size, using a new single-thread daemon executor service for the read-ahead thread. That executor
     * service is shut down when this stream is closed.
     *
     * @param inputStream       The underlying input stream.
     * @param bufferSizeInBytes The buffer size.
     * @throws NullPointerException     if {@code inputStream} is {@code null}.
     * @throws IllegalArgumentException if {@code bufferSizeInBytes} is not greater than 0.
     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
     */
    @Deprecated
    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
        this(inputStream, bufferSizeInBytes, newExecutorService(), true);
    }
203
    /**
     * Constructs an instance with the specified buffer size and executor service. The given executor service is not shut down when this stream is closed.
     *
     * @param inputStream       The underlying input stream.
     * @param bufferSizeInBytes The buffer size.
     * @param executorService   An executor service for the read-ahead thread.
     * @throws NullPointerException     if {@code inputStream} or {@code executorService} is {@code null}.
     * @throws IllegalArgumentException if {@code bufferSizeInBytes} is not greater than 0.
     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
     */
    @Deprecated
    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
        this(inputStream, bufferSizeInBytes, executorService, false);
    }
216
217    /**
218     * Constructs an instance with the specified buffer size and read-ahead threshold
219     *
220     * @param inputStream             The underlying input stream.
221     * @param bufferSizeInBytes       The buffer size.
222     * @param executorService         An executor service for the read-ahead thread.
223     * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close.
224     */
225    private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
226            final boolean shutdownExecutorService) {
227        super(Objects.requireNonNull(inputStream, "inputStream"));
228        if (bufferSizeInBytes <= 0) {
229            throw new IllegalArgumentException("bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
230        }
231        this.executorService = Objects.requireNonNull(executorService, "executorService");
232        this.shutdownExecutorService = shutdownExecutorService;
233        this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
234        this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
235        this.activeBuffer.flip();
236        this.readAheadBuffer.flip();
237    }
238
239    @Override
240    public int available() throws IOException {
241        stateChangeLock.lock();
242        // Make sure we have no integer overflow.
243        try {
244            return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
245        } finally {
246            stateChangeLock.unlock();
247        }
248    }
249
250    private void checkReadException() throws IOException {
251        if (readAborted) {
252            if (readException instanceof IOException) {
253                throw (IOException) readException;
254            }
255            throw new IOException(readException);
256        }
257    }
258
259    @Override
260    public void close() throws IOException {
261        boolean isSafeToCloseUnderlyingInputStream = false;
262        stateChangeLock.lock();
263        try {
264            if (isClosed) {
265                return;
266            }
267            isClosed = true;
268            if (!isReading) {
269                // Nobody is reading, so we can close the underlying input stream in this method.
270                isSafeToCloseUnderlyingInputStream = true;
271                // Flip this to make sure the read ahead task will not close the underlying input stream.
272                isUnderlyingInputStreamBeingClosed = true;
273            }
274        } finally {
275            stateChangeLock.unlock();
276        }
277
278        if (shutdownExecutorService) {
279            try {
280                executorService.shutdownNow();
281                executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
282            } catch (final InterruptedException e) {
283                final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
284                iio.initCause(e);
285                throw iio;
286            } finally {
287                if (isSafeToCloseUnderlyingInputStream) {
288                    super.close();
289                }
290            }
291        }
292    }
293
294    private void closeUnderlyingInputStreamIfNecessary() {
295        boolean needToCloseUnderlyingInputStream = false;
296        stateChangeLock.lock();
297        try {
298            isReading = false;
299            if (isClosed && !isUnderlyingInputStreamBeingClosed) {
300                // close method cannot close underlyingInputStream because we were reading.
301                needToCloseUnderlyingInputStream = true;
302            }
303        } finally {
304            stateChangeLock.unlock();
305        }
306        if (needToCloseUnderlyingInputStream) {
307            try {
308                super.close();
309            } catch (final IOException ignored) {
310                // TODO Rethrow as UncheckedIOException?
311            }
312        }
313    }
314
315    private boolean isEndOfStream() {
316        return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
317    }
318
319    @Override
320    public int read() throws IOException {
321        if (activeBuffer.hasRemaining()) {
322            // short path - just get one byte.
323            return activeBuffer.get() & 0xFF;
324        }
325        final byte[] oneByteArray = BYTE_ARRAY_1.get();
326        oneByteArray[0] = 0;
327        return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
328    }
329
330    @Override
331    public int read(final byte[] b, final int offset, int len) throws IOException {
332        if (offset < 0 || len < 0 || len > b.length - offset) {
333            throw new IndexOutOfBoundsException();
334        }
335        if (len == 0) {
336            return 0;
337        }
338
339        if (!activeBuffer.hasRemaining()) {
340            // No remaining in active buffer - lock and switch to write ahead buffer.
341            stateChangeLock.lock();
342            try {
343                waitForAsyncReadComplete();
344                if (!readAheadBuffer.hasRemaining()) {
345                    // The first read.
346                    readAsync();
347                    waitForAsyncReadComplete();
348                    if (isEndOfStream()) {
349                        return EOF;
350                    }
351                }
352                // Swap the newly read ahead buffer in place of empty active buffer.
353                swapBuffers();
354                // After swapping buffers, trigger another async read for read ahead buffer.
355                readAsync();
356            } finally {
357                stateChangeLock.unlock();
358            }
359        }
360        len = Math.min(len, activeBuffer.remaining());
361        activeBuffer.get(b, offset, len);
362
363        return len;
364    }
365
366    /**
367     * Read data from underlyingInputStream to readAheadBuffer asynchronously.
368     *
369     * @throws IOException if an I/O error occurs.
370     */
371    private void readAsync() throws IOException {
372        stateChangeLock.lock();
373        final byte[] arr;
374        try {
375            arr = readAheadBuffer.array();
376            if (endOfStream || readInProgress) {
377                return;
378            }
379            checkReadException();
380            readAheadBuffer.position(0);
381            readAheadBuffer.flip();
382            readInProgress = true;
383        } finally {
384            stateChangeLock.unlock();
385        }
386        executorService.execute(() -> {
387            stateChangeLock.lock();
388            try {
389                if (isClosed) {
390                    readInProgress = false;
391                    return;
392                }
393                // Flip this so that the close method will not close the underlying input stream when we
394                // are reading.
395                isReading = true;
396            } finally {
397                stateChangeLock.unlock();
398            }
399
400            // Please note that it is safe to release the lock and read into the read ahead buffer
401            // because either of following two conditions will hold:
402            //
403            // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
404            //
405            // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
406            // for this async read to complete.
407            //
408            // So there is no race condition in both the situations.
409            int read = 0;
410            int off = 0, len = arr.length;
411            Throwable exception = null;
412            try {
413                // try to fill the read ahead buffer.
414                // if a reader is waiting, possibly return early.
415                do {
416                    read = in.read(arr, off, len);
417                    if (read <= 0) {
418                        break;
419                    }
420                    off += read;
421                    len -= read;
422                } while (len > 0 && !isWaiting.get());
423            } catch (final Throwable ex) {
424                exception = ex;
425                if (ex instanceof Error) {
426                    // `readException` may not be reported to the user. Rethrow Error to make sure at least
427                    // The user can see Error in UncaughtExceptionHandler.
428                    throw (Error) ex;
429                }
430            } finally {
431                stateChangeLock.lock();
432                try {
433                    readAheadBuffer.limit(off);
434                    if (read < 0 || exception instanceof EOFException) {
435                        endOfStream = true;
436                    } else if (exception != null) {
437                        readAborted = true;
438                        readException = exception;
439                    }
440                    readInProgress = false;
441                    signalAsyncReadComplete();
442                } finally {
443                    stateChangeLock.unlock();
444                }
445                closeUnderlyingInputStreamIfNecessary();
446            }
447        });
448    }
449
    /**
     * Wakes up all threads waiting on {@code asyncReadComplete}. The lock is acquired here because a condition may only be signalled while its lock is
     * held.
     */
    private void signalAsyncReadComplete() {
        stateChangeLock.lock();
        try {
            asyncReadComplete.signalAll();
        } finally {
            stateChangeLock.unlock();
        }
    }
458
459    @Override
460    public long skip(final long n) throws IOException {
461        if (n <= 0L) {
462            return 0L;
463        }
464        if (n <= activeBuffer.remaining()) {
465            // Only skipping from active buffer is sufficient
466            activeBuffer.position((int) n + activeBuffer.position());
467            return n;
468        }
469        stateChangeLock.lock();
470        final long skipped;
471        try {
472            skipped = skipInternal(n);
473        } finally {
474            stateChangeLock.unlock();
475        }
476        return skipped;
477    }
478
479    /**
480     * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before
481     * calling this function.
482     *
483     * @param n the number of bytes to be skipped.
484     * @return the actual number of bytes skipped.
485     * @throws IOException if an I/O error occurs.
486     */
487    private long skipInternal(final long n) throws IOException {
488        assert stateChangeLock.isLocked();
489        waitForAsyncReadComplete();
490        if (isEndOfStream()) {
491            return 0;
492        }
493        if (available() >= n) {
494            // we can skip from the internal buffers
495            int toSkip = (int) n;
496            // We need to skip from both active buffer and read ahead buffer
497            toSkip -= activeBuffer.remaining();
498            assert toSkip > 0; // skipping from activeBuffer already handled.
499            activeBuffer.position(0);
500            activeBuffer.flip();
501            readAheadBuffer.position(toSkip + readAheadBuffer.position());
502            swapBuffers();
503            // Trigger async read to emptied read ahead buffer.
504            readAsync();
505            return n;
506        }
507        final int skippedBytes = available();
508        final long toSkip = n - skippedBytes;
509        activeBuffer.position(0);
510        activeBuffer.flip();
511        readAheadBuffer.position(0);
512        readAheadBuffer.flip();
513        final long skippedFromInputStream = in.skip(toSkip);
514        readAsync();
515        return skippedBytes + skippedFromInputStream;
516    }
517
518    /**
519     * Flips the active and read ahead buffer
520     */
521    private void swapBuffers() {
522        final ByteBuffer temp = activeBuffer;
523        activeBuffer = readAheadBuffer;
524        readAheadBuffer = temp;
525    }
526
527    private void waitForAsyncReadComplete() throws IOException {
528        stateChangeLock.lock();
529        try {
530            isWaiting.set(true);
531            // There is only one reader, and one writer, so the writer should signal only once,
532            // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups.
533            while (readInProgress) {
534                asyncReadComplete.await();
535            }
536        } catch (final InterruptedException e) {
537            final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
538            iio.initCause(e);
539            throw iio;
540        } finally {
541            try {
542                isWaiting.set(false);
543            } finally {
544                stateChangeLock.unlock();
545            }
546        }
547        checkReadException();
548    }
549}