View Javadoc
1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     https://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  package org.apache.commons.io.input;
15  
16  import static org.apache.commons.io.IOUtils.EOF;
17  
18  // import javax.annotation.concurrent.GuardedBy;
19  import java.io.EOFException;
20  import java.io.FilterInputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.InterruptedIOException;
24  import java.nio.ByteBuffer;
25  import java.util.Objects;
26  import java.util.concurrent.ExecutorService;
27  import java.util.concurrent.Executors;
28  import java.util.concurrent.TimeUnit;
29  import java.util.concurrent.atomic.AtomicBoolean;
30  import java.util.concurrent.locks.Condition;
31  import java.util.concurrent.locks.ReentrantLock;
32  
33  import org.apache.commons.io.IOUtils;
34  import org.apache.commons.io.build.AbstractStreamBuilder;
35  
36  /**
37   * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current
38   * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a
39   * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we
40   * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O.
41   * <p>
42   * To build an instance, use {@link Builder}.
43   * </p>
44   * <p>
45   * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
46   * </p>
47   *
48   * @see Builder
49   * @since 2.9.0
50   */
51  public class ReadAheadInputStream extends FilterInputStream {
52  
53      // @formatter:off
54      /**
55       * Builds a new {@link ReadAheadInputStream}.
56       *
57       * <p>
58       * For example:
59       * </p>
60       * <pre>{@code
61       * ReadAheadInputStream s = ReadAheadInputStream.builder()
62       *   .setPath(path)
63       *   .setExecutorService(Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread))
64       *   .get();}
65       * </pre>
66       *
67       * @see #get()
68       * @since 2.12.0
69       */
70      // @formatter:on
71      public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {
72  
73          private ExecutorService executorService;
74  
75          /**
76           * Constructs a new builder of {@link ReadAheadInputStream}.
77           */
78          public Builder() {
79              // empty
80          }
81  
82          /**
83           * Builds a new {@link ReadAheadInputStream}.
84           * <p>
85           * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
86           * </p>
87           * <p>
88           * This builder uses the following aspects:
89           * </p>
90           * <ul>
91           * <li>{@link #getInputStream()} gets the target aspect.</li>
92           * <li>{@link #getBufferSize()}</li>
93           * <li>{@link ExecutorService}</li>
94           * </ul>
95           *
96           * @return a new instance.
97           * @throws IllegalStateException         if the {@code origin} is {@code null}.
98           * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
99           * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
100          * @see #getInputStream()
101          * @see #getBufferSize()
102          * @see #getUnchecked()
103          */
104         @Override
105         public ReadAheadInputStream get() throws IOException {
106             return new ReadAheadInputStream(this);
107         }
108 
109         /**
110          * Sets the executor service for the read-ahead thread.
111          *
112          * @param executorService the executor service for the read-ahead thread.
113          * @return {@code this} instance.
114          */
115         public Builder setExecutorService(final ExecutorService executorService) {
116             this.executorService = executorService;
117             return this;
118         }
119 
120     }
121 
122     private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);
123 
124     /**
125      * Constructs a new {@link Builder}.
126      *
127      * @return a new {@link Builder}.
128      * @since 2.12.0
129      */
130     public static Builder builder() {
131         return new Builder();
132     }
133 
134     /**
135      * Constructs a new daemon thread.
136      *
137      * @param r the thread's runnable.
138      * @return a new daemon thread.
139      */
140     private static Thread newDaemonThread(final Runnable r) {
141         final Thread thread = new Thread(r, "commons-io-read-ahead");
142         thread.setDaemon(true);
143         return thread;
144     }
145 
146     /**
147      * Constructs a new daemon executor service.
148      *
149      * @return a new daemon executor service.
150      */
151     private static ExecutorService newExecutorService() {
152         return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
153     }
154 
155     private final ReentrantLock stateChangeLock = new ReentrantLock();
156 
157     // @GuardedBy("stateChangeLock")
158     private ByteBuffer activeBuffer;
159 
160     // @GuardedBy("stateChangeLock")
161     private ByteBuffer readAheadBuffer;
162 
163     // @GuardedBy("stateChangeLock")
164     private boolean endOfStream;
165 
166     // @GuardedBy("stateChangeLock")
167     // true if async read is in progress
168     private boolean readInProgress;
169 
170     // @GuardedBy("stateChangeLock")
171     // true if read is aborted due to an exception in reading from underlying input stream.
172     private boolean readAborted;
173 
174     // @GuardedBy("stateChangeLock")
175     private Throwable readException;
176 
177     // @GuardedBy("stateChangeLock")
178     // whether the close method is called.
179     private boolean isClosed;
180 
181     // @GuardedBy("stateChangeLock")
182     // true when the close method will close the underlying input stream. This is valid only if
183     // `isClosed` is true.
184     private boolean isUnderlyingInputStreamBeingClosed;
185 
186     // @GuardedBy("stateChangeLock")
187     // whether there is a read ahead task running,
188     private boolean isReading;
189 
190     // Whether there is a reader waiting for data.
191     private final AtomicBoolean isWaiting = new AtomicBoolean();
192 
193     private final ExecutorService executorService;
194 
195     private final boolean shutdownExecutorService;
196 
197     private final Condition asyncReadComplete = stateChangeLock.newCondition();
198 
199     @SuppressWarnings("resource")
200     private ReadAheadInputStream(final Builder builder) throws IOException {
201         this(builder.getInputStream(), builder.getBufferSize(), builder.executorService != null ? builder.executorService : newExecutorService(),
202                 builder.executorService == null);
203     }
204 
205     /**
206      * Constructs an instance with the specified buffer size and read-ahead threshold
207      *
208      * @param inputStream       The underlying input stream.
209      * @param bufferSizeInBytes The buffer size.
210      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
211      */
212     @Deprecated
213     public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
214         this(inputStream, bufferSizeInBytes, newExecutorService(), true);
215     }
216 
217     /**
218      * Constructs an instance with the specified buffer size and read-ahead threshold
219      *
220      * @param inputStream       The underlying input stream.
221      * @param bufferSizeInBytes The buffer size.
222      * @param executorService   An executor service for the read-ahead thread.
223      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
224      */
225     @Deprecated
226     public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
227         this(inputStream, bufferSizeInBytes, executorService, false);
228     }
229 
230     /**
231      * Constructs an instance with the specified buffer size and read-ahead threshold
232      *
233      * @param inputStream             The underlying input stream.
234      * @param bufferSizeInBytes       The buffer size.
235      * @param executorService         An executor service for the read-ahead thread.
236      * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close.
237      */
238     private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
239             final boolean shutdownExecutorService) {
240         super(Objects.requireNonNull(inputStream, "inputStream"));
241         if (bufferSizeInBytes <= 0) {
242             throw new IllegalArgumentException(String.format("bufferSizeInBytes <= 0, bufferSizeInBytes = %,d", bufferSizeInBytes));
243         }
244         this.executorService = Objects.requireNonNull(executorService, "executorService");
245         this.shutdownExecutorService = shutdownExecutorService;
246         this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
247         this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
248         this.activeBuffer.flip();
249         this.readAheadBuffer.flip();
250     }
251 
252     @Override
253     public int available() throws IOException {
254         stateChangeLock.lock();
255         // Make sure we have no integer overflow.
256         try {
257             return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
258         } finally {
259             stateChangeLock.unlock();
260         }
261     }
262 
263     private void checkReadException() throws IOException {
264         if (readAborted) {
265             if (readException instanceof IOException) {
266                 throw (IOException) readException;
267             }
268             throw new IOException(readException);
269         }
270     }
271 
272     @Override
273     public void close() throws IOException {
274         boolean isSafeToCloseUnderlyingInputStream = false;
275         stateChangeLock.lock();
276         try {
277             if (isClosed) {
278                 return;
279             }
280             isClosed = true;
281             if (!isReading) {
282                 // Nobody is reading, so we can close the underlying input stream in this method.
283                 isSafeToCloseUnderlyingInputStream = true;
284                 // Flip this to make sure the read ahead task will not close the underlying input stream.
285                 isUnderlyingInputStreamBeingClosed = true;
286             }
287         } finally {
288             stateChangeLock.unlock();
289         }
290 
291         if (shutdownExecutorService) {
292             try {
293                 executorService.shutdownNow();
294                 executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
295             } catch (final InterruptedException e) {
296                 final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
297                 iio.initCause(e);
298                 throw iio;
299             } finally {
300                 if (isSafeToCloseUnderlyingInputStream) {
301                     super.close();
302                 }
303             }
304         }
305     }
306 
307     private void closeUnderlyingInputStreamIfNecessary() {
308         boolean needToCloseUnderlyingInputStream = false;
309         stateChangeLock.lock();
310         try {
311             isReading = false;
312             if (isClosed && !isUnderlyingInputStreamBeingClosed) {
313                 // close method cannot close underlyingInputStream because we were reading.
314                 needToCloseUnderlyingInputStream = true;
315             }
316         } finally {
317             stateChangeLock.unlock();
318         }
319         if (needToCloseUnderlyingInputStream) {
320             try {
321                 super.close();
322             } catch (final IOException ignored) {
323                 // TODO Rethrow as UncheckedIOException?
324             }
325         }
326     }
327 
328     private boolean isEndOfStream() {
329         return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
330     }
331 
332     @Override
333     public int read() throws IOException {
334         if (activeBuffer.hasRemaining()) {
335             // short path - just get one byte.
336             return activeBuffer.get() & 0xFF;
337         }
338         final byte[] oneByteArray = BYTE_ARRAY_1.get();
339         oneByteArray[0] = 0;
340         return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
341     }
342 
343     @Override
344     public int read(final byte[] b, final int offset, int len) throws IOException {
345         IOUtils.checkFromIndexSize(b, offset, len);
346         if (len == 0) {
347             return 0;
348         }
349 
350         if (!activeBuffer.hasRemaining()) {
351             // No remaining in active buffer - lock and switch to write ahead buffer.
352             stateChangeLock.lock();
353             try {
354                 waitForAsyncReadComplete();
355                 if (!readAheadBuffer.hasRemaining()) {
356                     // The first read.
357                     readAsync();
358                     waitForAsyncReadComplete();
359                     if (isEndOfStream()) {
360                         return EOF;
361                     }
362                 }
363                 // Swap the newly read ahead buffer in place of empty active buffer.
364                 swapBuffers();
365                 // After swapping buffers, trigger another async read for read ahead buffer.
366                 readAsync();
367             } finally {
368                 stateChangeLock.unlock();
369             }
370         }
371         len = Math.min(len, activeBuffer.remaining());
372         activeBuffer.get(b, offset, len);
373 
374         return len;
375     }
376 
377     /**
378      * Reads data from underlyingInputStream to readAheadBuffer asynchronously.
379      *
380      * @throws IOException if an I/O error occurs.
381      */
382     private void readAsync() throws IOException {
383         stateChangeLock.lock();
384         final byte[] arr;
385         try {
386             arr = readAheadBuffer.array();
387             if (endOfStream || readInProgress) {
388                 return;
389             }
390             checkReadException();
391             readAheadBuffer.position(0);
392             readAheadBuffer.flip();
393             readInProgress = true;
394         } finally {
395             stateChangeLock.unlock();
396         }
397         executorService.execute(() -> {
398             stateChangeLock.lock();
399             try {
400                 if (isClosed) {
401                     readInProgress = false;
402                     return;
403                 }
404                 // Flip this so that the close method will not close the underlying input stream when we
405                 // are reading.
406                 isReading = true;
407             } finally {
408                 stateChangeLock.unlock();
409             }
410 
411             // Please note that it is safe to release the lock and read into the read ahead buffer
412             // because either of following two conditions will hold:
413             //
414             // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
415             //
416             // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
417             // for this async read to complete.
418             //
419             // So there is no race condition in both the situations.
420             int read = 0;
421             int off = 0;
422             int len = arr.length;
423             Throwable exception = null;
424             try {
425                 // try to fill the read ahead buffer.
426                 // if a reader is waiting, possibly return early.
427                 do {
428                     read = in.read(arr, off, len);
429                     if (read <= 0) {
430                         break;
431                     }
432                     off += read;
433                     len -= read;
434                 } while (len > 0 && !isWaiting.get());
435             } catch (final Throwable ex) {
436                 exception = ex;
437                 if (ex instanceof Error) {
438                     // `readException` may not be reported to the user. Rethrow Error to make sure at least
439                     // The user can see Error in UncaughtExceptionHandler.
440                     throw (Error) ex;
441                 }
442             } finally {
443                 stateChangeLock.lock();
444                 try {
445                     readAheadBuffer.limit(off);
446                     if (read < 0 || exception instanceof EOFException) {
447                         endOfStream = true;
448                     } else if (exception != null) {
449                         readAborted = true;
450                         readException = exception;
451                     }
452                     readInProgress = false;
453                     signalAsyncReadComplete();
454                 } finally {
455                     stateChangeLock.unlock();
456                 }
457                 closeUnderlyingInputStreamIfNecessary();
458             }
459         });
460     }
461 
462     private void signalAsyncReadComplete() {
463         stateChangeLock.lock();
464         try {
465             asyncReadComplete.signalAll();
466         } finally {
467             stateChangeLock.unlock();
468         }
469     }
470 
471     @Override
472     public long skip(final long n) throws IOException {
473         if (n <= 0L) {
474             return 0L;
475         }
476         if (n <= activeBuffer.remaining()) {
477             // Only skipping from active buffer is sufficient
478             activeBuffer.position((int) n + activeBuffer.position());
479             return n;
480         }
481         stateChangeLock.lock();
482         final long skipped;
483         try {
484             skipped = skipInternal(n);
485         } finally {
486             stateChangeLock.unlock();
487         }
488         return skipped;
489     }
490 
491     /**
492      * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before
493      * calling this function.
494      *
495      * @param n the number of bytes to be skipped.
496      * @return the actual number of bytes skipped.
497      * @throws IOException if an I/O error occurs.
498      */
499     private long skipInternal(final long n) throws IOException {
500         if (!stateChangeLock.isLocked()) {
501             throw new IllegalStateException("Expected stateChangeLock to be locked");
502         }
503         waitForAsyncReadComplete();
504         if (isEndOfStream()) {
505             return 0;
506         }
507         if (available() >= n) {
508             // we can skip from the internal buffers
509             int toSkip = (int) n;
510             // We need to skip from both active buffer and read ahead buffer
511             toSkip -= activeBuffer.remaining();
512             if (toSkip <= 0) { // skipping from activeBuffer already handled.
513                 throw new IllegalStateException("Expected toSkip > 0, actual: " + toSkip);
514             }
515             activeBuffer.position(0);
516             activeBuffer.flip();
517             readAheadBuffer.position(toSkip + readAheadBuffer.position());
518             swapBuffers();
519             // Trigger async read to emptied read ahead buffer.
520             readAsync();
521             return n;
522         }
523         final int skippedBytes = available();
524         final long toSkip = n - skippedBytes;
525         activeBuffer.position(0);
526         activeBuffer.flip();
527         readAheadBuffer.position(0);
528         readAheadBuffer.flip();
529         final long skippedFromInputStream = in.skip(toSkip);
530         readAsync();
531         return skippedBytes + skippedFromInputStream;
532     }
533 
534     /**
535      * Flips the active and read ahead buffer
536      */
537     private void swapBuffers() {
538         final ByteBuffer temp = activeBuffer;
539         activeBuffer = readAheadBuffer;
540         readAheadBuffer = temp;
541     }
542 
543     private void waitForAsyncReadComplete() throws IOException {
544         stateChangeLock.lock();
545         try {
546             isWaiting.set(true);
547             // There is only one reader, and one writer, so the writer should signal only once,
548             // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups.
549             while (readInProgress) {
550                 asyncReadComplete.await();
551             }
552         } catch (final InterruptedException e) {
553             final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
554             iio.initCause(e);
555             throw iio;
556         } finally {
557             try {
558                 isWaiting.set(false);
559             } finally {
560                 stateChangeLock.unlock();
561             }
562         }
563         checkReadException();
564     }
565 }