1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.apache.commons.io.input;
15
16 import static org.apache.commons.io.IOUtils.EOF;
17
18
19 import java.io.EOFException;
20 import java.io.FilterInputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.InterruptedIOException;
24 import java.nio.ByteBuffer;
25 import java.util.Objects;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.concurrent.locks.Condition;
31 import java.util.concurrent.locks.ReentrantLock;
32
33 import org.apache.commons.io.build.AbstractStreamBuilder;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50 public class ReadAheadInputStream extends FilterInputStream {
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {
71
72 private ExecutorService executorService;
73
74
75
76
77 public Builder() {
78
79 }
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103 @Override
104 public ReadAheadInputStream get() throws IOException {
105 return new ReadAheadInputStream(getInputStream(), getBufferSize(), executorService != null ? executorService : newExecutorService(),
106 executorService == null);
107 }
108
109
110
111
112
113
114
115 public Builder setExecutorService(final ExecutorService executorService) {
116 this.executorService = executorService;
117 return this;
118 }
119
120 }
121
122 private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);
123
124
125
126
127
128
129
130 public static Builder builder() {
131 return new Builder();
132 }
133
134
135
136
137
138
139
140 private static Thread newDaemonThread(final Runnable r) {
141 final Thread thread = new Thread(r, "commons-io-read-ahead");
142 thread.setDaemon(true);
143 return thread;
144 }
145
146
147
148
149
150
151 private static ExecutorService newExecutorService() {
152 return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
153 }
154
155 private final ReentrantLock stateChangeLock = new ReentrantLock();
156
157
158 private ByteBuffer activeBuffer;
159
160
161 private ByteBuffer readAheadBuffer;
162
163
164 private boolean endOfStream;
165
166
167
168 private boolean readInProgress;
169
170
171
172 private boolean readAborted;
173
174
175 private Throwable readException;
176
177
178
179 private boolean isClosed;
180
181
182
183
184 private boolean isUnderlyingInputStreamBeingClosed;
185
186
187
188 private boolean isReading;
189
190
191 private final AtomicBoolean isWaiting = new AtomicBoolean();
192
193 private final ExecutorService executorService;
194
195 private final boolean shutdownExecutorService;
196
197 private final Condition asyncReadComplete = stateChangeLock.newCondition();
198
199
200
201
202
203
204
205
206 @Deprecated
207 public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
208 this(inputStream, bufferSizeInBytes, newExecutorService(), true);
209 }
210
211
212
213
214
215
216
217
218
219 @Deprecated
220 public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
221 this(inputStream, bufferSizeInBytes, executorService, false);
222 }
223
224
225
226
227
228
229
230
231
232 private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
233 final boolean shutdownExecutorService) {
234 super(Objects.requireNonNull(inputStream, "inputStream"));
235 if (bufferSizeInBytes <= 0) {
236 throw new IllegalArgumentException("bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
237 }
238 this.executorService = Objects.requireNonNull(executorService, "executorService");
239 this.shutdownExecutorService = shutdownExecutorService;
240 this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
241 this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
242 this.activeBuffer.flip();
243 this.readAheadBuffer.flip();
244 }
245
246 @Override
247 public int available() throws IOException {
248 stateChangeLock.lock();
249
250 try {
251 return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
252 } finally {
253 stateChangeLock.unlock();
254 }
255 }
256
257 private void checkReadException() throws IOException {
258 if (readAborted) {
259 if (readException instanceof IOException) {
260 throw (IOException) readException;
261 }
262 throw new IOException(readException);
263 }
264 }
265
266 @Override
267 public void close() throws IOException {
268 boolean isSafeToCloseUnderlyingInputStream = false;
269 stateChangeLock.lock();
270 try {
271 if (isClosed) {
272 return;
273 }
274 isClosed = true;
275 if (!isReading) {
276
277 isSafeToCloseUnderlyingInputStream = true;
278
279 isUnderlyingInputStreamBeingClosed = true;
280 }
281 } finally {
282 stateChangeLock.unlock();
283 }
284
285 if (shutdownExecutorService) {
286 try {
287 executorService.shutdownNow();
288 executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
289 } catch (final InterruptedException e) {
290 final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
291 iio.initCause(e);
292 throw iio;
293 } finally {
294 if (isSafeToCloseUnderlyingInputStream) {
295 super.close();
296 }
297 }
298 }
299 }
300
301 private void closeUnderlyingInputStreamIfNecessary() {
302 boolean needToCloseUnderlyingInputStream = false;
303 stateChangeLock.lock();
304 try {
305 isReading = false;
306 if (isClosed && !isUnderlyingInputStreamBeingClosed) {
307
308 needToCloseUnderlyingInputStream = true;
309 }
310 } finally {
311 stateChangeLock.unlock();
312 }
313 if (needToCloseUnderlyingInputStream) {
314 try {
315 super.close();
316 } catch (final IOException ignored) {
317
318 }
319 }
320 }
321
322 private boolean isEndOfStream() {
323 return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
324 }
325
326 @Override
327 public int read() throws IOException {
328 if (activeBuffer.hasRemaining()) {
329
330 return activeBuffer.get() & 0xFF;
331 }
332 final byte[] oneByteArray = BYTE_ARRAY_1.get();
333 oneByteArray[0] = 0;
334 return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
335 }
336
337 @Override
338 public int read(final byte[] b, final int offset, int len) throws IOException {
339 if (offset < 0 || len < 0 || len > b.length - offset) {
340 throw new IndexOutOfBoundsException();
341 }
342 if (len == 0) {
343 return 0;
344 }
345
346 if (!activeBuffer.hasRemaining()) {
347
348 stateChangeLock.lock();
349 try {
350 waitForAsyncReadComplete();
351 if (!readAheadBuffer.hasRemaining()) {
352
353 readAsync();
354 waitForAsyncReadComplete();
355 if (isEndOfStream()) {
356 return EOF;
357 }
358 }
359
360 swapBuffers();
361
362 readAsync();
363 } finally {
364 stateChangeLock.unlock();
365 }
366 }
367 len = Math.min(len, activeBuffer.remaining());
368 activeBuffer.get(b, offset, len);
369
370 return len;
371 }
372
373
374
375
376
377
378 private void readAsync() throws IOException {
379 stateChangeLock.lock();
380 final byte[] arr;
381 try {
382 arr = readAheadBuffer.array();
383 if (endOfStream || readInProgress) {
384 return;
385 }
386 checkReadException();
387 readAheadBuffer.position(0);
388 readAheadBuffer.flip();
389 readInProgress = true;
390 } finally {
391 stateChangeLock.unlock();
392 }
393 executorService.execute(() -> {
394 stateChangeLock.lock();
395 try {
396 if (isClosed) {
397 readInProgress = false;
398 return;
399 }
400
401
402 isReading = true;
403 } finally {
404 stateChangeLock.unlock();
405 }
406
407
408
409
410
411
412
413
414
415
416 int read = 0;
417 int off = 0;
418 int len = arr.length;
419 Throwable exception = null;
420 try {
421
422
423 do {
424 read = in.read(arr, off, len);
425 if (read <= 0) {
426 break;
427 }
428 off += read;
429 len -= read;
430 } while (len > 0 && !isWaiting.get());
431 } catch (final Throwable ex) {
432 exception = ex;
433 if (ex instanceof Error) {
434
435
436 throw (Error) ex;
437 }
438 } finally {
439 stateChangeLock.lock();
440 try {
441 readAheadBuffer.limit(off);
442 if (read < 0 || exception instanceof EOFException) {
443 endOfStream = true;
444 } else if (exception != null) {
445 readAborted = true;
446 readException = exception;
447 }
448 readInProgress = false;
449 signalAsyncReadComplete();
450 } finally {
451 stateChangeLock.unlock();
452 }
453 closeUnderlyingInputStreamIfNecessary();
454 }
455 });
456 }
457
458 private void signalAsyncReadComplete() {
459 stateChangeLock.lock();
460 try {
461 asyncReadComplete.signalAll();
462 } finally {
463 stateChangeLock.unlock();
464 }
465 }
466
467 @Override
468 public long skip(final long n) throws IOException {
469 if (n <= 0L) {
470 return 0L;
471 }
472 if (n <= activeBuffer.remaining()) {
473
474 activeBuffer.position((int) n + activeBuffer.position());
475 return n;
476 }
477 stateChangeLock.lock();
478 final long skipped;
479 try {
480 skipped = skipInternal(n);
481 } finally {
482 stateChangeLock.unlock();
483 }
484 return skipped;
485 }
486
487
488
489
490
491
492
493
494
495 private long skipInternal(final long n) throws IOException {
496 if (!stateChangeLock.isLocked()) {
497 throw new IllegalStateException("Expected stateChangeLock to be locked");
498 }
499 waitForAsyncReadComplete();
500 if (isEndOfStream()) {
501 return 0;
502 }
503 if (available() >= n) {
504
505 int toSkip = (int) n;
506
507 toSkip -= activeBuffer.remaining();
508 if (toSkip <= 0) {
509 throw new IllegalStateException("Expected toSkip > 0, actual: " + toSkip);
510 }
511 activeBuffer.position(0);
512 activeBuffer.flip();
513 readAheadBuffer.position(toSkip + readAheadBuffer.position());
514 swapBuffers();
515
516 readAsync();
517 return n;
518 }
519 final int skippedBytes = available();
520 final long toSkip = n - skippedBytes;
521 activeBuffer.position(0);
522 activeBuffer.flip();
523 readAheadBuffer.position(0);
524 readAheadBuffer.flip();
525 final long skippedFromInputStream = in.skip(toSkip);
526 readAsync();
527 return skippedBytes + skippedFromInputStream;
528 }
529
530
531
532
533 private void swapBuffers() {
534 final ByteBuffer temp = activeBuffer;
535 activeBuffer = readAheadBuffer;
536 readAheadBuffer = temp;
537 }
538
539 private void waitForAsyncReadComplete() throws IOException {
540 stateChangeLock.lock();
541 try {
542 isWaiting.set(true);
543
544
545 while (readInProgress) {
546 asyncReadComplete.await();
547 }
548 } catch (final InterruptedException e) {
549 final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
550 iio.initCause(e);
551 throw iio;
552 } finally {
553 try {
554 isWaiting.set(false);
555 } finally {
556 stateChangeLock.unlock();
557 }
558 }
559 checkReadException();
560 }
561 }