1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.apache.commons.io.input;
15
16 import static org.apache.commons.io.IOUtils.EOF;
17
18
19 import java.io.EOFException;
20 import java.io.FilterInputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.InterruptedIOException;
24 import java.nio.ByteBuffer;
25 import java.util.Objects;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.concurrent.locks.Condition;
31 import java.util.concurrent.locks.ReentrantLock;
32
33 import org.apache.commons.io.build.AbstractStreamBuilder;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50 public class ReadAheadInputStream extends FilterInputStream {
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {
71
72 private ExecutorService executorService;
73
74
75
76
77 public Builder() {
78
79 }
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103 @Override
104 public ReadAheadInputStream get() throws IOException {
105 return new ReadAheadInputStream(this);
106 }
107
108
109
110
111
112
113
114 public Builder setExecutorService(final ExecutorService executorService) {
115 this.executorService = executorService;
116 return this;
117 }
118
119 }
120
121 private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);
122
123
124
125
126
127
128
129 public static Builder builder() {
130 return new Builder();
131 }
132
133
134
135
136
137
138
139 private static Thread newDaemonThread(final Runnable r) {
140 final Thread thread = new Thread(r, "commons-io-read-ahead");
141 thread.setDaemon(true);
142 return thread;
143 }
144
145
146
147
148
149
150 private static ExecutorService newExecutorService() {
151 return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
152 }
153
154 private final ReentrantLock stateChangeLock = new ReentrantLock();
155
156
157 private ByteBuffer activeBuffer;
158
159
160 private ByteBuffer readAheadBuffer;
161
162
163 private boolean endOfStream;
164
165
166
167 private boolean readInProgress;
168
169
170
171 private boolean readAborted;
172
173
174 private Throwable readException;
175
176
177
178 private boolean isClosed;
179
180
181
182
183 private boolean isUnderlyingInputStreamBeingClosed;
184
185
186
187 private boolean isReading;
188
189
190 private final AtomicBoolean isWaiting = new AtomicBoolean();
191
192 private final ExecutorService executorService;
193
194 private final boolean shutdownExecutorService;
195
196 private final Condition asyncReadComplete = stateChangeLock.newCondition();
197
/**
 * Constructs an instance from a builder.
 * <p>
 * When the builder carries no executor service, a new one is created and this stream takes ownership of it (it is shut down on close).
 * </p>
 *
 * @param builder the builder holding the input stream, buffer size and optional executor service.
 * @throws IOException if the builder cannot provide the input stream.
 */
@SuppressWarnings("resource")
private ReadAheadInputStream(final Builder builder) throws IOException {
    this(builder.getInputStream(),
            builder.getBufferSize(),
            builder.executorService == null ? newExecutorService() : builder.executorService,
            builder.executorService == null);
}
203
204
205
206
207
208
209
210
/**
 * Constructs an instance that reads ahead on a newly created single-thread executor, which is shut down when this stream is closed.
 *
 * @param inputStream       the underlying input stream, must not be {@code null}.
 * @param bufferSizeInBytes the size of each of the two internal buffers, must be greater than 0.
 * @deprecated Use {@link #builder()} and {@link Builder#get()}.
 */
@Deprecated
public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
    this(inputStream, bufferSizeInBytes, newExecutorService(), true);
}
215
216
217
218
219
220
221
222
223
/**
 * Constructs an instance that reads ahead on the given executor; the executor is not shut down when this stream is closed.
 *
 * @param inputStream       the underlying input stream, must not be {@code null}.
 * @param bufferSizeInBytes the size of each of the two internal buffers, must be greater than 0.
 * @param executorService   the executor that runs the read-ahead task, must not be {@code null}.
 * @deprecated Use {@link #builder()} and {@link Builder#get()}.
 */
@Deprecated
public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
    this(inputStream, bufferSizeInBytes, executorService, false);
}
228
229
230
231
232
233
234
235
236
237 private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
238 final boolean shutdownExecutorService) {
239 super(Objects.requireNonNull(inputStream, "inputStream"));
240 if (bufferSizeInBytes <= 0) {
241 throw new IllegalArgumentException("bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
242 }
243 this.executorService = Objects.requireNonNull(executorService, "executorService");
244 this.shutdownExecutorService = shutdownExecutorService;
245 this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
246 this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
247 this.activeBuffer.flip();
248 this.readAheadBuffer.flip();
249 }
250
251 @Override
252 public int available() throws IOException {
253 stateChangeLock.lock();
254
255 try {
256 return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
257 } finally {
258 stateChangeLock.unlock();
259 }
260 }
261
262 private void checkReadException() throws IOException {
263 if (readAborted) {
264 if (readException instanceof IOException) {
265 throw (IOException) readException;
266 }
267 throw new IOException(readException);
268 }
269 }
270
271 @Override
272 public void close() throws IOException {
273 boolean isSafeToCloseUnderlyingInputStream = false;
274 stateChangeLock.lock();
275 try {
276 if (isClosed) {
277 return;
278 }
279 isClosed = true;
280 if (!isReading) {
281
282 isSafeToCloseUnderlyingInputStream = true;
283
284 isUnderlyingInputStreamBeingClosed = true;
285 }
286 } finally {
287 stateChangeLock.unlock();
288 }
289
290 if (shutdownExecutorService) {
291 try {
292 executorService.shutdownNow();
293 executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
294 } catch (final InterruptedException e) {
295 final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
296 iio.initCause(e);
297 throw iio;
298 } finally {
299 if (isSafeToCloseUnderlyingInputStream) {
300 super.close();
301 }
302 }
303 }
304 }
305
306 private void closeUnderlyingInputStreamIfNecessary() {
307 boolean needToCloseUnderlyingInputStream = false;
308 stateChangeLock.lock();
309 try {
310 isReading = false;
311 if (isClosed && !isUnderlyingInputStreamBeingClosed) {
312
313 needToCloseUnderlyingInputStream = true;
314 }
315 } finally {
316 stateChangeLock.unlock();
317 }
318 if (needToCloseUnderlyingInputStream) {
319 try {
320 super.close();
321 } catch (final IOException ignored) {
322
323 }
324 }
325 }
326
327 private boolean isEndOfStream() {
328 return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
329 }
330
331 @Override
332 public int read() throws IOException {
333 if (activeBuffer.hasRemaining()) {
334
335 return activeBuffer.get() & 0xFF;
336 }
337 final byte[] oneByteArray = BYTE_ARRAY_1.get();
338 oneByteArray[0] = 0;
339 return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
340 }
341
342 @Override
343 public int read(final byte[] b, final int offset, int len) throws IOException {
344 if (offset < 0 || len < 0 || len > b.length - offset) {
345 throw new IndexOutOfBoundsException();
346 }
347 if (len == 0) {
348 return 0;
349 }
350
351 if (!activeBuffer.hasRemaining()) {
352
353 stateChangeLock.lock();
354 try {
355 waitForAsyncReadComplete();
356 if (!readAheadBuffer.hasRemaining()) {
357
358 readAsync();
359 waitForAsyncReadComplete();
360 if (isEndOfStream()) {
361 return EOF;
362 }
363 }
364
365 swapBuffers();
366
367 readAsync();
368 } finally {
369 stateChangeLock.unlock();
370 }
371 }
372 len = Math.min(len, activeBuffer.remaining());
373 activeBuffer.get(b, offset, len);
374
375 return len;
376 }
377
378
379
380
381
382
383 private void readAsync() throws IOException {
384 stateChangeLock.lock();
385 final byte[] arr;
386 try {
387 arr = readAheadBuffer.array();
388 if (endOfStream || readInProgress) {
389 return;
390 }
391 checkReadException();
392 readAheadBuffer.position(0);
393 readAheadBuffer.flip();
394 readInProgress = true;
395 } finally {
396 stateChangeLock.unlock();
397 }
398 executorService.execute(() -> {
399 stateChangeLock.lock();
400 try {
401 if (isClosed) {
402 readInProgress = false;
403 return;
404 }
405
406
407 isReading = true;
408 } finally {
409 stateChangeLock.unlock();
410 }
411
412
413
414
415
416
417
418
419
420
421 int read = 0;
422 int off = 0;
423 int len = arr.length;
424 Throwable exception = null;
425 try {
426
427
428 do {
429 read = in.read(arr, off, len);
430 if (read <= 0) {
431 break;
432 }
433 off += read;
434 len -= read;
435 } while (len > 0 && !isWaiting.get());
436 } catch (final Throwable ex) {
437 exception = ex;
438 if (ex instanceof Error) {
439
440
441 throw (Error) ex;
442 }
443 } finally {
444 stateChangeLock.lock();
445 try {
446 readAheadBuffer.limit(off);
447 if (read < 0 || exception instanceof EOFException) {
448 endOfStream = true;
449 } else if (exception != null) {
450 readAborted = true;
451 readException = exception;
452 }
453 readInProgress = false;
454 signalAsyncReadComplete();
455 } finally {
456 stateChangeLock.unlock();
457 }
458 closeUnderlyingInputStreamIfNecessary();
459 }
460 });
461 }
462
463 private void signalAsyncReadComplete() {
464 stateChangeLock.lock();
465 try {
466 asyncReadComplete.signalAll();
467 } finally {
468 stateChangeLock.unlock();
469 }
470 }
471
472 @Override
473 public long skip(final long n) throws IOException {
474 if (n <= 0L) {
475 return 0L;
476 }
477 if (n <= activeBuffer.remaining()) {
478
479 activeBuffer.position((int) n + activeBuffer.position());
480 return n;
481 }
482 stateChangeLock.lock();
483 final long skipped;
484 try {
485 skipped = skipInternal(n);
486 } finally {
487 stateChangeLock.unlock();
488 }
489 return skipped;
490 }
491
492
493
494
495
496
497
498
499
500 private long skipInternal(final long n) throws IOException {
501 if (!stateChangeLock.isLocked()) {
502 throw new IllegalStateException("Expected stateChangeLock to be locked");
503 }
504 waitForAsyncReadComplete();
505 if (isEndOfStream()) {
506 return 0;
507 }
508 if (available() >= n) {
509
510 int toSkip = (int) n;
511
512 toSkip -= activeBuffer.remaining();
513 if (toSkip <= 0) {
514 throw new IllegalStateException("Expected toSkip > 0, actual: " + toSkip);
515 }
516 activeBuffer.position(0);
517 activeBuffer.flip();
518 readAheadBuffer.position(toSkip + readAheadBuffer.position());
519 swapBuffers();
520
521 readAsync();
522 return n;
523 }
524 final int skippedBytes = available();
525 final long toSkip = n - skippedBytes;
526 activeBuffer.position(0);
527 activeBuffer.flip();
528 readAheadBuffer.position(0);
529 readAheadBuffer.flip();
530 final long skippedFromInputStream = in.skip(toSkip);
531 readAsync();
532 return skippedBytes + skippedFromInputStream;
533 }
534
535
536
537
538 private void swapBuffers() {
539 final ByteBuffer temp = activeBuffer;
540 activeBuffer = readAheadBuffer;
541 readAheadBuffer = temp;
542 }
543
544 private void waitForAsyncReadComplete() throws IOException {
545 stateChangeLock.lock();
546 try {
547 isWaiting.set(true);
548
549
550 while (readInProgress) {
551 asyncReadComplete.await();
552 }
553 } catch (final InterruptedException e) {
554 final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
555 iio.initCause(e);
556 throw iio;
557 } finally {
558 try {
559 isWaiting.set(false);
560 } finally {
561 stateChangeLock.unlock();
562 }
563 }
564 checkReadException();
565 }
566 }