View Javadoc
1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  package org.apache.commons.io.input;
15  
16  import static org.apache.commons.io.IOUtils.EOF;
17  
18  // import javax.annotation.concurrent.GuardedBy;
19  import java.io.EOFException;
20  import java.io.FilterInputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.InterruptedIOException;
24  import java.nio.ByteBuffer;
25  import java.util.Objects;
26  import java.util.concurrent.ExecutorService;
27  import java.util.concurrent.Executors;
28  import java.util.concurrent.TimeUnit;
29  import java.util.concurrent.atomic.AtomicBoolean;
30  import java.util.concurrent.locks.Condition;
31  import java.util.concurrent.locks.ReentrantLock;
32  
33  import org.apache.commons.io.build.AbstractStreamBuilder;
34  
35  /**
36   * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current
37   * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a
38   * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we
39   * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O.
40   * <p>
41   * To build an instance, use {@link Builder}.
42   * </p>
43   * <p>
44   * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
45   * </p>
46   *
47   * @see Builder
48   * @since 2.9.0
49   */
50  public class ReadAheadInputStream extends FilterInputStream {
51  
52      // @formatter:off
53      /**
54       * Builds a new {@link ReadAheadInputStream}.
55       *
56       * <p>
57       * For example:
58       * </p>
59       * <pre>{@code
60       * ReadAheadInputStream s = ReadAheadInputStream.builder()
61       *   .setPath(path)
62       *   .setExecutorService(Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread))
63       *   .get();}
64       * </pre>
65       *
66       * @see #get()
67       * @since 2.12.0
68       */
69      // @formatter:on
70      public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {
71  
72          private ExecutorService executorService;
73  
74          /**
75           * Builds a new {@link ReadAheadInputStream}.
76           * <p>
77           * You must set input that supports {@link #getInputStream()}, otherwise, this method throws an exception.
78           * </p>
79           * <p>
80           * This builder use the following aspects:
81           * </p>
82           * <ul>
83           * <li>{@link #getInputStream()}</li>
84           * <li>{@link #getBufferSize()}</li>
85           * <li>{@link ExecutorService}</li>
86           * </ul>
87           *
88           * @return a new instance.
89           * @throws IllegalStateException         if the {@code origin} is {@code null}.
90           * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
91           * @throws IOException                   if an I/O error occurs.
92           * @see #getInputStream()
93           * @see #getBufferSize()
94           */
95          @SuppressWarnings("resource")
96          @Override
97          public ReadAheadInputStream get() throws IOException {
98              return new ReadAheadInputStream(getInputStream(), getBufferSize(), executorService != null ? executorService : newExecutorService(),
99                      executorService == null);
100         }
101 
102         /**
103          * Sets the executor service for the read-ahead thread.
104          *
105          * @param executorService the executor service for the read-ahead thread.
106          * @return this
107          */
108         public Builder setExecutorService(final ExecutorService executorService) {
109             this.executorService = executorService;
110             return this;
111         }
112 
113     }
114 
115     private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);
116 
117     /**
118      * Constructs a new {@link Builder}.
119      *
120      * @return a new {@link Builder}.
121      * @since 2.12.0
122      */
123     public static Builder builder() {
124         return new Builder();
125     }
126 
127     /**
128      * Constructs a new daemon thread.
129      *
130      * @param r the thread's runnable.
131      * @return a new daemon thread.
132      */
133     private static Thread newDaemonThread(final Runnable r) {
134         final Thread thread = new Thread(r, "commons-io-read-ahead");
135         thread.setDaemon(true);
136         return thread;
137     }
138 
139     /**
140      * Constructs a new daemon executor service.
141      *
142      * @return a new daemon executor service.
143      */
144     private static ExecutorService newExecutorService() {
145         return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
146     }
147 
148     private final ReentrantLock stateChangeLock = new ReentrantLock();
149 
150     // @GuardedBy("stateChangeLock")
151     private ByteBuffer activeBuffer;
152 
153     // @GuardedBy("stateChangeLock")
154     private ByteBuffer readAheadBuffer;
155 
156     // @GuardedBy("stateChangeLock")
157     private boolean endOfStream;
158 
159     // @GuardedBy("stateChangeLock")
160     // true if async read is in progress
161     private boolean readInProgress;
162 
163     // @GuardedBy("stateChangeLock")
164     // true if read is aborted due to an exception in reading from underlying input stream.
165     private boolean readAborted;
166 
167     // @GuardedBy("stateChangeLock")
168     private Throwable readException;
169 
170     // @GuardedBy("stateChangeLock")
171     // whether the close method is called.
172     private boolean isClosed;
173 
174     // @GuardedBy("stateChangeLock")
175     // true when the close method will close the underlying input stream. This is valid only if
176     // `isClosed` is true.
177     private boolean isUnderlyingInputStreamBeingClosed;
178 
179     // @GuardedBy("stateChangeLock")
180     // whether there is a read ahead task running,
181     private boolean isReading;
182 
183     // Whether there is a reader waiting for data.
184     private final AtomicBoolean isWaiting = new AtomicBoolean();
185 
186     private final ExecutorService executorService;
187 
188     private final boolean shutdownExecutorService;
189 
190     private final Condition asyncReadComplete = stateChangeLock.newCondition();
191 
192     /**
193      * Constructs an instance with the specified buffer size and read-ahead threshold
194      *
195      * @param inputStream       The underlying input stream.
196      * @param bufferSizeInBytes The buffer size.
197      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
198      */
199     @Deprecated
200     public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
201         this(inputStream, bufferSizeInBytes, newExecutorService(), true);
202     }
203 
204     /**
205      * Constructs an instance with the specified buffer size and read-ahead threshold
206      *
207      * @param inputStream       The underlying input stream.
208      * @param bufferSizeInBytes The buffer size.
209      * @param executorService   An executor service for the read-ahead thread.
210      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
211      */
212     @Deprecated
213     public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
214         this(inputStream, bufferSizeInBytes, executorService, false);
215     }
216 
217     /**
218      * Constructs an instance with the specified buffer size and read-ahead threshold
219      *
220      * @param inputStream             The underlying input stream.
221      * @param bufferSizeInBytes       The buffer size.
222      * @param executorService         An executor service for the read-ahead thread.
223      * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close.
224      */
225     private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
226             final boolean shutdownExecutorService) {
227         super(Objects.requireNonNull(inputStream, "inputStream"));
228         if (bufferSizeInBytes <= 0) {
229             throw new IllegalArgumentException("bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
230         }
231         this.executorService = Objects.requireNonNull(executorService, "executorService");
232         this.shutdownExecutorService = shutdownExecutorService;
233         this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
234         this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
235         this.activeBuffer.flip();
236         this.readAheadBuffer.flip();
237     }
238 
239     @Override
240     public int available() throws IOException {
241         stateChangeLock.lock();
242         // Make sure we have no integer overflow.
243         try {
244             return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
245         } finally {
246             stateChangeLock.unlock();
247         }
248     }
249 
250     private void checkReadException() throws IOException {
251         if (readAborted) {
252             if (readException instanceof IOException) {
253                 throw (IOException) readException;
254             }
255             throw new IOException(readException);
256         }
257     }
258 
259     @Override
260     public void close() throws IOException {
261         boolean isSafeToCloseUnderlyingInputStream = false;
262         stateChangeLock.lock();
263         try {
264             if (isClosed) {
265                 return;
266             }
267             isClosed = true;
268             if (!isReading) {
269                 // Nobody is reading, so we can close the underlying input stream in this method.
270                 isSafeToCloseUnderlyingInputStream = true;
271                 // Flip this to make sure the read ahead task will not close the underlying input stream.
272                 isUnderlyingInputStreamBeingClosed = true;
273             }
274         } finally {
275             stateChangeLock.unlock();
276         }
277 
278         if (shutdownExecutorService) {
279             try {
280                 executorService.shutdownNow();
281                 executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
282             } catch (final InterruptedException e) {
283                 final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
284                 iio.initCause(e);
285                 throw iio;
286             } finally {
287                 if (isSafeToCloseUnderlyingInputStream) {
288                     super.close();
289                 }
290             }
291         }
292     }
293 
294     private void closeUnderlyingInputStreamIfNecessary() {
295         boolean needToCloseUnderlyingInputStream = false;
296         stateChangeLock.lock();
297         try {
298             isReading = false;
299             if (isClosed && !isUnderlyingInputStreamBeingClosed) {
300                 // close method cannot close underlyingInputStream because we were reading.
301                 needToCloseUnderlyingInputStream = true;
302             }
303         } finally {
304             stateChangeLock.unlock();
305         }
306         if (needToCloseUnderlyingInputStream) {
307             try {
308                 super.close();
309             } catch (final IOException ignored) {
310                 // TODO Rethrow as UncheckedIOException?
311             }
312         }
313     }
314 
315     private boolean isEndOfStream() {
316         return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
317     }
318 
319     @Override
320     public int read() throws IOException {
321         if (activeBuffer.hasRemaining()) {
322             // short path - just get one byte.
323             return activeBuffer.get() & 0xFF;
324         }
325         final byte[] oneByteArray = BYTE_ARRAY_1.get();
326         oneByteArray[0] = 0;
327         return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
328     }
329 
330     @Override
331     public int read(final byte[] b, final int offset, int len) throws IOException {
332         if (offset < 0 || len < 0 || len > b.length - offset) {
333             throw new IndexOutOfBoundsException();
334         }
335         if (len == 0) {
336             return 0;
337         }
338 
339         if (!activeBuffer.hasRemaining()) {
340             // No remaining in active buffer - lock and switch to write ahead buffer.
341             stateChangeLock.lock();
342             try {
343                 waitForAsyncReadComplete();
344                 if (!readAheadBuffer.hasRemaining()) {
345                     // The first read.
346                     readAsync();
347                     waitForAsyncReadComplete();
348                     if (isEndOfStream()) {
349                         return EOF;
350                     }
351                 }
352                 // Swap the newly read ahead buffer in place of empty active buffer.
353                 swapBuffers();
354                 // After swapping buffers, trigger another async read for read ahead buffer.
355                 readAsync();
356             } finally {
357                 stateChangeLock.unlock();
358             }
359         }
360         len = Math.min(len, activeBuffer.remaining());
361         activeBuffer.get(b, offset, len);
362 
363         return len;
364     }
365 
366     /**
367      * Read data from underlyingInputStream to readAheadBuffer asynchronously.
368      *
369      * @throws IOException if an I/O error occurs.
370      */
371     private void readAsync() throws IOException {
372         stateChangeLock.lock();
373         final byte[] arr;
374         try {
375             arr = readAheadBuffer.array();
376             if (endOfStream || readInProgress) {
377                 return;
378             }
379             checkReadException();
380             readAheadBuffer.position(0);
381             readAheadBuffer.flip();
382             readInProgress = true;
383         } finally {
384             stateChangeLock.unlock();
385         }
386         executorService.execute(() -> {
387             stateChangeLock.lock();
388             try {
389                 if (isClosed) {
390                     readInProgress = false;
391                     return;
392                 }
393                 // Flip this so that the close method will not close the underlying input stream when we
394                 // are reading.
395                 isReading = true;
396             } finally {
397                 stateChangeLock.unlock();
398             }
399 
400             // Please note that it is safe to release the lock and read into the read ahead buffer
401             // because either of following two conditions will hold:
402             //
403             // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
404             //
405             // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
406             // for this async read to complete.
407             //
408             // So there is no race condition in both the situations.
409             int read = 0;
410             int off = 0, len = arr.length;
411             Throwable exception = null;
412             try {
413                 // try to fill the read ahead buffer.
414                 // if a reader is waiting, possibly return early.
415                 do {
416                     read = in.read(arr, off, len);
417                     if (read <= 0) {
418                         break;
419                     }
420                     off += read;
421                     len -= read;
422                 } while (len > 0 && !isWaiting.get());
423             } catch (final Throwable ex) {
424                 exception = ex;
425                 if (ex instanceof Error) {
426                     // `readException` may not be reported to the user. Rethrow Error to make sure at least
427                     // The user can see Error in UncaughtExceptionHandler.
428                     throw (Error) ex;
429                 }
430             } finally {
431                 stateChangeLock.lock();
432                 try {
433                     readAheadBuffer.limit(off);
434                     if (read < 0 || exception instanceof EOFException) {
435                         endOfStream = true;
436                     } else if (exception != null) {
437                         readAborted = true;
438                         readException = exception;
439                     }
440                     readInProgress = false;
441                     signalAsyncReadComplete();
442                 } finally {
443                     stateChangeLock.unlock();
444                 }
445                 closeUnderlyingInputStreamIfNecessary();
446             }
447         });
448     }
449 
450     private void signalAsyncReadComplete() {
451         stateChangeLock.lock();
452         try {
453             asyncReadComplete.signalAll();
454         } finally {
455             stateChangeLock.unlock();
456         }
457     }
458 
459     @Override
460     public long skip(final long n) throws IOException {
461         if (n <= 0L) {
462             return 0L;
463         }
464         if (n <= activeBuffer.remaining()) {
465             // Only skipping from active buffer is sufficient
466             activeBuffer.position((int) n + activeBuffer.position());
467             return n;
468         }
469         stateChangeLock.lock();
470         final long skipped;
471         try {
472             skipped = skipInternal(n);
473         } finally {
474             stateChangeLock.unlock();
475         }
476         return skipped;
477     }
478 
479     /**
480      * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before
481      * calling this function.
482      *
483      * @param n the number of bytes to be skipped.
484      * @return the actual number of bytes skipped.
485      * @throws IOException if an I/O error occurs.
486      */
487     private long skipInternal(final long n) throws IOException {
488         assert stateChangeLock.isLocked();
489         waitForAsyncReadComplete();
490         if (isEndOfStream()) {
491             return 0;
492         }
493         if (available() >= n) {
494             // we can skip from the internal buffers
495             int toSkip = (int) n;
496             // We need to skip from both active buffer and read ahead buffer
497             toSkip -= activeBuffer.remaining();
498             assert toSkip > 0; // skipping from activeBuffer already handled.
499             activeBuffer.position(0);
500             activeBuffer.flip();
501             readAheadBuffer.position(toSkip + readAheadBuffer.position());
502             swapBuffers();
503             // Trigger async read to emptied read ahead buffer.
504             readAsync();
505             return n;
506         }
507         final int skippedBytes = available();
508         final long toSkip = n - skippedBytes;
509         activeBuffer.position(0);
510         activeBuffer.flip();
511         readAheadBuffer.position(0);
512         readAheadBuffer.flip();
513         final long skippedFromInputStream = in.skip(toSkip);
514         readAsync();
515         return skippedBytes + skippedFromInputStream;
516     }
517 
518     /**
519      * Flips the active and read ahead buffer
520      */
521     private void swapBuffers() {
522         final ByteBuffer temp = activeBuffer;
523         activeBuffer = readAheadBuffer;
524         readAheadBuffer = temp;
525     }
526 
527     private void waitForAsyncReadComplete() throws IOException {
528         stateChangeLock.lock();
529         try {
530             isWaiting.set(true);
531             // There is only one reader, and one writer, so the writer should signal only once,
532             // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups.
533             while (readInProgress) {
534                 asyncReadComplete.await();
535             }
536         } catch (final InterruptedException e) {
537             final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
538             iio.initCause(e);
539             throw iio;
540         } finally {
541             try {
542                 isWaiting.set(false);
543             } finally {
544                 stateChangeLock.unlock();
545             }
546         }
547         checkReadException();
548     }
549 }