View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      https://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.io.input;
18  
19  import static java.nio.charset.StandardCharsets.UTF_8;
20  import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY;
21  import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
22  import static org.junit.jupiter.api.Assertions.assertEquals;
23  import static org.junit.jupiter.api.Assertions.assertThrows;
24  import static org.junit.jupiter.api.Assertions.assertTimeout;
25  import static org.junit.jupiter.api.Assertions.assertTrue;
26  import static org.junit.jupiter.api.Assumptions.assumeFalse;
27  import static org.junit.jupiter.api.DynamicTest.dynamicTest;
28  
29  import java.io.BufferedInputStream;
30  import java.io.BufferedOutputStream;
31  import java.io.BufferedReader;
32  import java.io.ByteArrayOutputStream;
33  import java.io.IOException;
34  import java.io.InputStream;
35  import java.io.InputStreamReader;
36  import java.io.OutputStream;
37  import java.nio.charset.StandardCharsets;
38  import java.nio.file.Files;
39  import java.nio.file.Path;
40  import java.time.Duration;
41  import java.util.concurrent.BlockingQueue;
42  import java.util.concurrent.CompletableFuture;
43  import java.util.concurrent.CountDownLatch;
44  import java.util.concurrent.LinkedBlockingQueue;
45  import java.util.concurrent.TimeUnit;
46  import java.util.concurrent.atomic.AtomicBoolean;
47  import java.util.stream.Stream;
48  
49  import org.apache.commons.io.IOUtils;
50  import org.apache.commons.io.output.QueueOutputStream;
51  import org.apache.commons.io.output.QueueOutputStreamTest;
52  import org.apache.commons.lang3.StringUtils;
53  import org.junit.jupiter.api.DynamicTest;
54  import org.junit.jupiter.api.Test;
55  import org.junit.jupiter.api.TestFactory;
56  import org.junit.jupiter.params.ParameterizedTest;
57  import org.junit.jupiter.params.provider.Arguments;
58  import org.junit.jupiter.params.provider.MethodSource;
59  
60  import com.google.common.base.Stopwatch;
61  
62  /**
63   * Test {@link QueueInputStream}.
64   *
65   * @see QueueOutputStreamTest
66   */
67  public class QueueInputStreamTest {
68  
69      public static Stream<Arguments> inputData() {
70          // @formatter:off
71          return Stream.of(Arguments.of(""),
72                  Arguments.of("1"),
73                  Arguments.of("12"),
74                  Arguments.of("1234"),
75                  Arguments.of("12345678"),
76                  Arguments.of(StringUtils.repeat("A", 4095)),
77                  Arguments.of(StringUtils.repeat("A", 4096)),
78                  Arguments.of(StringUtils.repeat("A", 4097)),
79                  Arguments.of(StringUtils.repeat("A", 8191)),
80                  Arguments.of(StringUtils.repeat("A", 8192)),
81                  Arguments.of(StringUtils.repeat("A", 8193)),
82                  Arguments.of(StringUtils.repeat("A", 8192 * 4)));
83          // @formatter:on
84      }
85  
86      @TestFactory
87      public DynamicTest[] bulkReadErrorHandlingTests() {
88          final QueueInputStream queueInputStream = new QueueInputStream();
89          return new DynamicTest[] {
90                  dynamicTest("Offset too big", () ->
91                          assertThrows(IndexOutOfBoundsException.class, () ->
92                                  queueInputStream.read(EMPTY_BYTE_ARRAY, 1, 0))),
93  
94                  dynamicTest("Offset negative", () ->
95                          assertThrows(IndexOutOfBoundsException.class, () ->
96                                  queueInputStream.read(EMPTY_BYTE_ARRAY, -1, 0))),
97  
98                  dynamicTest("Length too big", () ->
99                          assertThrows(IndexOutOfBoundsException.class, () ->
100                                 queueInputStream.read(EMPTY_BYTE_ARRAY, 0, 1))),
101 
102                 dynamicTest("Length negative", () ->
103                         assertThrows(IndexOutOfBoundsException.class, () ->
104                                 queueInputStream.read(EMPTY_BYTE_ARRAY, 0, -1))),
105         };
106     }
107 
108     private int defaultBufferSize() {
109         return 8192;
110     }
111 
112     private void doTestReadLineByLine(final String inputData, final InputStream inputStream, final OutputStream outputStream) throws IOException {
113         final String[] lines = inputData.split("\n");
114         try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, UTF_8))) {
115             for (final String line : lines) {
116                 outputStream.write(line.getBytes(UTF_8));
117                 outputStream.write('\n');
118 
119                 final String actualLine = reader.readLine();
120                 assertEquals(line, actualLine);
121             }
122         }
123     }
124 
125     private String readUnbuffered(final InputStream inputStream) throws IOException {
126         return readUnbuffered(inputStream, Integer.MAX_VALUE);
127     }
128 
129     private String readUnbuffered(final InputStream inputStream, final int maxBytes) throws IOException {
130         if (maxBytes == 0) {
131             return "";
132         }
133 
134         final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
135         int n = -1;
136         while ((n = inputStream.read()) != -1) {
137             byteArrayOutputStream.write(n);
138             if (byteArrayOutputStream.size() >= maxBytes) {
139                 break;
140             }
141         }
142         return byteArrayOutputStream.toString(StandardCharsets.UTF_8.name());
143     }
144 
145     @SuppressWarnings("resource")
146     @ParameterizedTest(name = "inputData={0}")
147     @MethodSource("inputData")
148     void testAvailableAfterClose(final String inputData) throws IOException {
149         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
150         final InputStream shadow;
151         try (InputStream inputStream = new QueueInputStream(queue)) {
152             shadow = inputStream;
153         }
154         assertEquals(0, shadow.available());
155     }
156 
157     @ParameterizedTest(name = "inputData={0}")
158     @MethodSource("inputData")
159     void testAvailableAfterOpen(final String inputData) throws IOException {
160         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
161         try (InputStream inputStream = new QueueInputStream(queue)) {
162             // Always 0 because read() blocks.
163             assertEquals(0, inputStream.available());
164             IOUtils.toString(inputStream, StandardCharsets.UTF_8);
165             assertEquals(0, inputStream.available());
166         }
167     }
168 
169     @ParameterizedTest(name = "inputData={0}")
170     @MethodSource("inputData")
171     void testBufferedReads(final String inputData) throws IOException {
172         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
173         try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
174              QueueOutputStream outputStream = new QueueOutputStream(queue)) {
175             outputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
176             final String actualData = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
177             assertEquals(inputData, actualData);
178         }
179     }
180 
181     @ParameterizedTest(name = "inputData={0}")
182     @MethodSource("inputData")
183     void testBufferedReadWrite(final String inputData) throws IOException {
184         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
185         try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
186                 BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
187             outputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
188             outputStream.flush();
189             final String dataCopy = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
190             assertEquals(inputData, dataCopy);
191         }
192     }
193 
194     @ParameterizedTest(name = "inputData={0}")
195     @MethodSource("inputData")
196     void testBufferedWrites(final String inputData) throws IOException {
197         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
198         try (QueueInputStream inputStream = new QueueInputStream(queue);
199                 BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
200             outputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
201             outputStream.flush();
202             final String actualData = readUnbuffered(inputStream);
203             assertEquals(inputData, actualData);
204         }
205     }
206 
207     @ParameterizedTest(name = "inputData={0}")
208     @MethodSource("inputData")
209     void testBulkReadWaiting(final String inputData) throws IOException {
210         assumeFalse(inputData.isEmpty());
211 
212         final CountDownLatch onPollLatch = new CountDownLatch(1);
213         final CountDownLatch afterWriteLatch = new CountDownLatch(1);
214         final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>() {
215             @Override
216             public Integer poll(final long timeout, final TimeUnit unit) throws InterruptedException {
217                 onPollLatch.countDown();
218                 afterWriteLatch.await(1, TimeUnit.HOURS);
219                 return super.poll(timeout, unit);
220             }
221         };
222 
223         // Simulate scenario where there is not data immediately available when bulk reading and QueueInputStream has to
224         // wait.
225         try (QueueInputStream queueInputStream = QueueInputStream.builder()
226                 .setBlockingQueue(queue)
227                 .setTimeout(Duration.ofHours(1))
228                 .get()) {
229             final QueueOutputStream queueOutputStream = queueInputStream.newQueueOutputStream();
230             final CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
231                 try {
232                     onPollLatch.await(1, TimeUnit.HOURS);
233                     queueOutputStream.write(inputData.getBytes(UTF_8));
234                     afterWriteLatch.countDown();
235                 } catch (final Exception e) {
236                     throw new RuntimeException(e);
237                 }
238             });
239 
240             final byte[] data = new byte[inputData.length()];
241             final int read = queueInputStream.read(data, 0, data.length);
242             assertEquals(inputData.length(), read);
243             final String outputData = new String(data, 0, read, StandardCharsets.UTF_8);
244             assertEquals(inputData, outputData);
245             assertDoesNotThrow(() -> future.get());
246         }
247     }
248 
249     @Test
250     void testBulkReadZeroLength() {
251         final QueueInputStream queueInputStream = new QueueInputStream();
252         final int read = queueInputStream.read(EMPTY_BYTE_ARRAY, 0, 0);
253         assertEquals(0, read);
254     }
255 
256     @Test
257     void testInvalidArguments() {
258         assertThrows(IllegalArgumentException.class, () -> QueueInputStream.builder().setTimeout(Duration.ofMillis(-1)).get(), "waitTime must not be negative");
259     }
260 
261     @SuppressWarnings("resource")
262     @ParameterizedTest(name = "inputData={0}")
263     @MethodSource("inputData")
264     void testReadAfterClose(final String inputData) throws IOException {
265         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
266         final InputStream shadow;
267         try (InputStream inputStream = new QueueInputStream(queue)) {
268             shadow = inputStream;
269         }
270         assertEquals(IOUtils.EOF, shadow.read());
271     }
272 
273     @ParameterizedTest(name = "inputData={0}")
274     @MethodSource("inputData")
275     void testReadLineByLineFile(final String inputData) throws IOException {
276         final Path tempFile = Files.createTempFile(getClass().getSimpleName(), ".txt");
277         try (InputStream inputStream = Files.newInputStream(tempFile);
278              OutputStream outputStream = Files.newOutputStream(tempFile)) {
279 
280             doTestReadLineByLine(inputData, inputStream, outputStream);
281         } finally {
282             Files.delete(tempFile);
283         }
284     }
285 
286     @ParameterizedTest(name = "inputData={0}")
287     @MethodSource("inputData")
288     void testReadLineByLineQueue(final String inputData) throws IOException {
289         final String[] lines = inputData.split("\n");
290         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
291         try (QueueInputStream inputStream = QueueInputStream.builder()
292                 .setBlockingQueue(queue)
293                 .setTimeout(Duration.ofHours(1))
294                 .get();
295              QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
296 
297             doTestReadLineByLine(inputData, inputStream, outputStream);
298         }
299     }
300 
301     @Test
302     void testResetArguments() throws IOException {
303         try (QueueInputStream queueInputStream = QueueInputStream.builder().setTimeout(null).get()) {
304             assertEquals(Duration.ZERO, queueInputStream.getTimeout());
305             assertEquals(0, queueInputStream.getBlockingQueue().size());
306         }
307         try (QueueInputStream queueInputStream = QueueInputStream.builder().setBlockingQueue(null).get()) {
308             assertEquals(Duration.ZERO, queueInputStream.getTimeout());
309             assertEquals(0, queueInputStream.getBlockingQueue().size());
310         }
311     }
312 
313     @Test
314 
315     /** If read is interrupted while waiting, then exception is thrown */
316     void testTimeoutInterrupted() throws Exception {
317         try (QueueInputStream inputStream = QueueInputStream.builder().setTimeout(Duration.ofMinutes(2)).get();
318                 QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
319 
320             // read in a background thread
321             final AtomicBoolean result = new AtomicBoolean();
322             final CountDownLatch latch = new CountDownLatch(1);
323             final Thread thread = new Thread(() -> {
324                 // when thread is interrupted, verify ...
325                 assertThrows(IllegalStateException.class, () -> readUnbuffered(inputStream, 3));
326                 assertTrue(Thread.currentThread().isInterrupted());
327                 result.set(true);
328                 latch.countDown();
329             }, "commons-io-QueueInputStreamTest-testTimeoutInterrupted");
330             thread.setDaemon(true);
331             thread.start();
332 
333             // interrupt and check that verification completed
334             thread.interrupt();
335             latch.await(500, TimeUnit.MILLISECONDS);
336             assertTrue(result.get());
337         }
338     }
339 
340     @Test
341 
342     /** If data is not available in queue, then read will wait until wait time elapses */
343     void testTimeoutUnavailableData() throws IOException {
344         try (QueueInputStream inputStream = QueueInputStream.builder().setTimeout(Duration.ofMillis(500)).get();
345                 QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
346             final Stopwatch stopwatch = Stopwatch.createStarted();
347             final String actualData = assertTimeout(Duration.ofSeconds(1), () -> readUnbuffered(inputStream, 3));
348             stopwatch.stop();
349             assertEquals("", actualData);
350 
351             assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) >= 500, () -> stopwatch.toString());
352         }
353     }
354 
355     @ParameterizedTest(name = "inputData={0}")
356     @MethodSource("inputData")
357     void testUnbufferedReadWrite(final String inputData) throws IOException {
358         try (QueueInputStream inputStream = new QueueInputStream();
359                 QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
360             writeUnbuffered(outputStream, inputData);
361             final String actualData = readUnbuffered(inputStream);
362             assertEquals(inputData, actualData);
363         }
364     }
365 
366     @ParameterizedTest(name = "inputData={0}")
367     @MethodSource("inputData")
368     void testUnbufferedReadWriteWithTimeout(final String inputData) throws IOException {
369         final Duration timeout = Duration.ofMinutes(2);
370         try (QueueInputStream inputStream = QueueInputStream.builder().setTimeout(timeout).get();
371                 QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
372             assertEquals(timeout, inputStream.getTimeout());
373             writeUnbuffered(outputStream, inputData);
374             final String actualData = assertTimeout(Duration.ofSeconds(1), () -> readUnbuffered(inputStream, inputData.length()));
375             assertEquals(inputData, actualData);
376         }
377     }
378 
379     private void writeUnbuffered(final QueueOutputStream outputStream, final String inputData) throws IOException {
380         final byte[] bytes = inputData.getBytes(StandardCharsets.UTF_8);
381         outputStream.write(bytes, 0, bytes.length);
382     }
383 }