View Javadoc
1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  package org.apache.commons.io.input;
15  
16  import static org.apache.commons.io.IOUtils.EOF;
17  
18  // import javax.annotation.concurrent.GuardedBy;
19  import java.io.EOFException;
20  import java.io.IOException;
21  import java.io.InputStream;
22  import java.io.InterruptedIOException;
23  import java.nio.ByteBuffer;
24  import java.util.Objects;
25  import java.util.concurrent.ExecutorService;
26  import java.util.concurrent.Executors;
27  import java.util.concurrent.TimeUnit;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  import java.util.concurrent.locks.Condition;
30  import java.util.concurrent.locks.ReentrantLock;
31  
32  /**
33   * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount
34   * of data has been read from the current buffer. It does so by maintaining two buffers: an active buffer and a read
35   * ahead buffer. The active buffer contains data which should be returned when a read() call is issued. The read ahead
36   * buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted,
37   * we flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O.
38   * <p>
39   * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
40   * </p>
41   *
42   * @since 2.9.0
43   */
44  public class ReadAheadInputStream extends InputStream {
45  
46      private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]);
47  
48      /**
49       * Creates a new daemon executor service.
50       *
51       * @return a new daemon executor service.
52       */
53      private static ExecutorService newExecutorService() {
54          return Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread);
55      }
56  
57      /**
58       * Creates a new daemon thread.
59       *
60       * @param r the thread's runnable.
61       * @return a new daemon thread.
62       */
63      private static Thread newThread(final Runnable r) {
64          final Thread thread = new Thread(r, "commons-io-read-ahead");
65          thread.setDaemon(true);
66          return thread;
67      }
68  
69      private final ReentrantLock stateChangeLock = new ReentrantLock();
70  
71      // @GuardedBy("stateChangeLock")
72      private ByteBuffer activeBuffer;
73  
74      // @GuardedBy("stateChangeLock")
75      private ByteBuffer readAheadBuffer;
76  
77      // @GuardedBy("stateChangeLock")
78      private boolean endOfStream;
79  
80      // @GuardedBy("stateChangeLock")
81      // true if async read is in progress
82      private boolean readInProgress;
83  
84      // @GuardedBy("stateChangeLock")
85      // true if read is aborted due to an exception in reading from underlying input stream.
86      private boolean readAborted;
87  
88      // @GuardedBy("stateChangeLock")
89      private Throwable readException;
90  
91      // @GuardedBy("stateChangeLock")
92      // whether the close method is called.
93      private boolean isClosed;
94  
95      // @GuardedBy("stateChangeLock")
96      // true when the close method will close the underlying input stream. This is valid only if
97      // `isClosed` is true.
98      private boolean isUnderlyingInputStreamBeingClosed;
99  
100     // @GuardedBy("stateChangeLock")
101     // whether there is a read ahead task running,
102     private boolean isReading;
103 
104     // Whether there is a reader waiting for data.
105     private final AtomicBoolean isWaiting = new AtomicBoolean(false);
106 
107     private final InputStream underlyingInputStream;
108 
109     private final ExecutorService executorService;
110 
111     private final boolean shutdownExecutorService;
112 
113     private final Condition asyncReadComplete = stateChangeLock.newCondition();
114 
115     /**
116      * Creates an instance with the specified buffer size and read-ahead threshold
117      *
118      * @param inputStream The underlying input stream.
119      * @param bufferSizeInBytes The buffer size.
120      */
121     public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
122         this(inputStream, bufferSizeInBytes, newExecutorService(), true);
123     }
124 
125     /**
126      * Creates an instance with the specified buffer size and read-ahead threshold
127      *
128      * @param inputStream The underlying input stream.
129      * @param bufferSizeInBytes The buffer size.
130      * @param executorService An executor service for the read-ahead thread.
131      */
132     public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes,
133         final ExecutorService executorService) {
134         this(inputStream, bufferSizeInBytes, executorService, false);
135     }
136 
137     /**
138      * Creates an instance with the specified buffer size and read-ahead threshold
139      *
140      * @param inputStream The underlying input stream.
141      * @param bufferSizeInBytes The buffer size.
142      * @param executorService An executor service for the read-ahead thread.
143      * @param shutdownExecutorService Whether or not to shutdown the given ExecutorService on close.
144      */
145     private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes,
146         final ExecutorService executorService, final boolean shutdownExecutorService) {
147         if (bufferSizeInBytes <= 0) {
148             throw new IllegalArgumentException(
149                 "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
150         }
151         this.executorService = Objects.requireNonNull(executorService, "executorService");
152         this.underlyingInputStream = Objects.requireNonNull(inputStream, "inputStream");
153         this.shutdownExecutorService = shutdownExecutorService;
154         this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
155         this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
156         this.activeBuffer.flip();
157         this.readAheadBuffer.flip();
158     }
159 
160     @Override
161     public int available() throws IOException {
162         stateChangeLock.lock();
163         // Make sure we have no integer overflow.
164         try {
165             return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
166         } finally {
167             stateChangeLock.unlock();
168         }
169     }
170 
171     private void checkReadException() throws IOException {
172         if (readAborted) {
173             if (readException instanceof IOException) {
174                 throw (IOException) readException;
175             }
176             throw new IOException(readException);
177         }
178     }
179 
180     @Override
181     public void close() throws IOException {
182         boolean isSafeToCloseUnderlyingInputStream = false;
183         stateChangeLock.lock();
184         try {
185             if (isClosed) {
186                 return;
187             }
188             isClosed = true;
189             if (!isReading) {
190                 // Nobody is reading, so we can close the underlying input stream in this method.
191                 isSafeToCloseUnderlyingInputStream = true;
192                 // Flip this to make sure the read ahead task will not close the underlying input stream.
193                 isUnderlyingInputStreamBeingClosed = true;
194             }
195         } finally {
196             stateChangeLock.unlock();
197         }
198 
199         if (shutdownExecutorService) {
200             try {
201                 executorService.shutdownNow();
202                 executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
203             } catch (final InterruptedException e) {
204                 final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
205                 iio.initCause(e);
206                 throw iio;
207             } finally {
208                 if (isSafeToCloseUnderlyingInputStream) {
209                     underlyingInputStream.close();
210                 }
211             }
212         }
213     }
214 
215     private void closeUnderlyingInputStreamIfNecessary() {
216         boolean needToCloseUnderlyingInputStream = false;
217         stateChangeLock.lock();
218         try {
219             isReading = false;
220             if (isClosed && !isUnderlyingInputStreamBeingClosed) {
221                 // close method cannot close underlyingInputStream because we were reading.
222                 needToCloseUnderlyingInputStream = true;
223             }
224         } finally {
225             stateChangeLock.unlock();
226         }
227         if (needToCloseUnderlyingInputStream) {
228             try {
229                 underlyingInputStream.close();
230             } catch (final IOException e) {
231                 // TODO ?
232             }
233         }
234     }
235 
236     private boolean isEndOfStream() {
237         return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
238     }
239 
240     @Override
241     public int read() throws IOException {
242         if (activeBuffer.hasRemaining()) {
243             // short path - just get one byte.
244             return activeBuffer.get() & 0xFF;
245         }
246         final byte[] oneByteArray = oneByte.get();
247         return read(oneByteArray, 0, 1) == EOF ? -1 : oneByteArray[0] & 0xFF;
248     }
249 
250     @Override
251     public int read(final byte[] b, final int offset, int len) throws IOException {
252         if (offset < 0 || len < 0 || len > b.length - offset) {
253             throw new IndexOutOfBoundsException();
254         }
255         if (len == 0) {
256             return 0;
257         }
258 
259         if (!activeBuffer.hasRemaining()) {
260             // No remaining in active buffer - lock and switch to write ahead buffer.
261             stateChangeLock.lock();
262             try {
263                 waitForAsyncReadComplete();
264                 if (!readAheadBuffer.hasRemaining()) {
265                     // The first read.
266                     readAsync();
267                     waitForAsyncReadComplete();
268                     if (isEndOfStream()) {
269                         return EOF;
270                     }
271                 }
272                 // Swap the newly read read ahead buffer in place of empty active buffer.
273                 swapBuffers();
274                 // After swapping buffers, trigger another async read for read ahead buffer.
275                 readAsync();
276             } finally {
277                 stateChangeLock.unlock();
278             }
279         }
280         len = Math.min(len, activeBuffer.remaining());
281         activeBuffer.get(b, offset, len);
282 
283         return len;
284     }
285 
286     /** Read data from underlyingInputStream to readAheadBuffer asynchronously. */
287     private void readAsync() throws IOException {
288         stateChangeLock.lock();
289         final byte[] arr;
290         try {
291             arr = readAheadBuffer.array();
292             if (endOfStream || readInProgress) {
293                 return;
294             }
295             checkReadException();
296             readAheadBuffer.position(0);
297             readAheadBuffer.flip();
298             readInProgress = true;
299         } finally {
300             stateChangeLock.unlock();
301         }
302         executorService.execute(() -> {
303             stateChangeLock.lock();
304             try {
305                 if (isClosed) {
306                     readInProgress = false;
307                     return;
308                 }
309                 // Flip this so that the close method will not close the underlying input stream when we
310                 // are reading.
311                 isReading = true;
312             } finally {
313                 stateChangeLock.unlock();
314             }
315 
316             // Please note that it is safe to release the lock and read into the read ahead buffer
317             // because either of following two conditions will hold:
318             //
319             // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
320             //
321             // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
322             // for this async read to complete.
323             //
324             // So there is no race condition in both the situations.
325             int read = 0;
326             int off = 0, len = arr.length;
327             Throwable exception = null;
328             try {
329                 // try to fill the read ahead buffer.
330                 // if a reader is waiting, possibly return early.
331                 do {
332                     read = underlyingInputStream.read(arr, off, len);
333                     if (read <= 0) {
334                         break;
335                     }
336                     off += read;
337                     len -= read;
338                 } while (len > 0 && !isWaiting.get());
339             } catch (final Throwable ex) {
340                 exception = ex;
341                 if (ex instanceof Error) {
342                     // `readException` may not be reported to the user. Rethrow Error to make sure at least
343                     // The user can see Error in UncaughtExceptionHandler.
344                     throw (Error) ex;
345                 }
346             } finally {
347                 stateChangeLock.lock();
348                 try {
349                     readAheadBuffer.limit(off);
350                     if (read < 0 || (exception instanceof EOFException)) {
351                         endOfStream = true;
352                     } else if (exception != null) {
353                         readAborted = true;
354                         readException = exception;
355                     }
356                     readInProgress = false;
357                     signalAsyncReadComplete();
358                 } finally {
359                     stateChangeLock.unlock();
360                 }
361                 closeUnderlyingInputStreamIfNecessary();
362             }
363         });
364     }
365 
366     private void signalAsyncReadComplete() {
367         stateChangeLock.lock();
368         try {
369             asyncReadComplete.signalAll();
370         } finally {
371             stateChangeLock.unlock();
372         }
373     }
374 
375     @Override
376     public long skip(final long n) throws IOException {
377         if (n <= 0L) {
378             return 0L;
379         }
380         if (n <= activeBuffer.remaining()) {
381             // Only skipping from active buffer is sufficient
382             activeBuffer.position((int) n + activeBuffer.position());
383             return n;
384         }
385         stateChangeLock.lock();
386         long skipped;
387         try {
388             skipped = skipInternal(n);
389         } finally {
390             stateChangeLock.unlock();
391         }
392         return skipped;
393     }
394 
395     /**
396      * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is
397      * already acquired in the caller before calling this function.
398      *
399      * @param n the number of bytes to be skipped.
400      * @return the actual number of bytes skipped.
401      */
402     private long skipInternal(final long n) throws IOException {
403         assert stateChangeLock.isLocked();
404         waitForAsyncReadComplete();
405         if (isEndOfStream()) {
406             return 0;
407         }
408         if (available() >= n) {
409             // we can skip from the internal buffers
410             int toSkip = (int) n;
411             // We need to skip from both active buffer and read ahead buffer
412             toSkip -= activeBuffer.remaining();
413             assert toSkip > 0; // skipping from activeBuffer already handled.
414             activeBuffer.position(0);
415             activeBuffer.flip();
416             readAheadBuffer.position(toSkip + readAheadBuffer.position());
417             swapBuffers();
418             // Trigger async read to emptied read ahead buffer.
419             readAsync();
420             return n;
421         }
422         final int skippedBytes = available();
423         final long toSkip = n - skippedBytes;
424         activeBuffer.position(0);
425         activeBuffer.flip();
426         readAheadBuffer.position(0);
427         readAheadBuffer.flip();
428         final long skippedFromInputStream = underlyingInputStream.skip(toSkip);
429         readAsync();
430         return skippedBytes + skippedFromInputStream;
431     }
432 
433     /**
434      * Flips the active and read ahead buffer
435      */
436     private void swapBuffers() {
437         final ByteBuffer temp = activeBuffer;
438         activeBuffer = readAheadBuffer;
439         readAheadBuffer = temp;
440     }
441 
442     private void waitForAsyncReadComplete() throws IOException {
443         stateChangeLock.lock();
444         try {
445             isWaiting.set(true);
446             // There is only one reader, and one writer, so the writer should signal only once,
447             // but a while loop checking the wake up condition is still needed to avoid spurious wakeups.
448             while (readInProgress) {
449                 asyncReadComplete.await();
450             }
451         } catch (final InterruptedException e) {
452             final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
453             iio.initCause(e);
454             throw iio;
455         } finally {
456             isWaiting.set(false);
457             stateChangeLock.unlock();
458         }
459         checkReadException();
460     }
461 }