1 /*
2 * Licensed under the Apache License, Version 2.0 (the "License");
3 * you may not use this file except in compliance with the License.
4 * You may obtain a copy of the License at
5 *
6 * https://www.apache.org/licenses/LICENSE-2.0
7 *
8 * Unless required by applicable law or agreed to in writing, software
9 * distributed under the License is distributed on an "AS IS" BASIS,
10 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 * See the License for the specific language governing permissions and
12 * limitations under the License.
13 */
14 package org.apache.commons.io.input;
15
16 import static org.apache.commons.io.IOUtils.EOF;
17
18 // import javax.annotation.concurrent.GuardedBy;
19 import java.io.EOFException;
20 import java.io.FilterInputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.nio.ByteBuffer;
24 import java.util.Objects;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import java.util.concurrent.locks.Condition;
30 import java.util.concurrent.locks.ReentrantLock;
31
32 import org.apache.commons.io.IOUtils;
33 import org.apache.commons.io.build.AbstractStreamBuilder;
34
35 /**
36 * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current
37 * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a
38 * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we
39 * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O.
40 * <p>
41 * To build an instance, use {@link Builder}.
42 * </p>
43 * <p>
44 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
45 * </p>
46 *
47 * @see Builder
48 * @since 2.9.0
49 */
50 public class ReadAheadInputStream extends FilterInputStream {
51
// @formatter:off
/**
 * Builds a new {@link ReadAheadInputStream}.
 *
 * <p>
 * For example:
 * </p>
 * <pre>{@code
 * ReadAheadInputStream s = ReadAheadInputStream.builder()
 *   .setPath(path)
 *   .setExecutorService(Executors.newSingleThreadExecutor())
 *   .get();}
 * </pre>
 * <p>
 * If an {@link ExecutorService} is not set, then a single-threaded daemon executor service is used.
 * </p>
 *
 * @see #get()
 * @since 2.12.0
 */
// @formatter:on
public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {

    /** The executor service for the read-ahead task; stays {@code null} unless {@link #setExecutorService(ExecutorService)} sets it. */
    private ExecutorService executorService;

    /**
     * Constructs a new builder of {@link ReadAheadInputStream}.
     */
    public Builder() {
        // empty
    }

    /**
     * Builds a new {@link ReadAheadInputStream}.
     * <p>
     * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
     * </p>
     * <p>
     * This builder uses the following aspects:
     * </p>
     * <ul>
     * <li>{@link #getInputStream()} gets the target aspect.</li>
     * <li>{@link #getBufferSize()}</li>
     * <li>{@link ExecutorService}, if not set, a single-threaded daemon executor service is used.</li>
     * </ul>
     *
     * @return a new instance.
     * @throws IllegalStateException         if the {@code origin} is {@code null}.
     * @throws IllegalArgumentException      if {@link #getBufferSize()} is not greater than zero.
     * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
     * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
     * @see #getInputStream()
     * @see #getBufferSize()
     * @see #getUnchecked()
     */
    @Override
    public ReadAheadInputStream get() throws IOException {
        return new ReadAheadInputStream(this);
    }

    /**
     * Sets the executor service for the read-ahead thread.
     * <p>
     * If not set, a single-threaded daemon executor service is used, and that internally created service is shut down when the stream is closed. An
     * executor service set through this method is not shut down by the stream.
     * </p>
     *
     * @param executorService the executor service for the read-ahead thread, may be {@code null}.
     * @return {@code this} instance.
     */
    public Builder setExecutorService(final ExecutorService executorService) {
        this.executorService = executorService;
        return this;
    }

}
126
/**
 * Per-thread one-byte scratch array that {@link #read()} hands to {@link #read(byte[], int, int)} when the active buffer is empty.
 */
private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);
128
/**
 * Constructs a new {@link Builder}.
 * <p>
 * Configure the returned builder and call {@link Builder#get()} to create the stream.
 * </p>
 *
 * @return a new {@link Builder}.
 * @since 2.12.0
 */
public static Builder builder() {
    return new Builder();
}
138
139 /**
140 * Constructs a new daemon thread.
141 *
142 * @param r the thread's runnable.
143 * @return a new daemon thread.
144 */
145 private static Thread newDaemonThread(final Runnable r) {
146 final Thread thread = new Thread(r, "commons-io-read-ahead");
147 thread.setDaemon(true);
148 return thread;
149 }
150
151 /**
152 * Constructs a new daemon executor service.
153 *
154 * @return a new daemon executor service.
155 */
156 private static ExecutorService newExecutorService() {
157 return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
158 }
159
// Lock protecting the mutable state below; also the owner of the asyncReadComplete condition.
private final ReentrantLock stateChangeLock = new ReentrantLock();

// @GuardedBy("stateChangeLock")
// Buffer that read() and skip() consume from. Note that their fast paths touch it without taking the lock.
private ByteBuffer activeBuffer;

// @GuardedBy("stateChangeLock")
// Buffer whose backing array the read-ahead task fills; it is swapped with activeBuffer once the latter is exhausted.
private ByteBuffer readAheadBuffer;

// @GuardedBy("stateChangeLock")
// Set by the read-ahead task when the underlying read returns a negative count or throws EOFException.
private boolean endOfStream;

// @GuardedBy("stateChangeLock")
// true if async read is in progress: set in readAsync() before the task is submitted, cleared by the task when it
// finishes or when it finds the stream already closed.
private boolean readInProgress;

// @GuardedBy("stateChangeLock")
// true if read is aborted due to an exception in reading from underlying input stream.
private boolean readAborted;

// @GuardedBy("stateChangeLock")
// The exception that aborted the read; rethrown to the reader by checkReadException().
private Throwable readException;

// @GuardedBy("stateChangeLock")
// whether the close method is called.
private boolean isClosed;

// @GuardedBy("stateChangeLock")
// true when the close method will close the underlying input stream. This is valid only if
// `isClosed` is true.
private boolean isUnderlyingInputStreamBeingClosed;

// @GuardedBy("stateChangeLock")
// whether there is a read ahead task running: set by the task just before it reads from the underlying stream and
// cleared in closeUnderlyingInputStreamIfNecessary(). While true, close() leaves closing the stream to the task.
private boolean isReading;

// Whether there is a reader waiting for data. The read-ahead task polls this flag to stop filling the buffer early.
private final AtomicBoolean isWaiting = new AtomicBoolean();

// Executor that runs the read-ahead task.
private final ExecutorService executorService;

// Whether close() shuts down executorService.
private final boolean shutdownExecutorService;

// Signalled by the read-ahead task when an asynchronous read ends; awaited in waitForAsyncReadComplete().
private final Condition asyncReadComplete = stateChangeLock.newCondition();
203
/**
 * Constructs an instance from the given builder.
 * <p>
 * When the builder has no executor service, a new single-threaded daemon executor service is created and flagged to be shut down by {@link #close()}. An
 * executor service supplied through the builder is used as-is and is not flagged for shutdown.
 * </p>
 *
 * @param builder the builder supplying the input stream, the buffer size, and the optional executor service.
 * @throws IOException if the builder cannot provide its input stream.
 */
@SuppressWarnings("resource")
private ReadAheadInputStream(final Builder builder) throws IOException {
    this(builder.getInputStream(), builder.getBufferSize(), builder.executorService != null ? builder.executorService : newExecutorService(),
            builder.executorService == null);
}
209
/**
 * Constructs an instance with the specified buffer size.
 * <p>
 * A new single-threaded daemon executor service is created for the read-ahead task and is shut down when this stream is closed.
 * </p>
 *
 * @param inputStream       The underlying input stream, must not be {@code null}.
 * @param bufferSizeInBytes The buffer size, must be greater than zero.
 * @throws NullPointerException     if {@code inputStream} is {@code null}.
 * @throws IllegalArgumentException if {@code bufferSizeInBytes} is not greater than zero.
 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
 */
@Deprecated
public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
    this(inputStream, bufferSizeInBytes, newExecutorService(), true);
}
221
/**
 * Constructs an instance with the specified buffer size and executor service.
 * <p>
 * The given executor service is not shut down when this stream is closed.
 * </p>
 *
 * @param inputStream       The underlying input stream, must not be {@code null}.
 * @param bufferSizeInBytes The buffer size, must be greater than zero.
 * @param executorService   An executor service for the read-ahead thread, must not be {@code null}.
 * @throws NullPointerException     if {@code inputStream} or {@code executorService} is {@code null}.
 * @throws IllegalArgumentException if {@code bufferSizeInBytes} is not greater than zero.
 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
 */
@Deprecated
public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
    this(inputStream, bufferSizeInBytes, executorService, false);
}
234
235 /**
236 * Constructs an instance with the specified buffer size and read-ahead threshold.
237 *
238 * @param inputStream The underlying input stream.
239 * @param bufferSizeInBytes The buffer size.
240 * @param executorService An executor service for the read-ahead thread.
241 * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close.
242 */
243 private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
244 final boolean shutdownExecutorService) {
245 super(Objects.requireNonNull(inputStream, "inputStream"));
246 if (bufferSizeInBytes <= 0) {
247 throw new IllegalArgumentException(String.format("bufferSizeInBytes <= 0, bufferSizeInBytes = %,d", bufferSizeInBytes));
248 }
249 this.executorService = Objects.requireNonNull(executorService, "executorService");
250 this.shutdownExecutorService = shutdownExecutorService;
251 this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
252 this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
253 this.activeBuffer.flip();
254 this.readAheadBuffer.flip();
255 }
256
257 @Override
258 public int available() throws IOException {
259 stateChangeLock.lock();
260 // Make sure we have no integer overflow.
261 try {
262 return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
263 } finally {
264 stateChangeLock.unlock();
265 }
266 }
267
268 private void checkReadException() throws IOException {
269 if (readAborted) {
270 if (readException instanceof IOException) {
271 throw (IOException) readException;
272 }
273 throw new IOException(readException);
274 }
275 }
276
277 @Override
278 public void close() throws IOException {
279 boolean isSafeToCloseUnderlyingInputStream = false;
280 stateChangeLock.lock();
281 try {
282 if (isClosed) {
283 return;
284 }
285 isClosed = true;
286 if (!isReading) {
287 // Nobody is reading, so we can close the underlying input stream in this method.
288 isSafeToCloseUnderlyingInputStream = true;
289 // Flip this to make sure the read ahead task will not close the underlying input stream.
290 isUnderlyingInputStreamBeingClosed = true;
291 }
292 } finally {
293 stateChangeLock.unlock();
294 }
295 if (shutdownExecutorService) {
296 try {
297 shutdownAwait();
298 } catch (final InterruptedException e) {
299 Thread.currentThread().interrupt();
300 throw Input.toInterruptedIOException(e);
301 } finally {
302 if (isSafeToCloseUnderlyingInputStream) {
303 super.close();
304 }
305 }
306 }
307 if (isSafeToCloseUnderlyingInputStream) {
308 super.close();
309 }
310 }
311
312 private void closeUnderlyingInputStreamIfNecessary() {
313 boolean needToCloseUnderlyingInputStream = false;
314 stateChangeLock.lock();
315 try {
316 isReading = false;
317 if (isClosed && !isUnderlyingInputStreamBeingClosed) {
318 // close method cannot close underlyingInputStream because we were reading.
319 needToCloseUnderlyingInputStream = true;
320 }
321 } finally {
322 stateChangeLock.unlock();
323 }
324 if (needToCloseUnderlyingInputStream) {
325 try {
326 super.close();
327 } catch (final IOException ignored) {
328 // TODO Rethrow as UncheckedIOException?
329 }
330 }
331 }
332
333 private boolean isEndOfStream() {
334 return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
335 }
336
337 @Override
338 public int read() throws IOException {
339 if (activeBuffer.hasRemaining()) {
340 // short path - just get one byte.
341 return activeBuffer.get() & 0xFF;
342 }
343 final byte[] oneByteArray = BYTE_ARRAY_1.get();
344 oneByteArray[0] = 0;
345 return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
346 }
347
348 @Override
349 public int read(final byte[] b, final int offset, int len) throws IOException {
350 IOUtils.checkFromIndexSize(b, offset, len);
351 if (len == 0) {
352 return 0;
353 }
354 if (!activeBuffer.hasRemaining()) {
355 // No remaining in active buffer - lock and switch to write ahead buffer.
356 stateChangeLock.lock();
357 try {
358 waitForAsyncReadComplete();
359 if (!readAheadBuffer.hasRemaining()) {
360 // The first read.
361 readAsync();
362 waitForAsyncReadComplete();
363 if (isEndOfStream()) {
364 return EOF;
365 }
366 }
367 // Swap the newly read ahead buffer in place of empty active buffer.
368 swapBuffers();
369 // After swapping buffers, trigger another async read for read ahead buffer.
370 readAsync();
371 } finally {
372 stateChangeLock.unlock();
373 }
374 }
375 len = Math.min(len, activeBuffer.remaining());
376 activeBuffer.get(b, offset, len);
377
378 return len;
379 }
380
381 /**
382 * Reads data from underlyingInputStream to readAheadBuffer asynchronously.
383 *
384 * @throws IOException if an I/O error occurs.
385 */
386 private void readAsync() throws IOException {
387 stateChangeLock.lock();
388 final byte[] arr;
389 try {
390 arr = readAheadBuffer.array();
391 if (endOfStream || readInProgress) {
392 return;
393 }
394 checkReadException();
395 readAheadBuffer.position(0);
396 readAheadBuffer.flip();
397 readInProgress = true;
398 } finally {
399 stateChangeLock.unlock();
400 }
401 executorService.execute(() -> {
402 stateChangeLock.lock();
403 try {
404 if (isClosed) {
405 readInProgress = false;
406 return;
407 }
408 // Flip this so that the close method will not close the underlying input stream when we
409 // are reading.
410 isReading = true;
411 } finally {
412 stateChangeLock.unlock();
413 }
414
415 // Please note that it is safe to release the lock and read into the read ahead buffer
416 // because either of following two conditions will hold:
417 //
418 // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
419 //
420 // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
421 // for this async read to complete.
422 //
423 // So there is no race condition in both the situations.
424 int read = 0;
425 int off = 0;
426 int len = arr.length;
427 Throwable exception = null;
428 try {
429 // try to fill the read ahead buffer.
430 // if a reader is waiting, possibly return early.
431 do {
432 read = in.read(arr, off, len);
433 if (read <= 0) {
434 break;
435 }
436 off += read;
437 len -= read;
438 } while (len > 0 && !isWaiting.get());
439 } catch (final Throwable ex) {
440 exception = ex;
441 if (ex instanceof Error) {
442 // `readException` may not be reported to the user. Rethrow Error to make sure at least
443 // The user can see Error in UncaughtExceptionHandler.
444 throw (Error) ex;
445 }
446 } finally {
447 stateChangeLock.lock();
448 try {
449 readAheadBuffer.limit(off);
450 if (read < 0 || exception instanceof EOFException) {
451 endOfStream = true;
452 } else if (exception != null) {
453 readAborted = true;
454 readException = exception;
455 }
456 readInProgress = false;
457 signalAsyncReadComplete();
458 } finally {
459 stateChangeLock.unlock();
460 }
461 closeUnderlyingInputStreamIfNecessary();
462 }
463 });
464 }
465
/**
 * Shuts down the executor service with {@code shutdownNow()} and then waits for it to terminate.
 * <p>
 * The wait uses a timeout of {@link Long#MAX_VALUE} seconds.
 * </p>
 *
 * @return the value returned by {@code awaitTermination}.
 * @throws InterruptedException if the calling thread is interrupted while waiting.
 */
boolean shutdownAwait() throws InterruptedException {
    executorService.shutdownNow();
    return executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
470
/**
 * Wakes up every thread waiting on {@code asyncReadComplete}.
 * <p>
 * The lock is acquired here because a condition may only be signalled while its lock is held.
 * </p>
 */
private void signalAsyncReadComplete() {
    stateChangeLock.lock();
    try {
        asyncReadComplete.signalAll();
    } finally {
        stateChangeLock.unlock();
    }
}
479
480 @Override
481 public long skip(final long n) throws IOException {
482 if (n <= 0L) {
483 return 0L;
484 }
485 if (n <= activeBuffer.remaining()) {
486 // Only skipping from active buffer is sufficient
487 activeBuffer.position((int) n + activeBuffer.position());
488 return n;
489 }
490 stateChangeLock.lock();
491 final long skipped;
492 try {
493 skipped = skipInternal(n);
494 } finally {
495 stateChangeLock.unlock();
496 }
497 return skipped;
498 }
499
500 /**
501 * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before
502 * calling this function.
503 *
504 * @param n the number of bytes to be skipped.
505 * @return the actual number of bytes skipped.
506 * @throws IOException if an I/O error occurs.
507 */
508 private long skipInternal(final long n) throws IOException {
509 if (!stateChangeLock.isLocked()) {
510 throw new IllegalStateException("Expected stateChangeLock to be locked");
511 }
512 waitForAsyncReadComplete();
513 if (isEndOfStream()) {
514 return 0;
515 }
516 if (available() >= n) {
517 // we can skip from the internal buffers
518 int toSkip = (int) n;
519 // We need to skip from both active buffer and read ahead buffer
520 toSkip -= activeBuffer.remaining();
521 if (toSkip <= 0) { // skipping from activeBuffer already handled.
522 throw new IllegalStateException("Expected toSkip > 0, actual: " + toSkip);
523 }
524 activeBuffer.position(0);
525 activeBuffer.flip();
526 readAheadBuffer.position(toSkip + readAheadBuffer.position());
527 swapBuffers();
528 // Trigger async read to emptied read ahead buffer.
529 readAsync();
530 return n;
531 }
532 final int skippedBytes = available();
533 final long toSkip = n - skippedBytes;
534 activeBuffer.position(0);
535 activeBuffer.flip();
536 readAheadBuffer.position(0);
537 readAheadBuffer.flip();
538 final long skippedFromInputStream = in.skip(toSkip);
539 readAsync();
540 return skippedBytes + skippedFromInputStream;
541 }
542
543 /**
544 * Flips the active and read ahead buffers.
545 */
546 private void swapBuffers() {
547 final ByteBuffer temp = activeBuffer;
548 activeBuffer = readAheadBuffer;
549 readAheadBuffer = temp;
550 }
551
/**
 * Blocks until no asynchronous read is in progress, then rethrows any failure recorded by the read-ahead task.
 * <p>
 * While waiting, {@code isWaiting} is set so that the read-ahead task can stop filling its buffer early; the flag is always cleared before the lock is
 * released.
 * </p>
 *
 * @throws IOException if the calling thread is interrupted while waiting (the interrupt status is restored), or if the asynchronous read was aborted
 *                     by an exception.
 */
private void waitForAsyncReadComplete() throws IOException {
    stateChangeLock.lock();
    try {
        // Tell the read-ahead task that a reader is now blocked on its result.
        isWaiting.set(true);
        // There is only one reader, and one writer, so the writer should signal only once,
        // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups.
        while (readInProgress) {
            asyncReadComplete.await();
        }
    } catch (final InterruptedException e) {
        // Restore the interrupt status before converting to an IOException.
        Thread.currentThread().interrupt();
        throw Input.toInterruptedIOException(e);
    } finally {
        try {
            isWaiting.set(false);
        } finally {
            stateChangeLock.unlock();
        }
    }
    // Surface a failure of the asynchronous read to the reader.
    checkReadException();
}
573 }