View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      https://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.io.input;
18  
19  import static java.nio.charset.StandardCharsets.UTF_8;
20  import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY;
21  import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
22  import static org.junit.jupiter.api.Assertions.assertEquals;
23  import static org.junit.jupiter.api.Assertions.assertThrows;
24  import static org.junit.jupiter.api.Assertions.assertTimeout;
25  import static org.junit.jupiter.api.Assertions.assertTrue;
26  import static org.junit.jupiter.api.Assumptions.assumeFalse;
27  import static org.junit.jupiter.api.DynamicTest.dynamicTest;
28  
29  import java.io.BufferedInputStream;
30  import java.io.BufferedOutputStream;
31  import java.io.BufferedReader;
32  import java.io.ByteArrayOutputStream;
33  import java.io.IOException;
34  import java.io.InputStream;
35  import java.io.InputStreamReader;
36  import java.io.OutputStream;
37  import java.nio.charset.StandardCharsets;
38  import java.nio.file.Files;
39  import java.nio.file.Path;
40  import java.time.Duration;
41  import java.util.concurrent.BlockingQueue;
42  import java.util.concurrent.CompletableFuture;
43  import java.util.concurrent.CountDownLatch;
44  import java.util.concurrent.LinkedBlockingQueue;
45  import java.util.concurrent.TimeUnit;
46  import java.util.concurrent.atomic.AtomicBoolean;
47  import java.util.stream.Stream;
48  
49  import org.apache.commons.io.IOUtils;
50  import org.apache.commons.io.output.QueueOutputStream;
51  import org.apache.commons.io.output.QueueOutputStreamTest;
52  import org.apache.commons.lang3.StringUtils;
53  import org.junit.jupiter.api.DisplayName;
54  import org.junit.jupiter.api.DynamicTest;
55  import org.junit.jupiter.api.Test;
56  import org.junit.jupiter.api.TestFactory;
57  import org.junit.jupiter.params.ParameterizedTest;
58  import org.junit.jupiter.params.provider.Arguments;
59  import org.junit.jupiter.params.provider.MethodSource;
60  
61  import com.google.common.base.Stopwatch;
62  
63  /**
64   * Test {@link QueueInputStream}.
65   *
66   * @see QueueOutputStreamTest
67   */
68  public class QueueInputStreamTest {
69  
70      public static Stream<Arguments> inputData() {
71          // @formatter:off
72          return Stream.of(Arguments.of(""),
73                  Arguments.of("1"),
74                  Arguments.of("12"),
75                  Arguments.of("1234"),
76                  Arguments.of("12345678"),
77                  Arguments.of(StringUtils.repeat("A", 4095)),
78                  Arguments.of(StringUtils.repeat("A", 4096)),
79                  Arguments.of(StringUtils.repeat("A", 4097)),
80                  Arguments.of(StringUtils.repeat("A", 8191)),
81                  Arguments.of(StringUtils.repeat("A", 8192)),
82                  Arguments.of(StringUtils.repeat("A", 8193)),
83                  Arguments.of(StringUtils.repeat("A", 8192 * 4)));
84          // @formatter:on
85      }
86  
87      @TestFactory
88      public DynamicTest[] bulkReadErrorHandlingTests() {
89          final QueueInputStream queueInputStream = new QueueInputStream();
90          return new DynamicTest[] {
91                  dynamicTest("Offset too big", () ->
92                          assertThrows(IndexOutOfBoundsException.class, () ->
93                                  queueInputStream.read(EMPTY_BYTE_ARRAY, 1, 0))),
94  
95                  dynamicTest("Offset negative", () ->
96                          assertThrows(IndexOutOfBoundsException.class, () ->
97                                  queueInputStream.read(EMPTY_BYTE_ARRAY, -1, 0))),
98  
99                  dynamicTest("Length too big", () ->
100                         assertThrows(IndexOutOfBoundsException.class, () ->
101                                 queueInputStream.read(EMPTY_BYTE_ARRAY, 0, 1))),
102 
103                 dynamicTest("Length negative", () ->
104                         assertThrows(IndexOutOfBoundsException.class, () ->
105                                 queueInputStream.read(EMPTY_BYTE_ARRAY, 0, -1))),
106         };
107     }
108 
109     private int defaultBufferSize() {
110         return 8192;
111     }
112 
113     private void doTestReadLineByLine(final String inputData, final InputStream inputStream, final OutputStream outputStream) throws IOException {
114         final String[] lines = inputData.split("\n");
115         try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, UTF_8))) {
116             for (final String line : lines) {
117                 outputStream.write(line.getBytes(UTF_8));
118                 outputStream.write('\n');
119 
120                 final String actualLine = reader.readLine();
121                 assertEquals(line, actualLine);
122             }
123         }
124     }
125 
126     private String readUnbuffered(final InputStream inputStream) throws IOException {
127         return readUnbuffered(inputStream, Integer.MAX_VALUE);
128     }
129 
130     private String readUnbuffered(final InputStream inputStream, final int maxBytes) throws IOException {
131         if (maxBytes == 0) {
132             return "";
133         }
134 
135         final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
136         int n = -1;
137         while ((n = inputStream.read()) != -1) {
138             byteArrayOutputStream.write(n);
139             if (byteArrayOutputStream.size() >= maxBytes) {
140                 break;
141             }
142         }
143         return byteArrayOutputStream.toString(StandardCharsets.UTF_8.name());
144     }
145 
146     @SuppressWarnings("resource")
147     @ParameterizedTest(name = "inputData={0}")
148     @MethodSource("inputData")
149     void testAvailableAfterClose(final String inputData) throws IOException {
150         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
151         final InputStream shadow;
152         try (InputStream inputStream = new QueueInputStream(queue)) {
153             shadow = inputStream;
154         }
155         assertEquals(0, shadow.available());
156     }
157 
158     @ParameterizedTest(name = "inputData={0}")
159     @MethodSource("inputData")
160     void testAvailableAfterOpen(final String inputData) throws IOException {
161         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
162         try (InputStream inputStream = new QueueInputStream(queue)) {
163             // Always 0 because read() blocks.
164             assertEquals(0, inputStream.available());
165             IOUtils.toString(inputStream, StandardCharsets.UTF_8);
166             assertEquals(0, inputStream.available());
167         }
168     }
169 
170     @ParameterizedTest(name = "inputData={0}")
171     @MethodSource("inputData")
172     void testBufferedReads(final String inputData) throws IOException {
173         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
174         try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
175              QueueOutputStream outputStream = new QueueOutputStream(queue)) {
176             outputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
177             final String actualData = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
178             assertEquals(inputData, actualData);
179         }
180     }
181 
182     @ParameterizedTest(name = "inputData={0}")
183     @MethodSource("inputData")
184     void testBufferedReadWrite(final String inputData) throws IOException {
185         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
186         try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
187                 BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
188             outputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
189             outputStream.flush();
190             final String dataCopy = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
191             assertEquals(inputData, dataCopy);
192         }
193     }
194 
195     @ParameterizedTest(name = "inputData={0}")
196     @MethodSource("inputData")
197     void testBufferedWrites(final String inputData) throws IOException {
198         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
199         try (QueueInputStream inputStream = new QueueInputStream(queue);
200                 BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
201             outputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
202             outputStream.flush();
203             final String actualData = readUnbuffered(inputStream);
204             assertEquals(inputData, actualData);
205         }
206     }
207 
208     @ParameterizedTest(name = "inputData={0}")
209     @MethodSource("inputData")
210     void testBulkReadWaiting(final String inputData) throws IOException {
211         assumeFalse(inputData.isEmpty());
212 
213         final CountDownLatch onPollLatch = new CountDownLatch(1);
214         final CountDownLatch afterWriteLatch = new CountDownLatch(1);
215         final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>() {
216             @Override
217             public Integer poll(final long timeout, final TimeUnit unit) throws InterruptedException {
218                 onPollLatch.countDown();
219                 afterWriteLatch.await(1, TimeUnit.HOURS);
220                 return super.poll(timeout, unit);
221             }
222         };
223 
224         // Simulate scenario where there is not data immediately available when bulk reading and QueueInputStream has to
225         // wait.
226         try (QueueInputStream queueInputStream = QueueInputStream.builder()
227                 .setBlockingQueue(queue)
228                 .setTimeout(Duration.ofHours(1))
229                 .get()) {
230             final QueueOutputStream queueOutputStream = queueInputStream.newQueueOutputStream();
231             final CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
232                 try {
233                     onPollLatch.await(1, TimeUnit.HOURS);
234                     queueOutputStream.write(inputData.getBytes(UTF_8));
235                     afterWriteLatch.countDown();
236                 } catch (final Exception e) {
237                     throw new RuntimeException(e);
238                 }
239             });
240 
241             final byte[] data = new byte[inputData.length()];
242             final int read = queueInputStream.read(data, 0, data.length);
243             assertEquals(inputData.length(), read);
244             final String outputData = new String(data, 0, read, StandardCharsets.UTF_8);
245             assertEquals(inputData, outputData);
246             assertDoesNotThrow(() -> future.get());
247         }
248     }
249 
250     @Test
251     void testBulkReadZeroLength() {
252         final QueueInputStream queueInputStream = new QueueInputStream();
253         final int read = queueInputStream.read(EMPTY_BYTE_ARRAY, 0, 0);
254         assertEquals(0, read);
255     }
256 
257     @Test
258     void testInvalidArguments() {
259         assertThrows(IllegalArgumentException.class, () -> QueueInputStream.builder().setTimeout(Duration.ofMillis(-1)).get(), "waitTime must not be negative");
260     }
261 
262     @SuppressWarnings("resource")
263     @ParameterizedTest(name = "inputData={0}")
264     @MethodSource("inputData")
265     void testReadAfterClose(final String inputData) throws IOException {
266         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
267         final InputStream shadow;
268         try (InputStream inputStream = new QueueInputStream(queue)) {
269             shadow = inputStream;
270         }
271         assertEquals(IOUtils.EOF, shadow.read());
272     }
273 
274     @ParameterizedTest(name = "inputData={0}")
275     @MethodSource("inputData")
276     void testReadLineByLineFile(final String inputData) throws IOException {
277         final Path tempFile = Files.createTempFile(getClass().getSimpleName(), ".txt");
278         try (InputStream inputStream = Files.newInputStream(tempFile);
279              OutputStream outputStream = Files.newOutputStream(tempFile)) {
280 
281             doTestReadLineByLine(inputData, inputStream, outputStream);
282         } finally {
283             Files.delete(tempFile);
284         }
285     }
286 
287     @ParameterizedTest(name = "inputData={0}")
288     @MethodSource("inputData")
289     void testReadLineByLineQueue(final String inputData) throws IOException {
290         final String[] lines = inputData.split("\n");
291         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
292         try (QueueInputStream inputStream = QueueInputStream.builder()
293                 .setBlockingQueue(queue)
294                 .setTimeout(Duration.ofHours(1))
295                 .get();
296              QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
297 
298             doTestReadLineByLine(inputData, inputStream, outputStream);
299         }
300     }
301 
302     @Test
303     void testResetArguments() throws IOException {
304         try (QueueInputStream queueInputStream = QueueInputStream.builder().setTimeout(null).get()) {
305             assertEquals(Duration.ZERO, queueInputStream.getTimeout());
306             assertEquals(0, queueInputStream.getBlockingQueue().size());
307         }
308         try (QueueInputStream queueInputStream = QueueInputStream.builder().setBlockingQueue(null).get()) {
309             assertEquals(Duration.ZERO, queueInputStream.getTimeout());
310             assertEquals(0, queueInputStream.getBlockingQueue().size());
311         }
312     }
313 
314     @Test
315     @DisplayName("If read is interrupted while waiting, then exception is thrown")
316     void testTimeoutInterrupted() throws Exception {
317         try (QueueInputStream inputStream = QueueInputStream.builder().setTimeout(Duration.ofMinutes(2)).get();
318                 QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
319 
320             // read in a background thread
321             final AtomicBoolean result = new AtomicBoolean();
322             final CountDownLatch latch = new CountDownLatch(1);
323             final Thread thread = new Thread(() -> {
324                 // when thread is interrupted, verify ...
325                 assertThrows(IllegalStateException.class, () -> readUnbuffered(inputStream, 3));
326                 assertTrue(Thread.currentThread().isInterrupted());
327                 result.set(true);
328                 latch.countDown();
329             });
330             thread.setDaemon(true);
331             thread.start();
332 
333             // interrupt and check that verification completed
334             thread.interrupt();
335             latch.await(500, TimeUnit.MILLISECONDS);
336             assertTrue(result.get());
337         }
338     }
339 
340     @Test
341     @DisplayName("If data is not available in queue, then read will wait until wait time elapses")
342     void testTimeoutUnavailableData() throws IOException {
343         try (QueueInputStream inputStream = QueueInputStream.builder().setTimeout(Duration.ofMillis(500)).get();
344                 QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
345             final Stopwatch stopwatch = Stopwatch.createStarted();
346             final String actualData = assertTimeout(Duration.ofSeconds(1), () -> readUnbuffered(inputStream, 3));
347             stopwatch.stop();
348             assertEquals("", actualData);
349 
350             assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) >= 500, () -> stopwatch.toString());
351         }
352     }
353 
354     @ParameterizedTest(name = "inputData={0}")
355     @MethodSource("inputData")
356     void testUnbufferedReadWrite(final String inputData) throws IOException {
357         try (QueueInputStream inputStream = new QueueInputStream();
358                 QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
359             writeUnbuffered(outputStream, inputData);
360             final String actualData = readUnbuffered(inputStream);
361             assertEquals(inputData, actualData);
362         }
363     }
364 
365     @ParameterizedTest(name = "inputData={0}")
366     @MethodSource("inputData")
367     void testUnbufferedReadWriteWithTimeout(final String inputData) throws IOException {
368         final Duration timeout = Duration.ofMinutes(2);
369         try (QueueInputStream inputStream = QueueInputStream.builder().setTimeout(timeout).get();
370                 QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
371             assertEquals(timeout, inputStream.getTimeout());
372             writeUnbuffered(outputStream, inputData);
373             final String actualData = assertTimeout(Duration.ofSeconds(1), () -> readUnbuffered(inputStream, inputData.length()));
374             assertEquals(inputData, actualData);
375         }
376     }
377 
378     private void writeUnbuffered(final QueueOutputStream outputStream, final String inputData) throws IOException {
379         final byte[] bytes = inputData.getBytes(StandardCharsets.UTF_8);
380         outputStream.write(bytes, 0, bytes.length);
381     }
382 }