View Javadoc
1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     https://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  package org.apache.commons.io.input;
15  
16  import static org.apache.commons.io.IOUtils.EOF;
17  
18  // import javax.annotation.concurrent.GuardedBy;
19  import java.io.EOFException;
20  import java.io.FilterInputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.nio.ByteBuffer;
24  import java.util.Objects;
25  import java.util.concurrent.ExecutorService;
26  import java.util.concurrent.Executors;
27  import java.util.concurrent.TimeUnit;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  import java.util.concurrent.locks.Condition;
30  import java.util.concurrent.locks.ReentrantLock;
31  
32  import org.apache.commons.io.IOUtils;
33  import org.apache.commons.io.build.AbstractStreamBuilder;
34  
35  /**
36   * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current
37   * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a
38   * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we
39   * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O.
40   * <p>
41   * To build an instance, use {@link Builder}.
42   * </p>
43   * <p>
44   * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
45   * </p>
46   *
47   * @see Builder
48   * @since 2.9.0
49   */
50  public class ReadAheadInputStream extends FilterInputStream {
51  
52      // @formatter:off
53      /**
54       * Builds a new {@link ReadAheadInputStream}.
55       *
56       * <p>
57       * For example:
58       * </p>
59       * <pre>{@code
60       * ReadAheadInputStream s = ReadAheadInputStream.builder()
61       *   .setPath(path)
62       *   .setExecutorService(Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread))
63       *   .get();}
64       * </pre>
65       * <p>
66       * If an {@link ExecutorService} is not set, then a single-threaded daemon executor service is used.
67       * </p>
68       *
69       * @see #get()
70       * @since 2.12.0
71       */
72      // @formatter:on
73      public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {
74  
75          private ExecutorService executorService;
76  
77          /**
78           * Constructs a new builder of {@link ReadAheadInputStream}.
79           */
80          public Builder() {
81              // empty
82          }
83  
84          /**
85           * Builds a new {@link ReadAheadInputStream}.
86           * <p>
87           * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
88           * </p>
89           * <p>
90           * This builder uses the following aspects:
91           * </p>
92           * <ul>
93           * <li>{@link #getInputStream()} gets the target aspect.</li>
94           * <li>{@link #getBufferSize()}</li>
95           * <li>{@link ExecutorService}, if not set, a single-threaded daemon executor service is used.</li>
96           * </ul>
97           *
98           * @return a new instance.
99           * @throws IllegalStateException         if the {@code origin} is {@code null}.
100          * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
101          * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
102          * @see #getInputStream()
103          * @see #getBufferSize()
104          * @see #getUnchecked()
105          */
106         @Override
107         public ReadAheadInputStream get() throws IOException {
108             return new ReadAheadInputStream(this);
109         }
110 
111         /**
112          * Sets the executor service for the read-ahead thread.
113          * <p>
114          * If not set, a single-threaded daemon executor service is used.
115          * </p>
116          *
117          * @param executorService the executor service for the read-ahead thread, may be {@code null}.
118          * @return {@code this} instance.
119          */
120         public Builder setExecutorService(final ExecutorService executorService) {
121             this.executorService = executorService;
122             return this;
123         }
124 
125     }
126 
127     private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);
128 
129     /**
130      * Constructs a new {@link Builder}.
131      *
132      * @return a new {@link Builder}.
133      * @since 2.12.0
134      */
135     public static Builder builder() {
136         return new Builder();
137     }
138 
139     /**
140      * Constructs a new daemon thread.
141      *
142      * @param r the thread's runnable.
143      * @return a new daemon thread.
144      */
145     private static Thread newDaemonThread(final Runnable r) {
146         final Thread thread = new Thread(r, "commons-io-read-ahead");
147         thread.setDaemon(true);
148         return thread;
149     }
150 
151     /**
152      * Constructs a new daemon executor service.
153      *
154      * @return a new daemon executor service.
155      */
156     private static ExecutorService newExecutorService() {
157         return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
158     }
159 
160     private final ReentrantLock stateChangeLock = new ReentrantLock();
161 
162     // @GuardedBy("stateChangeLock")
163     private ByteBuffer activeBuffer;
164 
165     // @GuardedBy("stateChangeLock")
166     private ByteBuffer readAheadBuffer;
167 
168     // @GuardedBy("stateChangeLock")
169     private boolean endOfStream;
170 
171     // @GuardedBy("stateChangeLock")
172     // true if async read is in progress
173     private boolean readInProgress;
174 
175     // @GuardedBy("stateChangeLock")
176     // true if read is aborted due to an exception in reading from underlying input stream.
177     private boolean readAborted;
178 
179     // @GuardedBy("stateChangeLock")
180     private Throwable readException;
181 
182     // @GuardedBy("stateChangeLock")
183     // whether the close method is called.
184     private boolean isClosed;
185 
186     // @GuardedBy("stateChangeLock")
187     // true when the close method will close the underlying input stream. This is valid only if
188     // `isClosed` is true.
189     private boolean isUnderlyingInputStreamBeingClosed;
190 
191     // @GuardedBy("stateChangeLock")
192     // whether there is a read ahead task running,
193     private boolean isReading;
194 
195     // Whether there is a reader waiting for data.
196     private final AtomicBoolean isWaiting = new AtomicBoolean();
197 
198     private final ExecutorService executorService;
199 
200     private final boolean shutdownExecutorService;
201 
202     private final Condition asyncReadComplete = stateChangeLock.newCondition();
203 
/**
 * Constructs an instance from a builder. When the builder carries no executor service, a new daemon executor service is created and
 * marked to be shut down on close; an executor service supplied through the builder is used as is and is not marked for shutdown.
 *
 * @param builder supplies the input stream, the buffer size and, optionally, the executor service.
 * @throws IOException if the builder fails to provide the input stream.
 */
@SuppressWarnings("resource")
private ReadAheadInputStream(final Builder builder) throws IOException {
    this(builder.getInputStream(), builder.getBufferSize(), builder.executorService == null ? newExecutorService() : builder.executorService,
            builder.executorService == null);
}
209 
/**
 * Constructs an instance with the specified buffer size, reading ahead on a newly created single-threaded daemon executor service.
 * That executor service is shut down when this stream is closed.
 *
 * @param inputStream       The underlying input stream, must not be {@code null}.
 * @param bufferSizeInBytes The size of each of the two internal buffers, must be greater than zero.
 * @throws NullPointerException     if {@code inputStream} is {@code null}.
 * @throws IllegalArgumentException if {@code bufferSizeInBytes} is not greater than zero.
 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
 */
@Deprecated
public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
    this(inputStream, bufferSizeInBytes, newExecutorService(), true);
}
221 
/**
 * Constructs an instance with the specified buffer size, reading ahead on the given executor service. The executor service remains
 * owned by the caller: closing this stream does not shut it down.
 *
 * @param inputStream       The underlying input stream, must not be {@code null}.
 * @param bufferSizeInBytes The size of each of the two internal buffers, must be greater than zero.
 * @param executorService   An executor service for the read-ahead thread, must not be {@code null}.
 * @throws NullPointerException     if {@code inputStream} or {@code executorService} is {@code null}.
 * @throws IllegalArgumentException if {@code bufferSizeInBytes} is not greater than zero.
 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
 */
@Deprecated
public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
    this(inputStream, bufferSizeInBytes, executorService, false);
}
234 
235     /**
236      * Constructs an instance with the specified buffer size and read-ahead threshold.
237      *
238      * @param inputStream             The underlying input stream.
239      * @param bufferSizeInBytes       The buffer size.
240      * @param executorService         An executor service for the read-ahead thread.
241      * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close.
242      */
243     private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
244             final boolean shutdownExecutorService) {
245         super(Objects.requireNonNull(inputStream, "inputStream"));
246         if (bufferSizeInBytes <= 0) {
247             throw new IllegalArgumentException(String.format("bufferSizeInBytes <= 0, bufferSizeInBytes = %,d", bufferSizeInBytes));
248         }
249         this.executorService = Objects.requireNonNull(executorService, "executorService");
250         this.shutdownExecutorService = shutdownExecutorService;
251         this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
252         this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
253         this.activeBuffer.flip();
254         this.readAheadBuffer.flip();
255     }
256 
257     @Override
258     public int available() throws IOException {
259         stateChangeLock.lock();
260         // Make sure we have no integer overflow.
261         try {
262             return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
263         } finally {
264             stateChangeLock.unlock();
265         }
266     }
267 
268     private void checkReadException() throws IOException {
269         if (readAborted) {
270             if (readException instanceof IOException) {
271                 throw (IOException) readException;
272             }
273             throw new IOException(readException);
274         }
275     }
276 
277     @Override
278     public void close() throws IOException {
279         boolean isSafeToCloseUnderlyingInputStream = false;
280         stateChangeLock.lock();
281         try {
282             if (isClosed) {
283                 return;
284             }
285             isClosed = true;
286             if (!isReading) {
287                 // Nobody is reading, so we can close the underlying input stream in this method.
288                 isSafeToCloseUnderlyingInputStream = true;
289                 // Flip this to make sure the read ahead task will not close the underlying input stream.
290                 isUnderlyingInputStreamBeingClosed = true;
291             }
292         } finally {
293             stateChangeLock.unlock();
294         }
295         if (shutdownExecutorService) {
296             try {
297                 shutdownAwait();
298             } catch (final InterruptedException e) {
299                 Thread.currentThread().interrupt();
300                 throw Input.toInterruptedIOException(e);
301             } finally {
302                 if (isSafeToCloseUnderlyingInputStream) {
303                     super.close();
304                 }
305             }
306         }
307         if (isSafeToCloseUnderlyingInputStream) {
308             super.close();
309         }
310     }
311 
312     private void closeUnderlyingInputStreamIfNecessary() {
313         boolean needToCloseUnderlyingInputStream = false;
314         stateChangeLock.lock();
315         try {
316             isReading = false;
317             if (isClosed && !isUnderlyingInputStreamBeingClosed) {
318                 // close method cannot close underlyingInputStream because we were reading.
319                 needToCloseUnderlyingInputStream = true;
320             }
321         } finally {
322             stateChangeLock.unlock();
323         }
324         if (needToCloseUnderlyingInputStream) {
325             try {
326                 super.close();
327             } catch (final IOException ignored) {
328                 // TODO Rethrow as UncheckedIOException?
329             }
330         }
331     }
332 
333     private boolean isEndOfStream() {
334         return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
335     }
336 
337     @Override
338     public int read() throws IOException {
339         if (activeBuffer.hasRemaining()) {
340             // short path - just get one byte.
341             return activeBuffer.get() & 0xFF;
342         }
343         final byte[] oneByteArray = BYTE_ARRAY_1.get();
344         oneByteArray[0] = 0;
345         return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
346     }
347 
348     @Override
349     public int read(final byte[] b, final int offset, int len) throws IOException {
350         IOUtils.checkFromIndexSize(b, offset, len);
351         if (len == 0) {
352             return 0;
353         }
354         if (!activeBuffer.hasRemaining()) {
355             // No remaining in active buffer - lock and switch to write ahead buffer.
356             stateChangeLock.lock();
357             try {
358                 waitForAsyncReadComplete();
359                 if (!readAheadBuffer.hasRemaining()) {
360                     // The first read.
361                     readAsync();
362                     waitForAsyncReadComplete();
363                     if (isEndOfStream()) {
364                         return EOF;
365                     }
366                 }
367                 // Swap the newly read ahead buffer in place of empty active buffer.
368                 swapBuffers();
369                 // After swapping buffers, trigger another async read for read ahead buffer.
370                 readAsync();
371             } finally {
372                 stateChangeLock.unlock();
373             }
374         }
375         len = Math.min(len, activeBuffer.remaining());
376         activeBuffer.get(b, offset, len);
377 
378         return len;
379     }
380 
381     /**
382      * Reads data from underlyingInputStream to readAheadBuffer asynchronously.
383      *
384      * @throws IOException if an I/O error occurs.
385      */
386     private void readAsync() throws IOException {
387         stateChangeLock.lock();
388         final byte[] arr;
389         try {
390             arr = readAheadBuffer.array();
391             if (endOfStream || readInProgress) {
392                 return;
393             }
394             checkReadException();
395             readAheadBuffer.position(0);
396             readAheadBuffer.flip();
397             readInProgress = true;
398         } finally {
399             stateChangeLock.unlock();
400         }
401         executorService.execute(() -> {
402             stateChangeLock.lock();
403             try {
404                 if (isClosed) {
405                     readInProgress = false;
406                     return;
407                 }
408                 // Flip this so that the close method will not close the underlying input stream when we
409                 // are reading.
410                 isReading = true;
411             } finally {
412                 stateChangeLock.unlock();
413             }
414 
415             // Please note that it is safe to release the lock and read into the read ahead buffer
416             // because either of following two conditions will hold:
417             //
418             // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
419             //
420             // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
421             // for this async read to complete.
422             //
423             // So there is no race condition in both the situations.
424             int read = 0;
425             int off = 0;
426             int len = arr.length;
427             Throwable exception = null;
428             try {
429                 // try to fill the read ahead buffer.
430                 // if a reader is waiting, possibly return early.
431                 do {
432                     read = in.read(arr, off, len);
433                     if (read <= 0) {
434                         break;
435                     }
436                     off += read;
437                     len -= read;
438                 } while (len > 0 && !isWaiting.get());
439             } catch (final Throwable ex) {
440                 exception = ex;
441                 if (ex instanceof Error) {
442                     // `readException` may not be reported to the user. Rethrow Error to make sure at least
443                     // The user can see Error in UncaughtExceptionHandler.
444                     throw (Error) ex;
445                 }
446             } finally {
447                 stateChangeLock.lock();
448                 try {
449                     readAheadBuffer.limit(off);
450                     if (read < 0 || exception instanceof EOFException) {
451                         endOfStream = true;
452                     } else if (exception != null) {
453                         readAborted = true;
454                         readException = exception;
455                     }
456                     readInProgress = false;
457                     signalAsyncReadComplete();
458                 } finally {
459                     stateChangeLock.unlock();
460                 }
461                 closeUnderlyingInputStreamIfNecessary();
462             }
463         });
464     }
465 
466     boolean shutdownAwait() throws InterruptedException {
467         executorService.shutdownNow();
468         return executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
469     }
470 
471     private void signalAsyncReadComplete() {
472         stateChangeLock.lock();
473         try {
474             asyncReadComplete.signalAll();
475         } finally {
476             stateChangeLock.unlock();
477         }
478     }
479 
480     @Override
481     public long skip(final long n) throws IOException {
482         if (n <= 0L) {
483             return 0L;
484         }
485         if (n <= activeBuffer.remaining()) {
486             // Only skipping from active buffer is sufficient
487             activeBuffer.position((int) n + activeBuffer.position());
488             return n;
489         }
490         stateChangeLock.lock();
491         final long skipped;
492         try {
493             skipped = skipInternal(n);
494         } finally {
495             stateChangeLock.unlock();
496         }
497         return skipped;
498     }
499 
500     /**
501      * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before
502      * calling this function.
503      *
504      * @param n the number of bytes to be skipped.
505      * @return the actual number of bytes skipped.
506      * @throws IOException if an I/O error occurs.
507      */
508     private long skipInternal(final long n) throws IOException {
509         if (!stateChangeLock.isLocked()) {
510             throw new IllegalStateException("Expected stateChangeLock to be locked");
511         }
512         waitForAsyncReadComplete();
513         if (isEndOfStream()) {
514             return 0;
515         }
516         if (available() >= n) {
517             // we can skip from the internal buffers
518             int toSkip = (int) n;
519             // We need to skip from both active buffer and read ahead buffer
520             toSkip -= activeBuffer.remaining();
521             if (toSkip <= 0) { // skipping from activeBuffer already handled.
522                 throw new IllegalStateException("Expected toSkip > 0, actual: " + toSkip);
523             }
524             activeBuffer.position(0);
525             activeBuffer.flip();
526             readAheadBuffer.position(toSkip + readAheadBuffer.position());
527             swapBuffers();
528             // Trigger async read to emptied read ahead buffer.
529             readAsync();
530             return n;
531         }
532         final int skippedBytes = available();
533         final long toSkip = n - skippedBytes;
534         activeBuffer.position(0);
535         activeBuffer.flip();
536         readAheadBuffer.position(0);
537         readAheadBuffer.flip();
538         final long skippedFromInputStream = in.skip(toSkip);
539         readAsync();
540         return skippedBytes + skippedFromInputStream;
541     }
542 
543     /**
544      * Flips the active and read ahead buffers.
545      */
546     private void swapBuffers() {
547         final ByteBuffer temp = activeBuffer;
548         activeBuffer = readAheadBuffer;
549         readAheadBuffer = temp;
550     }
551 
552     private void waitForAsyncReadComplete() throws IOException {
553         stateChangeLock.lock();
554         try {
555             isWaiting.set(true);
556             // There is only one reader, and one writer, so the writer should signal only once,
557             // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups.
558             while (readInProgress) {
559                 asyncReadComplete.await();
560             }
561         } catch (final InterruptedException e) {
562             Thread.currentThread().interrupt();
563             throw Input.toInterruptedIOException(e);
564         } finally {
565             try {
566                 isWaiting.set(false);
567             } finally {
568                 stateChangeLock.unlock();
569             }
570         }
571         checkReadException();
572     }
573 }