1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.commons.io.input;
18
19 import static java.nio.charset.StandardCharsets.UTF_8;
20 import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY;
21 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
22 import static org.junit.jupiter.api.Assertions.assertEquals;
23 import static org.junit.jupiter.api.Assertions.assertThrows;
24 import static org.junit.jupiter.api.Assertions.assertTimeout;
25 import static org.junit.jupiter.api.Assertions.assertTrue;
26 import static org.junit.jupiter.api.Assumptions.assumeFalse;
27 import static org.junit.jupiter.api.DynamicTest.dynamicTest;
28
29 import java.io.BufferedInputStream;
30 import java.io.BufferedOutputStream;
31 import java.io.BufferedReader;
32 import java.io.ByteArrayOutputStream;
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.io.InputStreamReader;
36 import java.io.OutputStream;
37 import java.nio.charset.StandardCharsets;
38 import java.nio.file.Files;
39 import java.nio.file.Path;
40 import java.time.Duration;
41 import java.util.concurrent.BlockingQueue;
42 import java.util.concurrent.CompletableFuture;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.LinkedBlockingQueue;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.stream.Stream;
48
49 import org.apache.commons.io.IOUtils;
50 import org.apache.commons.io.output.QueueOutputStream;
51 import org.apache.commons.io.output.QueueOutputStreamTest;
52 import org.apache.commons.lang3.StringUtils;
53 import org.junit.jupiter.api.DynamicTest;
54 import org.junit.jupiter.api.Test;
55 import org.junit.jupiter.api.TestFactory;
56 import org.junit.jupiter.params.ParameterizedTest;
57 import org.junit.jupiter.params.provider.Arguments;
58 import org.junit.jupiter.params.provider.MethodSource;
59
60 import com.google.common.base.Stopwatch;
61
62
63
64
65
66
67 public class QueueInputStreamTest {
68
69 public static Stream<Arguments> inputData() {
70
71 return Stream.of(Arguments.of(""),
72 Arguments.of("1"),
73 Arguments.of("12"),
74 Arguments.of("1234"),
75 Arguments.of("12345678"),
76 Arguments.of(StringUtils.repeat("A", 4095)),
77 Arguments.of(StringUtils.repeat("A", 4096)),
78 Arguments.of(StringUtils.repeat("A", 4097)),
79 Arguments.of(StringUtils.repeat("A", 8191)),
80 Arguments.of(StringUtils.repeat("A", 8192)),
81 Arguments.of(StringUtils.repeat("A", 8193)),
82 Arguments.of(StringUtils.repeat("A", 8192 * 4)));
83
84 }
85
86 @TestFactory
87 public DynamicTest[] bulkReadErrorHandlingTests() {
88 final QueueInputStream queueInputStream = new QueueInputStream();
89 return new DynamicTest[] {
90 dynamicTest("Offset too big", () ->
91 assertThrows(IndexOutOfBoundsException.class, () ->
92 queueInputStream.read(EMPTY_BYTE_ARRAY, 1, 0))),
93
94 dynamicTest("Offset negative", () ->
95 assertThrows(IndexOutOfBoundsException.class, () ->
96 queueInputStream.read(EMPTY_BYTE_ARRAY, -1, 0))),
97
98 dynamicTest("Length too big", () ->
99 assertThrows(IndexOutOfBoundsException.class, () ->
100 queueInputStream.read(EMPTY_BYTE_ARRAY, 0, 1))),
101
102 dynamicTest("Length negative", () ->
103 assertThrows(IndexOutOfBoundsException.class, () ->
104 queueInputStream.read(EMPTY_BYTE_ARRAY, 0, -1))),
105 };
106 }
107
108 private int defaultBufferSize() {
109 return 8192;
110 }
111
112 private void doTestReadLineByLine(final String inputData, final InputStream inputStream, final OutputStream outputStream) throws IOException {
113 final String[] lines = inputData.split("\n");
114 try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, UTF_8))) {
115 for (final String line : lines) {
116 outputStream.write(line.getBytes(UTF_8));
117 outputStream.write('\n');
118
119 final String actualLine = reader.readLine();
120 assertEquals(line, actualLine);
121 }
122 }
123 }
124
125 private String readUnbuffered(final InputStream inputStream) throws IOException {
126 return readUnbuffered(inputStream, Integer.MAX_VALUE);
127 }
128
129 private String readUnbuffered(final InputStream inputStream, final int maxBytes) throws IOException {
130 if (maxBytes == 0) {
131 return "";
132 }
133
134 final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
135 int n = -1;
136 while ((n = inputStream.read()) != -1) {
137 byteArrayOutputStream.write(n);
138 if (byteArrayOutputStream.size() >= maxBytes) {
139 break;
140 }
141 }
142 return byteArrayOutputStream.toString(StandardCharsets.UTF_8.name());
143 }
144
145 @SuppressWarnings("resource")
146 @ParameterizedTest(name = "inputData={0}")
147 @MethodSource("inputData")
148 void testAvailableAfterClose(final String inputData) throws IOException {
149 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
150 final InputStream shadow;
151 try (InputStream inputStream = new QueueInputStream(queue)) {
152 shadow = inputStream;
153 }
154 assertEquals(0, shadow.available());
155 }
156
157 @ParameterizedTest(name = "inputData={0}")
158 @MethodSource("inputData")
159 void testAvailableAfterOpen(final String inputData) throws IOException {
160 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
161 try (InputStream inputStream = new QueueInputStream(queue)) {
162
163 assertEquals(0, inputStream.available());
164 IOUtils.toString(inputStream, StandardCharsets.UTF_8);
165 assertEquals(0, inputStream.available());
166 }
167 }
168
169 @ParameterizedTest(name = "inputData={0}")
170 @MethodSource("inputData")
171 void testBufferedReads(final String inputData) throws IOException {
172 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
173 try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
174 QueueOutputStream outputStream = new QueueOutputStream(queue)) {
175 outputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
176 final String actualData = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
177 assertEquals(inputData, actualData);
178 }
179 }
180
181 @ParameterizedTest(name = "inputData={0}")
182 @MethodSource("inputData")
183 void testBufferedReadWrite(final String inputData) throws IOException {
184 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
185 try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
186 BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
187 outputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
188 outputStream.flush();
189 final String dataCopy = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
190 assertEquals(inputData, dataCopy);
191 }
192 }
193
194 @ParameterizedTest(name = "inputData={0}")
195 @MethodSource("inputData")
196 void testBufferedWrites(final String inputData) throws IOException {
197 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
198 try (QueueInputStream inputStream = new QueueInputStream(queue);
199 BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
200 outputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
201 outputStream.flush();
202 final String actualData = readUnbuffered(inputStream);
203 assertEquals(inputData, actualData);
204 }
205 }
206
207 @ParameterizedTest(name = "inputData={0}")
208 @MethodSource("inputData")
209 void testBulkReadWaiting(final String inputData) throws IOException {
210 assumeFalse(inputData.isEmpty());
211
212 final CountDownLatch onPollLatch = new CountDownLatch(1);
213 final CountDownLatch afterWriteLatch = new CountDownLatch(1);
214 final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>() {
215 @Override
216 public Integer poll(final long timeout, final TimeUnit unit) throws InterruptedException {
217 onPollLatch.countDown();
218 afterWriteLatch.await(1, TimeUnit.HOURS);
219 return super.poll(timeout, unit);
220 }
221 };
222
223
224
225 try (QueueInputStream queueInputStream = QueueInputStream.builder()
226 .setBlockingQueue(queue)
227 .setTimeout(Duration.ofHours(1))
228 .get()) {
229 final QueueOutputStream queueOutputStream = queueInputStream.newQueueOutputStream();
230 final CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
231 try {
232 onPollLatch.await(1, TimeUnit.HOURS);
233 queueOutputStream.write(inputData.getBytes(UTF_8));
234 afterWriteLatch.countDown();
235 } catch (final Exception e) {
236 throw new RuntimeException(e);
237 }
238 });
239
240 final byte[] data = new byte[inputData.length()];
241 final int read = queueInputStream.read(data, 0, data.length);
242 assertEquals(inputData.length(), read);
243 final String outputData = new String(data, 0, read, StandardCharsets.UTF_8);
244 assertEquals(inputData, outputData);
245 assertDoesNotThrow(() -> future.get());
246 }
247 }
248
249 @Test
250 void testBulkReadZeroLength() {
251 final QueueInputStream queueInputStream = new QueueInputStream();
252 final int read = queueInputStream.read(EMPTY_BYTE_ARRAY, 0, 0);
253 assertEquals(0, read);
254 }
255
256 @Test
257 void testInvalidArguments() {
258 assertThrows(IllegalArgumentException.class, () -> QueueInputStream.builder().setTimeout(Duration.ofMillis(-1)).get(), "waitTime must not be negative");
259 }
260
261 @SuppressWarnings("resource")
262 @ParameterizedTest(name = "inputData={0}")
263 @MethodSource("inputData")
264 void testReadAfterClose(final String inputData) throws IOException {
265 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
266 final InputStream shadow;
267 try (InputStream inputStream = new QueueInputStream(queue)) {
268 shadow = inputStream;
269 }
270 assertEquals(IOUtils.EOF, shadow.read());
271 }
272
273 @ParameterizedTest(name = "inputData={0}")
274 @MethodSource("inputData")
275 void testReadLineByLineFile(final String inputData) throws IOException {
276 final Path tempFile = Files.createTempFile(getClass().getSimpleName(), ".txt");
277 try (InputStream inputStream = Files.newInputStream(tempFile);
278 OutputStream outputStream = Files.newOutputStream(tempFile)) {
279
280 doTestReadLineByLine(inputData, inputStream, outputStream);
281 } finally {
282 Files.delete(tempFile);
283 }
284 }
285
286 @ParameterizedTest(name = "inputData={0}")
287 @MethodSource("inputData")
288 void testReadLineByLineQueue(final String inputData) throws IOException {
289 final String[] lines = inputData.split("\n");
290 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
291 try (QueueInputStream inputStream = QueueInputStream.builder()
292 .setBlockingQueue(queue)
293 .setTimeout(Duration.ofHours(1))
294 .get();
295 QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
296
297 doTestReadLineByLine(inputData, inputStream, outputStream);
298 }
299 }
300
301 @Test
302 void testResetArguments() throws IOException {
303 try (QueueInputStream queueInputStream = QueueInputStream.builder().setTimeout(null).get()) {
304 assertEquals(Duration.ZERO, queueInputStream.getTimeout());
305 assertEquals(0, queueInputStream.getBlockingQueue().size());
306 }
307 try (QueueInputStream queueInputStream = QueueInputStream.builder().setBlockingQueue(null).get()) {
308 assertEquals(Duration.ZERO, queueInputStream.getTimeout());
309 assertEquals(0, queueInputStream.getBlockingQueue().size());
310 }
311 }
312
313 @Test
314
315
316 void testTimeoutInterrupted() throws Exception {
317 try (QueueInputStream inputStream = QueueInputStream.builder().setTimeout(Duration.ofMinutes(2)).get();
318 QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
319
320
321 final AtomicBoolean result = new AtomicBoolean();
322 final CountDownLatch latch = new CountDownLatch(1);
323 final Thread thread = new Thread(() -> {
324
325 assertThrows(IllegalStateException.class, () -> readUnbuffered(inputStream, 3));
326 assertTrue(Thread.currentThread().isInterrupted());
327 result.set(true);
328 latch.countDown();
329 }, "commons-io-QueueInputStreamTest-testTimeoutInterrupted");
330 thread.setDaemon(true);
331 thread.start();
332
333
334 thread.interrupt();
335 latch.await(500, TimeUnit.MILLISECONDS);
336 assertTrue(result.get());
337 }
338 }
339
340 @Test
341
342
343 void testTimeoutUnavailableData() throws IOException {
344 try (QueueInputStream inputStream = QueueInputStream.builder().setTimeout(Duration.ofMillis(500)).get();
345 QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
346 final Stopwatch stopwatch = Stopwatch.createStarted();
347 final String actualData = assertTimeout(Duration.ofSeconds(1), () -> readUnbuffered(inputStream, 3));
348 stopwatch.stop();
349 assertEquals("", actualData);
350
351 assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) >= 500, () -> stopwatch.toString());
352 }
353 }
354
355 @ParameterizedTest(name = "inputData={0}")
356 @MethodSource("inputData")
357 void testUnbufferedReadWrite(final String inputData) throws IOException {
358 try (QueueInputStream inputStream = new QueueInputStream();
359 QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
360 writeUnbuffered(outputStream, inputData);
361 final String actualData = readUnbuffered(inputStream);
362 assertEquals(inputData, actualData);
363 }
364 }
365
366 @ParameterizedTest(name = "inputData={0}")
367 @MethodSource("inputData")
368 void testUnbufferedReadWriteWithTimeout(final String inputData) throws IOException {
369 final Duration timeout = Duration.ofMinutes(2);
370 try (QueueInputStream inputStream = QueueInputStream.builder().setTimeout(timeout).get();
371 QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
372 assertEquals(timeout, inputStream.getTimeout());
373 writeUnbuffered(outputStream, inputData);
374 final String actualData = assertTimeout(Duration.ofSeconds(1), () -> readUnbuffered(inputStream, inputData.length()));
375 assertEquals(inputData, actualData);
376 }
377 }
378
379 private void writeUnbuffered(final QueueOutputStream outputStream, final String inputData) throws IOException {
380 final byte[] bytes = inputData.getBytes(StandardCharsets.UTF_8);
381 outputStream.write(bytes, 0, bytes.length);
382 }
383 }