1 /*
2 * Licensed under the Apache License, Version 2.0 (the "License");
3 * you may not use this file except in compliance with the License.
4 * You may obtain a copy of the License at
5 *
6 * https://www.apache.org/licenses/LICENSE-2.0
7 *
8 * Unless required by applicable law or agreed to in writing, software
9 * distributed under the License is distributed on an "AS IS" BASIS,
10 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 * See the License for the specific language governing permissions and
12 * limitations under the License.
13 */
14 package org.apache.commons.io.input;
15
16 import static org.apache.commons.io.IOUtils.EOF;
17
18 // import javax.annotation.concurrent.GuardedBy;
19 import java.io.EOFException;
20 import java.io.FilterInputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.InterruptedIOException;
24 import java.nio.ByteBuffer;
25 import java.util.Objects;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.concurrent.locks.Condition;
31 import java.util.concurrent.locks.ReentrantLock;
32
33 import org.apache.commons.io.IOUtils;
34 import org.apache.commons.io.build.AbstractStreamBuilder;
35
36 /**
37 * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current
38 * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a
39 * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we
40 * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O.
41 * <p>
42 * To build an instance, use {@link Builder}.
43 * </p>
44 * <p>
45 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
46 * </p>
47 *
48 * @see Builder
49 * @since 2.9.0
50 */
51 public class ReadAheadInputStream extends FilterInputStream {
52
53 // @formatter:off
54 /**
55 * Builds a new {@link ReadAheadInputStream}.
56 *
57 * <p>
58 * For example:
59 * </p>
60 * <pre>{@code
61 * ReadAheadInputStream s = ReadAheadInputStream.builder()
62 * .setPath(path)
63 * .setExecutorService(Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread))
64 * .get();}
65 * </pre>
66 *
67 * @see #get()
68 * @since 2.12.0
69 */
70 // @formatter:on
71 public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {
72
73 private ExecutorService executorService;
74
75 /**
76 * Constructs a new builder of {@link ReadAheadInputStream}.
77 */
78 public Builder() {
79 // empty
80 }
81
82 /**
83 * Builds a new {@link ReadAheadInputStream}.
84 * <p>
85 * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
86 * </p>
87 * <p>
88 * This builder uses the following aspects:
89 * </p>
90 * <ul>
91 * <li>{@link #getInputStream()} gets the target aspect.</li>
92 * <li>{@link #getBufferSize()}</li>
93 * <li>{@link ExecutorService}</li>
94 * </ul>
95 *
96 * @return a new instance.
97 * @throws IllegalStateException if the {@code origin} is {@code null}.
98 * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
99 * @throws IOException if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
100 * @see #getInputStream()
101 * @see #getBufferSize()
102 * @see #getUnchecked()
103 */
104 @Override
105 public ReadAheadInputStream get() throws IOException {
106 return new ReadAheadInputStream(this);
107 }
108
109 /**
110 * Sets the executor service for the read-ahead thread.
111 *
112 * @param executorService the executor service for the read-ahead thread.
113 * @return {@code this} instance.
114 */
115 public Builder setExecutorService(final ExecutorService executorService) {
116 this.executorService = executorService;
117 return this;
118 }
119
120 }
121
122 private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);
123
124 /**
125 * Constructs a new {@link Builder}.
126 *
127 * @return a new {@link Builder}.
128 * @since 2.12.0
129 */
130 public static Builder builder() {
131 return new Builder();
132 }
133
134 /**
135 * Constructs a new daemon thread.
136 *
137 * @param r the thread's runnable.
138 * @return a new daemon thread.
139 */
140 private static Thread newDaemonThread(final Runnable r) {
141 final Thread thread = new Thread(r, "commons-io-read-ahead");
142 thread.setDaemon(true);
143 return thread;
144 }
145
146 /**
147 * Constructs a new daemon executor service.
148 *
149 * @return a new daemon executor service.
150 */
151 private static ExecutorService newExecutorService() {
152 return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
153 }
154
155 private final ReentrantLock stateChangeLock = new ReentrantLock();
156
157 // @GuardedBy("stateChangeLock")
158 private ByteBuffer activeBuffer;
159
160 // @GuardedBy("stateChangeLock")
161 private ByteBuffer readAheadBuffer;
162
163 // @GuardedBy("stateChangeLock")
164 private boolean endOfStream;
165
166 // @GuardedBy("stateChangeLock")
167 // true if async read is in progress
168 private boolean readInProgress;
169
170 // @GuardedBy("stateChangeLock")
171 // true if read is aborted due to an exception in reading from underlying input stream.
172 private boolean readAborted;
173
174 // @GuardedBy("stateChangeLock")
175 private Throwable readException;
176
177 // @GuardedBy("stateChangeLock")
178 // whether the close method is called.
179 private boolean isClosed;
180
181 // @GuardedBy("stateChangeLock")
182 // true when the close method will close the underlying input stream. This is valid only if
183 // `isClosed` is true.
184 private boolean isUnderlyingInputStreamBeingClosed;
185
186 // @GuardedBy("stateChangeLock")
187 // whether there is a read ahead task running,
188 private boolean isReading;
189
190 // Whether there is a reader waiting for data.
191 private final AtomicBoolean isWaiting = new AtomicBoolean();
192
193 private final ExecutorService executorService;
194
195 private final boolean shutdownExecutorService;
196
197 private final Condition asyncReadComplete = stateChangeLock.newCondition();
198
199 @SuppressWarnings("resource")
200 private ReadAheadInputStream(final Builder builder) throws IOException {
201 this(builder.getInputStream(), builder.getBufferSize(), builder.executorService != null ? builder.executorService : newExecutorService(),
202 builder.executorService == null);
203 }
204
205 /**
206 * Constructs an instance with the specified buffer size and read-ahead threshold
207 *
208 * @param inputStream The underlying input stream.
209 * @param bufferSizeInBytes The buffer size.
210 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
211 */
212 @Deprecated
213 public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
214 this(inputStream, bufferSizeInBytes, newExecutorService(), true);
215 }
216
217 /**
218 * Constructs an instance with the specified buffer size and read-ahead threshold
219 *
220 * @param inputStream The underlying input stream.
221 * @param bufferSizeInBytes The buffer size.
222 * @param executorService An executor service for the read-ahead thread.
223 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
224 */
225 @Deprecated
226 public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
227 this(inputStream, bufferSizeInBytes, executorService, false);
228 }
229
230 /**
231 * Constructs an instance with the specified buffer size and read-ahead threshold
232 *
233 * @param inputStream The underlying input stream.
234 * @param bufferSizeInBytes The buffer size.
235 * @param executorService An executor service for the read-ahead thread.
236 * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close.
237 */
238 private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
239 final boolean shutdownExecutorService) {
240 super(Objects.requireNonNull(inputStream, "inputStream"));
241 if (bufferSizeInBytes <= 0) {
242 throw new IllegalArgumentException(String.format("bufferSizeInBytes <= 0, bufferSizeInBytes = %,d", bufferSizeInBytes));
243 }
244 this.executorService = Objects.requireNonNull(executorService, "executorService");
245 this.shutdownExecutorService = shutdownExecutorService;
246 this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
247 this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
248 this.activeBuffer.flip();
249 this.readAheadBuffer.flip();
250 }
251
252 @Override
253 public int available() throws IOException {
254 stateChangeLock.lock();
255 // Make sure we have no integer overflow.
256 try {
257 return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
258 } finally {
259 stateChangeLock.unlock();
260 }
261 }
262
263 private void checkReadException() throws IOException {
264 if (readAborted) {
265 if (readException instanceof IOException) {
266 throw (IOException) readException;
267 }
268 throw new IOException(readException);
269 }
270 }
271
272 @Override
273 public void close() throws IOException {
274 boolean isSafeToCloseUnderlyingInputStream = false;
275 stateChangeLock.lock();
276 try {
277 if (isClosed) {
278 return;
279 }
280 isClosed = true;
281 if (!isReading) {
282 // Nobody is reading, so we can close the underlying input stream in this method.
283 isSafeToCloseUnderlyingInputStream = true;
284 // Flip this to make sure the read ahead task will not close the underlying input stream.
285 isUnderlyingInputStreamBeingClosed = true;
286 }
287 } finally {
288 stateChangeLock.unlock();
289 }
290
291 if (shutdownExecutorService) {
292 try {
293 executorService.shutdownNow();
294 executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
295 } catch (final InterruptedException e) {
296 final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
297 iio.initCause(e);
298 throw iio;
299 } finally {
300 if (isSafeToCloseUnderlyingInputStream) {
301 super.close();
302 }
303 }
304 }
305 }
306
307 private void closeUnderlyingInputStreamIfNecessary() {
308 boolean needToCloseUnderlyingInputStream = false;
309 stateChangeLock.lock();
310 try {
311 isReading = false;
312 if (isClosed && !isUnderlyingInputStreamBeingClosed) {
313 // close method cannot close underlyingInputStream because we were reading.
314 needToCloseUnderlyingInputStream = true;
315 }
316 } finally {
317 stateChangeLock.unlock();
318 }
319 if (needToCloseUnderlyingInputStream) {
320 try {
321 super.close();
322 } catch (final IOException ignored) {
323 // TODO Rethrow as UncheckedIOException?
324 }
325 }
326 }
327
328 private boolean isEndOfStream() {
329 return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
330 }
331
332 @Override
333 public int read() throws IOException {
334 if (activeBuffer.hasRemaining()) {
335 // short path - just get one byte.
336 return activeBuffer.get() & 0xFF;
337 }
338 final byte[] oneByteArray = BYTE_ARRAY_1.get();
339 oneByteArray[0] = 0;
340 return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
341 }
342
343 @Override
344 public int read(final byte[] b, final int offset, int len) throws IOException {
345 IOUtils.checkFromIndexSize(b, offset, len);
346 if (len == 0) {
347 return 0;
348 }
349
350 if (!activeBuffer.hasRemaining()) {
351 // No remaining in active buffer - lock and switch to write ahead buffer.
352 stateChangeLock.lock();
353 try {
354 waitForAsyncReadComplete();
355 if (!readAheadBuffer.hasRemaining()) {
356 // The first read.
357 readAsync();
358 waitForAsyncReadComplete();
359 if (isEndOfStream()) {
360 return EOF;
361 }
362 }
363 // Swap the newly read ahead buffer in place of empty active buffer.
364 swapBuffers();
365 // After swapping buffers, trigger another async read for read ahead buffer.
366 readAsync();
367 } finally {
368 stateChangeLock.unlock();
369 }
370 }
371 len = Math.min(len, activeBuffer.remaining());
372 activeBuffer.get(b, offset, len);
373
374 return len;
375 }
376
377 /**
378 * Reads data from underlyingInputStream to readAheadBuffer asynchronously.
379 *
380 * @throws IOException if an I/O error occurs.
381 */
382 private void readAsync() throws IOException {
383 stateChangeLock.lock();
384 final byte[] arr;
385 try {
386 arr = readAheadBuffer.array();
387 if (endOfStream || readInProgress) {
388 return;
389 }
390 checkReadException();
391 readAheadBuffer.position(0);
392 readAheadBuffer.flip();
393 readInProgress = true;
394 } finally {
395 stateChangeLock.unlock();
396 }
397 executorService.execute(() -> {
398 stateChangeLock.lock();
399 try {
400 if (isClosed) {
401 readInProgress = false;
402 return;
403 }
404 // Flip this so that the close method will not close the underlying input stream when we
405 // are reading.
406 isReading = true;
407 } finally {
408 stateChangeLock.unlock();
409 }
410
411 // Please note that it is safe to release the lock and read into the read ahead buffer
412 // because either of following two conditions will hold:
413 //
414 // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
415 //
416 // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
417 // for this async read to complete.
418 //
419 // So there is no race condition in both the situations.
420 int read = 0;
421 int off = 0;
422 int len = arr.length;
423 Throwable exception = null;
424 try {
425 // try to fill the read ahead buffer.
426 // if a reader is waiting, possibly return early.
427 do {
428 read = in.read(arr, off, len);
429 if (read <= 0) {
430 break;
431 }
432 off += read;
433 len -= read;
434 } while (len > 0 && !isWaiting.get());
435 } catch (final Throwable ex) {
436 exception = ex;
437 if (ex instanceof Error) {
438 // `readException` may not be reported to the user. Rethrow Error to make sure at least
439 // The user can see Error in UncaughtExceptionHandler.
440 throw (Error) ex;
441 }
442 } finally {
443 stateChangeLock.lock();
444 try {
445 readAheadBuffer.limit(off);
446 if (read < 0 || exception instanceof EOFException) {
447 endOfStream = true;
448 } else if (exception != null) {
449 readAborted = true;
450 readException = exception;
451 }
452 readInProgress = false;
453 signalAsyncReadComplete();
454 } finally {
455 stateChangeLock.unlock();
456 }
457 closeUnderlyingInputStreamIfNecessary();
458 }
459 });
460 }
461
462 private void signalAsyncReadComplete() {
463 stateChangeLock.lock();
464 try {
465 asyncReadComplete.signalAll();
466 } finally {
467 stateChangeLock.unlock();
468 }
469 }
470
471 @Override
472 public long skip(final long n) throws IOException {
473 if (n <= 0L) {
474 return 0L;
475 }
476 if (n <= activeBuffer.remaining()) {
477 // Only skipping from active buffer is sufficient
478 activeBuffer.position((int) n + activeBuffer.position());
479 return n;
480 }
481 stateChangeLock.lock();
482 final long skipped;
483 try {
484 skipped = skipInternal(n);
485 } finally {
486 stateChangeLock.unlock();
487 }
488 return skipped;
489 }
490
491 /**
492 * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before
493 * calling this function.
494 *
495 * @param n the number of bytes to be skipped.
496 * @return the actual number of bytes skipped.
497 * @throws IOException if an I/O error occurs.
498 */
499 private long skipInternal(final long n) throws IOException {
500 if (!stateChangeLock.isLocked()) {
501 throw new IllegalStateException("Expected stateChangeLock to be locked");
502 }
503 waitForAsyncReadComplete();
504 if (isEndOfStream()) {
505 return 0;
506 }
507 if (available() >= n) {
508 // we can skip from the internal buffers
509 int toSkip = (int) n;
510 // We need to skip from both active buffer and read ahead buffer
511 toSkip -= activeBuffer.remaining();
512 if (toSkip <= 0) { // skipping from activeBuffer already handled.
513 throw new IllegalStateException("Expected toSkip > 0, actual: " + toSkip);
514 }
515 activeBuffer.position(0);
516 activeBuffer.flip();
517 readAheadBuffer.position(toSkip + readAheadBuffer.position());
518 swapBuffers();
519 // Trigger async read to emptied read ahead buffer.
520 readAsync();
521 return n;
522 }
523 final int skippedBytes = available();
524 final long toSkip = n - skippedBytes;
525 activeBuffer.position(0);
526 activeBuffer.flip();
527 readAheadBuffer.position(0);
528 readAheadBuffer.flip();
529 final long skippedFromInputStream = in.skip(toSkip);
530 readAsync();
531 return skippedBytes + skippedFromInputStream;
532 }
533
534 /**
535 * Flips the active and read ahead buffer
536 */
537 private void swapBuffers() {
538 final ByteBuffer temp = activeBuffer;
539 activeBuffer = readAheadBuffer;
540 readAheadBuffer = temp;
541 }
542
543 private void waitForAsyncReadComplete() throws IOException {
544 stateChangeLock.lock();
545 try {
546 isWaiting.set(true);
547 // There is only one reader, and one writer, so the writer should signal only once,
548 // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups.
549 while (readInProgress) {
550 asyncReadComplete.await();
551 }
552 } catch (final InterruptedException e) {
553 final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
554 iio.initCause(e);
555 throw iio;
556 } finally {
557 try {
558 isWaiting.set(false);
559 } finally {
560 stateChangeLock.unlock();
561 }
562 }
563 checkReadException();
564 }
565 }