/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
14  package org.apache.commons.io.input;
15  
16  import static org.apache.commons.io.IOUtils.EOF;
17  
18  // import javax.annotation.concurrent.GuardedBy;
19  import java.io.EOFException;
20  import java.io.FilterInputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.InterruptedIOException;
24  import java.nio.ByteBuffer;
25  import java.util.Objects;
26  import java.util.concurrent.ExecutorService;
27  import java.util.concurrent.Executors;
28  import java.util.concurrent.TimeUnit;
29  import java.util.concurrent.atomic.AtomicBoolean;
30  import java.util.concurrent.locks.Condition;
31  import java.util.concurrent.locks.ReentrantLock;
32  
33  import org.apache.commons.io.build.AbstractStreamBuilder;
34  
35  /**
36   * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current
37   * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a
38   * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we
39   * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O.
40   * <p>
41   * To build an instance, use {@link Builder}.
42   * </p>
43   * <p>
44   * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
45   * </p>
46   *
47   * @see Builder
48   * @since 2.9.0
49   */
50  public class ReadAheadInputStream extends FilterInputStream {
51  
52      // @formatter:off
53      /**
54       * Builds a new {@link ReadAheadInputStream}.
55       *
56       * <p>
57       * For example:
58       * </p>
59       * <pre>{@code
60       * ReadAheadInputStream s = ReadAheadInputStream.builder()
61       *   .setPath(path)
62       *   .setExecutorService(Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread))
63       *   .get();}
64       * </pre>
65       *
66       * @see #get()
67       * @since 2.12.0
68       */
69      // @formatter:on
70      public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {
71  
72          private ExecutorService executorService;
73  
74          /**
75           * Constructs a new builder of {@link ReadAheadInputStream}.
76           */
77          public Builder() {
78              // empty
79          }
80  
81          /**
82           * Builds a new {@link ReadAheadInputStream}.
83           * <p>
84           * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
85           * </p>
86           * <p>
87           * This builder uses the following aspects:
88           * </p>
89           * <ul>
90           * <li>{@link #getInputStream()} gets the target aspect.</li>
91           * <li>{@link #getBufferSize()}</li>
92           * <li>{@link ExecutorService}</li>
93           * </ul>
94           *
95           * @return a new instance.
96           * @throws IllegalStateException         if the {@code origin} is {@code null}.
97           * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
98           * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
99           * @see #getInputStream()
100          * @see #getBufferSize()
101          * @see #getUnchecked()
102          */
103         @Override
104         public ReadAheadInputStream get() throws IOException {
105             return new ReadAheadInputStream(this);
106         }
107 
108         /**
109          * Sets the executor service for the read-ahead thread.
110          *
111          * @param executorService the executor service for the read-ahead thread.
112          * @return {@code this} instance.
113          */
114         public Builder setExecutorService(final ExecutorService executorService) {
115             this.executorService = executorService;
116             return this;
117         }
118 
119     }
120 
121     private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);
122 
123     /**
124      * Constructs a new {@link Builder}.
125      *
126      * @return a new {@link Builder}.
127      * @since 2.12.0
128      */
129     public static Builder builder() {
130         return new Builder();
131     }
132 
133     /**
134      * Constructs a new daemon thread.
135      *
136      * @param r the thread's runnable.
137      * @return a new daemon thread.
138      */
139     private static Thread newDaemonThread(final Runnable r) {
140         final Thread thread = new Thread(r, "commons-io-read-ahead");
141         thread.setDaemon(true);
142         return thread;
143     }
144 
145     /**
146      * Constructs a new daemon executor service.
147      *
148      * @return a new daemon executor service.
149      */
150     private static ExecutorService newExecutorService() {
151         return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
152     }
153 
154     private final ReentrantLock stateChangeLock = new ReentrantLock();
155 
156     // @GuardedBy("stateChangeLock")
157     private ByteBuffer activeBuffer;
158 
159     // @GuardedBy("stateChangeLock")
160     private ByteBuffer readAheadBuffer;
161 
162     // @GuardedBy("stateChangeLock")
163     private boolean endOfStream;
164 
165     // @GuardedBy("stateChangeLock")
166     // true if async read is in progress
167     private boolean readInProgress;
168 
169     // @GuardedBy("stateChangeLock")
170     // true if read is aborted due to an exception in reading from underlying input stream.
171     private boolean readAborted;
172 
173     // @GuardedBy("stateChangeLock")
174     private Throwable readException;
175 
176     // @GuardedBy("stateChangeLock")
177     // whether the close method is called.
178     private boolean isClosed;
179 
180     // @GuardedBy("stateChangeLock")
181     // true when the close method will close the underlying input stream. This is valid only if
182     // `isClosed` is true.
183     private boolean isUnderlyingInputStreamBeingClosed;
184 
185     // @GuardedBy("stateChangeLock")
186     // whether there is a read ahead task running,
187     private boolean isReading;
188 
189     // Whether there is a reader waiting for data.
190     private final AtomicBoolean isWaiting = new AtomicBoolean();
191 
192     private final ExecutorService executorService;
193 
194     private final boolean shutdownExecutorService;
195 
196     private final Condition asyncReadComplete = stateChangeLock.newCondition();
197 
198     @SuppressWarnings("resource")
199     private ReadAheadInputStream(final Builder builder) throws IOException {
200         this(builder.getInputStream(), builder.getBufferSize(), builder.executorService != null ? builder.executorService : newExecutorService(),
201                 builder.executorService == null);
202     }
203 
204     /**
205      * Constructs an instance with the specified buffer size and read-ahead threshold
206      *
207      * @param inputStream       The underlying input stream.
208      * @param bufferSizeInBytes The buffer size.
209      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
210      */
211     @Deprecated
212     public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
213         this(inputStream, bufferSizeInBytes, newExecutorService(), true);
214     }
215 
216     /**
217      * Constructs an instance with the specified buffer size and read-ahead threshold
218      *
219      * @param inputStream       The underlying input stream.
220      * @param bufferSizeInBytes The buffer size.
221      * @param executorService   An executor service for the read-ahead thread.
222      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
223      */
224     @Deprecated
225     public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
226         this(inputStream, bufferSizeInBytes, executorService, false);
227     }
228 
229     /**
230      * Constructs an instance with the specified buffer size and read-ahead threshold
231      *
232      * @param inputStream             The underlying input stream.
233      * @param bufferSizeInBytes       The buffer size.
234      * @param executorService         An executor service for the read-ahead thread.
235      * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close.
236      */
237     private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
238             final boolean shutdownExecutorService) {
239         super(Objects.requireNonNull(inputStream, "inputStream"));
240         if (bufferSizeInBytes <= 0) {
241             throw new IllegalArgumentException("bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
242         }
243         this.executorService = Objects.requireNonNull(executorService, "executorService");
244         this.shutdownExecutorService = shutdownExecutorService;
245         this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
246         this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
247         this.activeBuffer.flip();
248         this.readAheadBuffer.flip();
249     }
250 
251     @Override
252     public int available() throws IOException {
253         stateChangeLock.lock();
254         // Make sure we have no integer overflow.
255         try {
256             return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
257         } finally {
258             stateChangeLock.unlock();
259         }
260     }
261 
262     private void checkReadException() throws IOException {
263         if (readAborted) {
264             if (readException instanceof IOException) {
265                 throw (IOException) readException;
266             }
267             throw new IOException(readException);
268         }
269     }
270 
271     @Override
272     public void close() throws IOException {
273         boolean isSafeToCloseUnderlyingInputStream = false;
274         stateChangeLock.lock();
275         try {
276             if (isClosed) {
277                 return;
278             }
279             isClosed = true;
280             if (!isReading) {
281                 // Nobody is reading, so we can close the underlying input stream in this method.
282                 isSafeToCloseUnderlyingInputStream = true;
283                 // Flip this to make sure the read ahead task will not close the underlying input stream.
284                 isUnderlyingInputStreamBeingClosed = true;
285             }
286         } finally {
287             stateChangeLock.unlock();
288         }
289 
290         if (shutdownExecutorService) {
291             try {
292                 executorService.shutdownNow();
293                 executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
294             } catch (final InterruptedException e) {
295                 final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
296                 iio.initCause(e);
297                 throw iio;
298             } finally {
299                 if (isSafeToCloseUnderlyingInputStream) {
300                     super.close();
301                 }
302             }
303         }
304     }
305 
306     private void closeUnderlyingInputStreamIfNecessary() {
307         boolean needToCloseUnderlyingInputStream = false;
308         stateChangeLock.lock();
309         try {
310             isReading = false;
311             if (isClosed && !isUnderlyingInputStreamBeingClosed) {
312                 // close method cannot close underlyingInputStream because we were reading.
313                 needToCloseUnderlyingInputStream = true;
314             }
315         } finally {
316             stateChangeLock.unlock();
317         }
318         if (needToCloseUnderlyingInputStream) {
319             try {
320                 super.close();
321             } catch (final IOException ignored) {
322                 // TODO Rethrow as UncheckedIOException?
323             }
324         }
325     }
326 
327     private boolean isEndOfStream() {
328         return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
329     }
330 
331     @Override
332     public int read() throws IOException {
333         if (activeBuffer.hasRemaining()) {
334             // short path - just get one byte.
335             return activeBuffer.get() & 0xFF;
336         }
337         final byte[] oneByteArray = BYTE_ARRAY_1.get();
338         oneByteArray[0] = 0;
339         return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
340     }
341 
342     @Override
343     public int read(final byte[] b, final int offset, int len) throws IOException {
344         if (offset < 0 || len < 0 || len > b.length - offset) {
345             throw new IndexOutOfBoundsException();
346         }
347         if (len == 0) {
348             return 0;
349         }
350 
351         if (!activeBuffer.hasRemaining()) {
352             // No remaining in active buffer - lock and switch to write ahead buffer.
353             stateChangeLock.lock();
354             try {
355                 waitForAsyncReadComplete();
356                 if (!readAheadBuffer.hasRemaining()) {
357                     // The first read.
358                     readAsync();
359                     waitForAsyncReadComplete();
360                     if (isEndOfStream()) {
361                         return EOF;
362                     }
363                 }
364                 // Swap the newly read ahead buffer in place of empty active buffer.
365                 swapBuffers();
366                 // After swapping buffers, trigger another async read for read ahead buffer.
367                 readAsync();
368             } finally {
369                 stateChangeLock.unlock();
370             }
371         }
372         len = Math.min(len, activeBuffer.remaining());
373         activeBuffer.get(b, offset, len);
374 
375         return len;
376     }
377 
378     /**
379      * Reads data from underlyingInputStream to readAheadBuffer asynchronously.
380      *
381      * @throws IOException if an I/O error occurs.
382      */
383     private void readAsync() throws IOException {
384         stateChangeLock.lock();
385         final byte[] arr;
386         try {
387             arr = readAheadBuffer.array();
388             if (endOfStream || readInProgress) {
389                 return;
390             }
391             checkReadException();
392             readAheadBuffer.position(0);
393             readAheadBuffer.flip();
394             readInProgress = true;
395         } finally {
396             stateChangeLock.unlock();
397         }
398         executorService.execute(() -> {
399             stateChangeLock.lock();
400             try {
401                 if (isClosed) {
402                     readInProgress = false;
403                     return;
404                 }
405                 // Flip this so that the close method will not close the underlying input stream when we
406                 // are reading.
407                 isReading = true;
408             } finally {
409                 stateChangeLock.unlock();
410             }
411 
412             // Please note that it is safe to release the lock and read into the read ahead buffer
413             // because either of following two conditions will hold:
414             //
415             // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
416             //
417             // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
418             // for this async read to complete.
419             //
420             // So there is no race condition in both the situations.
421             int read = 0;
422             int off = 0;
423             int len = arr.length;
424             Throwable exception = null;
425             try {
426                 // try to fill the read ahead buffer.
427                 // if a reader is waiting, possibly return early.
428                 do {
429                     read = in.read(arr, off, len);
430                     if (read <= 0) {
431                         break;
432                     }
433                     off += read;
434                     len -= read;
435                 } while (len > 0 && !isWaiting.get());
436             } catch (final Throwable ex) {
437                 exception = ex;
438                 if (ex instanceof Error) {
439                     // `readException` may not be reported to the user. Rethrow Error to make sure at least
440                     // The user can see Error in UncaughtExceptionHandler.
441                     throw (Error) ex;
442                 }
443             } finally {
444                 stateChangeLock.lock();
445                 try {
446                     readAheadBuffer.limit(off);
447                     if (read < 0 || exception instanceof EOFException) {
448                         endOfStream = true;
449                     } else if (exception != null) {
450                         readAborted = true;
451                         readException = exception;
452                     }
453                     readInProgress = false;
454                     signalAsyncReadComplete();
455                 } finally {
456                     stateChangeLock.unlock();
457                 }
458                 closeUnderlyingInputStreamIfNecessary();
459             }
460         });
461     }
462 
463     private void signalAsyncReadComplete() {
464         stateChangeLock.lock();
465         try {
466             asyncReadComplete.signalAll();
467         } finally {
468             stateChangeLock.unlock();
469         }
470     }
471 
472     @Override
473     public long skip(final long n) throws IOException {
474         if (n <= 0L) {
475             return 0L;
476         }
477         if (n <= activeBuffer.remaining()) {
478             // Only skipping from active buffer is sufficient
479             activeBuffer.position((int) n + activeBuffer.position());
480             return n;
481         }
482         stateChangeLock.lock();
483         final long skipped;
484         try {
485             skipped = skipInternal(n);
486         } finally {
487             stateChangeLock.unlock();
488         }
489         return skipped;
490     }
491 
492     /**
493      * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before
494      * calling this function.
495      *
496      * @param n the number of bytes to be skipped.
497      * @return the actual number of bytes skipped.
498      * @throws IOException if an I/O error occurs.
499      */
500     private long skipInternal(final long n) throws IOException {
501         if (!stateChangeLock.isLocked()) {
502             throw new IllegalStateException("Expected stateChangeLock to be locked");
503         }
504         waitForAsyncReadComplete();
505         if (isEndOfStream()) {
506             return 0;
507         }
508         if (available() >= n) {
509             // we can skip from the internal buffers
510             int toSkip = (int) n;
511             // We need to skip from both active buffer and read ahead buffer
512             toSkip -= activeBuffer.remaining();
513             if (toSkip <= 0) { // skipping from activeBuffer already handled.
514                 throw new IllegalStateException("Expected toSkip > 0, actual: " + toSkip);
515             }
516             activeBuffer.position(0);
517             activeBuffer.flip();
518             readAheadBuffer.position(toSkip + readAheadBuffer.position());
519             swapBuffers();
520             // Trigger async read to emptied read ahead buffer.
521             readAsync();
522             return n;
523         }
524         final int skippedBytes = available();
525         final long toSkip = n - skippedBytes;
526         activeBuffer.position(0);
527         activeBuffer.flip();
528         readAheadBuffer.position(0);
529         readAheadBuffer.flip();
530         final long skippedFromInputStream = in.skip(toSkip);
531         readAsync();
532         return skippedBytes + skippedFromInputStream;
533     }
534 
535     /**
536      * Flips the active and read ahead buffer
537      */
538     private void swapBuffers() {
539         final ByteBuffer temp = activeBuffer;
540         activeBuffer = readAheadBuffer;
541         readAheadBuffer = temp;
542     }
543 
544     private void waitForAsyncReadComplete() throws IOException {
545         stateChangeLock.lock();
546         try {
547             isWaiting.set(true);
548             // There is only one reader, and one writer, so the writer should signal only once,
549             // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups.
550             while (readInProgress) {
551                 asyncReadComplete.await();
552             }
553         } catch (final InterruptedException e) {
554             final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
555             iio.initCause(e);
556             throw iio;
557         } finally {
558             try {
559                 isWaiting.set(false);
560             } finally {
561                 stateChangeLock.unlock();
562             }
563         }
564         checkReadException();
565     }
566 }