View Javadoc
1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  package org.apache.commons.io.input;
15  
16  import static org.apache.commons.io.IOUtils.EOF;
17  
18  // import javax.annotation.concurrent.GuardedBy;
19  import java.io.EOFException;
20  import java.io.FilterInputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.InterruptedIOException;
24  import java.nio.ByteBuffer;
25  import java.util.Objects;
26  import java.util.concurrent.ExecutorService;
27  import java.util.concurrent.Executors;
28  import java.util.concurrent.TimeUnit;
29  import java.util.concurrent.atomic.AtomicBoolean;
30  import java.util.concurrent.locks.Condition;
31  import java.util.concurrent.locks.ReentrantLock;
32  
33  import org.apache.commons.io.build.AbstractStreamBuilder;
34  
35  /**
36   * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current
37   * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a
38   * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we
39   * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O.
40   * <p>
41   * To build an instance, use {@link Builder}.
42   * </p>
43   * <p>
44   * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
45   * </p>
46   *
47   * @see Builder
48   * @since 2.9.0
49   */
50  public class ReadAheadInputStream extends FilterInputStream {
51  
52      // @formatter:off
53      /**
54       * Builds a new {@link ReadAheadInputStream}.
55       *
56       * <p>
57       * For example:
58       * </p>
59       * <pre>{@code
60       * ReadAheadInputStream s = ReadAheadInputStream.builder()
61       *   .setPath(path)
62       *   .setExecutorService(Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread))
63       *   .get();}
64       * </pre>
65       *
66       * @see #get()
67       * @since 2.12.0
68       */
69      // @formatter:on
70      public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {
71  
72          private ExecutorService executorService;
73  
74          /**
75           * Constructs a new builder of {@link ReadAheadInputStream}.
76           */
77          public Builder() {
78              // empty
79          }
80  
81          /**
82           * Builds a new {@link ReadAheadInputStream}.
83           * <p>
84           * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
85           * </p>
86           * <p>
87           * This builder uses the following aspects:
88           * </p>
89           * <ul>
90           * <li>{@link #getInputStream()} gets the target aspect.</li>
91           * <li>{@link #getBufferSize()}</li>
92           * <li>{@link ExecutorService}</li>
93           * </ul>
94           *
95           * @return a new instance.
96           * @throws IllegalStateException         if the {@code origin} is {@code null}.
97           * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
98           * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
99           * @see #getInputStream()
100          * @see #getBufferSize()
101          * @see #getUnchecked()
102          */
103         @Override
104         public ReadAheadInputStream get() throws IOException {
105             return new ReadAheadInputStream(getInputStream(), getBufferSize(), executorService != null ? executorService : newExecutorService(),
106                     executorService == null);
107         }
108 
109         /**
110          * Sets the executor service for the read-ahead thread.
111          *
112          * @param executorService the executor service for the read-ahead thread.
113          * @return {@code this} instance.
114          */
115         public Builder setExecutorService(final ExecutorService executorService) {
116             this.executorService = executorService;
117             return this;
118         }
119 
120     }
121 
        // Per-thread one-byte scratch array; read() hands it to read(byte[], int, int) when the active buffer has nothing left.
122     private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);
123 
124     /**
125      * Constructs a new {@link Builder}.
126      *
127      * @return a new {@link Builder}.
128      * @since 2.12.0
129      */
130     public static Builder builder() {
131         return new Builder();
132     }
133 
134     /**
135      * Constructs a new daemon thread.
136      *
137      * @param r the thread's runnable.
138      * @return a new daemon thread.
139      */
140     private static Thread newDaemonThread(final Runnable r) {
141         final Thread thread = new Thread(r, "commons-io-read-ahead");
142         thread.setDaemon(true);
143         return thread;
144     }
145 
146     /**
147      * Constructs a new daemon executor service.
148      *
149      * @return a new daemon executor service.
150      */
151     private static ExecutorService newExecutorService() {
152         return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
153     }
154 
        // Lock taken by both the reading thread and the read-ahead task around changes to the state below.
155     private final ReentrantLock stateChangeLock = new ReentrantLock();
156 
157     // @GuardedBy("stateChangeLock")
        // Buffer that read() and skip() consume from.
158     private ByteBuffer activeBuffer;
159 
160     // @GuardedBy("stateChangeLock")
        // Buffer whose backing array the read-ahead task fills; swapBuffers() exchanges it with the active buffer.
161     private ByteBuffer readAheadBuffer;
162 
163     // @GuardedBy("stateChangeLock")
        // Set by the read-ahead task when the underlying read returns a negative count or throws EOFException.
164     private boolean endOfStream;
165 
166     // @GuardedBy("stateChangeLock")
167     // true if async read is in progress
        // (set in readAsync() before the task is submitted, cleared by the task itself).
168     private boolean readInProgress;
169 
170     // @GuardedBy("stateChangeLock")
171     // true if read is aborted due to an exception in reading from underlying input stream.
172     private boolean readAborted;
173 
174     // @GuardedBy("stateChangeLock")
        // The exception that aborted the read; checkReadException() rethrows it, wrapped in an IOException if it is not one.
175     private Throwable readException;
176 
177     // @GuardedBy("stateChangeLock")
178     // whether the close method is called.
179     private boolean isClosed;
180 
181     // @GuardedBy("stateChangeLock")
182     // true when the close method will close the underlying input stream. This is valid only if
183     // `isClosed` is true.
184     private boolean isUnderlyingInputStreamBeingClosed;
185 
186     // @GuardedBy("stateChangeLock")
187     // whether there is a read ahead task running.
188     private boolean isReading;
189 
190     // Whether there is a reader waiting for data.
        // The read-ahead task checks this flag between underlying reads and stops filling its buffer once it is set.
191     private final AtomicBoolean isWaiting = new AtomicBoolean();
192 
        // Executor that runs the read-ahead tasks.
193     private final ExecutorService executorService;
194 
        // Whether close() shuts down executorService.
195     private final boolean shutdownExecutorService;
196 
        // Signalled by the read-ahead task when it finishes; awaited in waitForAsyncReadComplete().
197     private final Condition asyncReadComplete = stateChangeLock.newCondition();
198 
199     /**
200      * Constructs an instance with the specified buffer size that reads ahead on a new single-thread daemon executor service. That executor service is
         * shut down by {@link #close()}.
201      *
202      * @param inputStream       The underlying input stream.
203      * @param bufferSizeInBytes The buffer size.
         * @throws NullPointerException     if {@code inputStream} is {@code null}.
         * @throws IllegalArgumentException if {@code bufferSizeInBytes} is not greater than 0.
204      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
205      */
206     @Deprecated
207     public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
208         this(inputStream, bufferSizeInBytes, newExecutorService(), true);
209     }
210 
211     /**
212      * Constructs an instance with the specified buffer size that reads ahead on the given executor service. The executor service is not shut down by
         * {@link #close()}.
213      *
214      * @param inputStream       The underlying input stream.
215      * @param bufferSizeInBytes The buffer size.
216      * @param executorService   An executor service for the read-ahead thread.
         * @throws NullPointerException     if {@code inputStream} or {@code executorService} is {@code null}.
         * @throws IllegalArgumentException if {@code bufferSizeInBytes} is not greater than 0.
217      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
218      */
219     @Deprecated
220     public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
221         this(inputStream, bufferSizeInBytes, executorService, false);
222     }
223 
224     /**
225      * Constructs an instance with the specified buffer size and read-ahead threshold
226      *
227      * @param inputStream             The underlying input stream.
228      * @param bufferSizeInBytes       The buffer size.
229      * @param executorService         An executor service for the read-ahead thread.
230      * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close.
231      */
232     private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
233             final boolean shutdownExecutorService) {
234         super(Objects.requireNonNull(inputStream, "inputStream"));
235         if (bufferSizeInBytes <= 0) {
236             throw new IllegalArgumentException("bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
237         }
238         this.executorService = Objects.requireNonNull(executorService, "executorService");
239         this.shutdownExecutorService = shutdownExecutorService;
240         this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
241         this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
242         this.activeBuffer.flip();
243         this.readAheadBuffer.flip();
244     }
245 
246     @Override
247     public int available() throws IOException {
248         stateChangeLock.lock();
249         // Make sure we have no integer overflow.
250         try {
251             return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
252         } finally {
253             stateChangeLock.unlock();
254         }
255     }
256 
257     private void checkReadException() throws IOException {
258         if (readAborted) {
259             if (readException instanceof IOException) {
260                 throw (IOException) readException;
261             }
262             throw new IOException(readException);
263         }
264     }
265 
266     @Override
267     public void close() throws IOException {
268         boolean isSafeToCloseUnderlyingInputStream = false;
269         stateChangeLock.lock();
270         try {
271             if (isClosed) {
272                 return;
273             }
274             isClosed = true;
275             if (!isReading) {
276                 // Nobody is reading, so we can close the underlying input stream in this method.
277                 isSafeToCloseUnderlyingInputStream = true;
278                 // Flip this to make sure the read ahead task will not close the underlying input stream.
279                 isUnderlyingInputStreamBeingClosed = true;
280             }
281         } finally {
282             stateChangeLock.unlock();
283         }
284 
285         if (shutdownExecutorService) {
286             try {
287                 executorService.shutdownNow();
288                 executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
289             } catch (final InterruptedException e) {
290                 final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
291                 iio.initCause(e);
292                 throw iio;
293             } finally {
294                 if (isSafeToCloseUnderlyingInputStream) {
295                     super.close();
296                 }
297             }
298         }
299     }
300 
301     private void closeUnderlyingInputStreamIfNecessary() {
302         boolean needToCloseUnderlyingInputStream = false;
303         stateChangeLock.lock();
304         try {
305             isReading = false;
306             if (isClosed && !isUnderlyingInputStreamBeingClosed) {
307                 // close method cannot close underlyingInputStream because we were reading.
308                 needToCloseUnderlyingInputStream = true;
309             }
310         } finally {
311             stateChangeLock.unlock();
312         }
313         if (needToCloseUnderlyingInputStream) {
314             try {
315                 super.close();
316             } catch (final IOException ignored) {
317                 // TODO Rethrow as UncheckedIOException?
318             }
319         }
320     }
321 
322     private boolean isEndOfStream() {
323         return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
324     }
325 
326     @Override
327     public int read() throws IOException {
328         if (activeBuffer.hasRemaining()) {
329             // short path - just get one byte.
330             return activeBuffer.get() & 0xFF;
331         }
332         final byte[] oneByteArray = BYTE_ARRAY_1.get();
333         oneByteArray[0] = 0;
334         return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
335     }
336 
337     @Override
338     public int read(final byte[] b, final int offset, int len) throws IOException {
339         if (offset < 0 || len < 0 || len > b.length - offset) {
340             throw new IndexOutOfBoundsException();
341         }
342         if (len == 0) {
343             return 0;
344         }
345 
346         if (!activeBuffer.hasRemaining()) {
347             // No remaining in active buffer - lock and switch to write ahead buffer.
348             stateChangeLock.lock();
349             try {
350                 waitForAsyncReadComplete();
351                 if (!readAheadBuffer.hasRemaining()) {
352                     // The first read.
353                     readAsync();
354                     waitForAsyncReadComplete();
355                     if (isEndOfStream()) {
356                         return EOF;
357                     }
358                 }
359                 // Swap the newly read ahead buffer in place of empty active buffer.
360                 swapBuffers();
361                 // After swapping buffers, trigger another async read for read ahead buffer.
362                 readAsync();
363             } finally {
364                 stateChangeLock.unlock();
365             }
366         }
367         len = Math.min(len, activeBuffer.remaining());
368         activeBuffer.get(b, offset, len);
369 
370         return len;
371     }
372 
373     /**
374      * Read data from underlyingInputStream to readAheadBuffer asynchronously.
         * <p>
         * Returns without submitting a task when the end of the stream was already seen or a read is already pending. Otherwise the read-ahead buffer is
         * emptied, {@code readInProgress} is set, and a task is handed to the executor service. The task fills the buffer's backing array, publishes the
         * outcome under the lock, and signals waiters.
         * </p>
375      *
376      * @throws IOException if an I/O error occurs.
377      */
378     private void readAsync() throws IOException {
379         stateChangeLock.lock();
380         final byte[] arr;
381         try {
382             arr = readAheadBuffer.array();
383             if (endOfStream || readInProgress) {
384                 return;
385             }
                // Surface a failure recorded by an earlier task before scheduling more work.
386             checkReadException();
                // position(0) followed by flip() leaves position == limit == 0: the buffer reads as empty until the task sets the new limit.
387             readAheadBuffer.position(0);
388             readAheadBuffer.flip();
389             readInProgress = true;
390         } finally {
391             stateChangeLock.unlock();
392         }
393         executorService.execute(() -> {
394             stateChangeLock.lock();
395             try {
396                 if (isClosed) {
                        // NOTE(review): readInProgress is cleared here without signalling asyncReadComplete, so a thread already
                        // blocked in waitForAsyncReadComplete() does not appear to be woken on this path - confirm whether intended.
397                     readInProgress = false;
398                     return;
399                 }
400                 // Flip this so that the close method will not close the underlying input stream when we
401                 // are reading.
402                 isReading = true;
403             } finally {
404                 stateChangeLock.unlock();
405             }
406 
407             // Please note that it is safe to release the lock and read into the read ahead buffer
408             // because either of following two conditions will hold:
409             //
410             // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
411             //
412             // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
413             // for this async read to complete.
414             //
415             // So there is no race condition in both the situations.
416             int read = 0;
417             int off = 0;
418             int len = arr.length;
419             Throwable exception = null;
420             try {
421                 // try to fill the read ahead buffer.
422                 // if a reader is waiting, possibly return early.
423                 do {
424                     read = in.read(arr, off, len);
425                     if (read <= 0) {
426                         break;
427                     }
428                     off += read;
429                     len -= read;
430                 } while (len > 0 && !isWaiting.get());
431             } catch (final Throwable ex) {
432                 exception = ex;
433                 if (ex instanceof Error) {
434                     // `readException` may not be reported to the user. Rethrow Error to make sure at least
435                     // The user can see Error in UncaughtExceptionHandler.
436                     throw (Error) ex;
437                 }
438             } finally {
439                 stateChangeLock.lock();
440                 try {
                        // Publish the bytes read: the position was left at 0 above, so the limit becomes the readable length.
441                     readAheadBuffer.limit(off);
442                     if (read < 0 || exception instanceof EOFException) {
443                         endOfStream = true;
444                     } else if (exception != null) {
445                         readAborted = true;
446                         readException = exception;
447                     }
448                     readInProgress = false;
449                     signalAsyncReadComplete();
450                 } finally {
451                     stateChangeLock.unlock();
452                 }
453                 closeUnderlyingInputStreamIfNecessary();
454             }
455         });
456     }
457 
458     private void signalAsyncReadComplete() {
459         stateChangeLock.lock();
460         try {
461             asyncReadComplete.signalAll();
462         } finally {
463             stateChangeLock.unlock();
464         }
465     }
466 
467     @Override
468     public long skip(final long n) throws IOException {
469         if (n <= 0L) {
470             return 0L;
471         }
472         if (n <= activeBuffer.remaining()) {
473             // Only skipping from active buffer is sufficient
474             activeBuffer.position((int) n + activeBuffer.position());
475             return n;
476         }
477         stateChangeLock.lock();
478         final long skipped;
479         try {
480             skipped = skipInternal(n);
481         } finally {
482             stateChangeLock.unlock();
483         }
484         return skipped;
485     }
486 
487     /**
488      * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before
489      * calling this function.
490      *
491      * @param n the number of bytes to be skipped.
492      * @return the actual number of bytes skipped.
493      * @throws IOException if an I/O error occurs.
494      */
495     private long skipInternal(final long n) throws IOException {
496         if (!stateChangeLock.isLocked()) {
497             throw new IllegalStateException("Expected stateChangeLock to be locked");
498         }
499         waitForAsyncReadComplete();
500         if (isEndOfStream()) {
501             return 0;
502         }
503         if (available() >= n) {
504             // we can skip from the internal buffers
505             int toSkip = (int) n;
506             // We need to skip from both active buffer and read ahead buffer
507             toSkip -= activeBuffer.remaining();
508             if (toSkip <= 0) { // skipping from activeBuffer already handled.
509                 throw new IllegalStateException("Expected toSkip > 0, actual: " + toSkip);
510             }
511             activeBuffer.position(0);
512             activeBuffer.flip();
513             readAheadBuffer.position(toSkip + readAheadBuffer.position());
514             swapBuffers();
515             // Trigger async read to emptied read ahead buffer.
516             readAsync();
517             return n;
518         }
519         final int skippedBytes = available();
520         final long toSkip = n - skippedBytes;
521         activeBuffer.position(0);
522         activeBuffer.flip();
523         readAheadBuffer.position(0);
524         readAheadBuffer.flip();
525         final long skippedFromInputStream = in.skip(toSkip);
526         readAsync();
527         return skippedBytes + skippedFromInputStream;
528     }
529 
530     /**
531      * Flips the active and read ahead buffer
532      */
533     private void swapBuffers() {
534         final ByteBuffer temp = activeBuffer;
535         activeBuffer = readAheadBuffer;
536         readAheadBuffer = temp;
537     }
538 
539     private void waitForAsyncReadComplete() throws IOException {
540         stateChangeLock.lock();
541         try {
542             isWaiting.set(true);
543             // There is only one reader, and one writer, so the writer should signal only once,
544             // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups.
545             while (readInProgress) {
546                 asyncReadComplete.await();
547             }
548         } catch (final InterruptedException e) {
549             final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
550             iio.initCause(e);
551             throw iio;
552         } finally {
553             try {
554                 isWaiting.set(false);
555             } finally {
556                 stateChangeLock.unlock();
557             }
558         }
559         checkReadException();
560     }
561 }