1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.commons.io.input;
18
19 import static java.nio.charset.StandardCharsets.UTF_8;
20 import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY;
21 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
22 import static org.junit.jupiter.api.Assertions.assertEquals;
23 import static org.junit.jupiter.api.Assertions.assertThrows;
24 import static org.junit.jupiter.api.Assertions.assertTimeout;
25 import static org.junit.jupiter.api.Assertions.assertTrue;
26 import static org.junit.jupiter.api.Assumptions.assumeFalse;
27 import static org.junit.jupiter.api.DynamicTest.dynamicTest;
28
29 import java.io.BufferedInputStream;
30 import java.io.BufferedOutputStream;
31 import java.io.BufferedReader;
32 import java.io.ByteArrayOutputStream;
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.io.InputStreamReader;
36 import java.io.OutputStream;
37 import java.nio.charset.StandardCharsets;
38 import java.nio.file.Files;
39 import java.nio.file.Path;
40 import java.time.Duration;
41 import java.util.concurrent.BlockingQueue;
42 import java.util.concurrent.CompletableFuture;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.LinkedBlockingQueue;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.stream.Stream;
48
49 import org.apache.commons.io.IOUtils;
50 import org.apache.commons.io.output.QueueOutputStream;
51 import org.apache.commons.io.output.QueueOutputStreamTest;
52 import org.apache.commons.lang3.StringUtils;
53 import org.junit.jupiter.api.DisplayName;
54 import org.junit.jupiter.api.DynamicTest;
55 import org.junit.jupiter.api.Test;
56 import org.junit.jupiter.api.TestFactory;
57 import org.junit.jupiter.params.ParameterizedTest;
58 import org.junit.jupiter.params.provider.Arguments;
59 import org.junit.jupiter.params.provider.MethodSource;
60
61 import com.google.common.base.Stopwatch;
62
63
64
65
66
67
68 public class QueueInputStreamTest {
69
70 public static Stream<Arguments> inputData() {
71
72 return Stream.of(Arguments.of(""),
73 Arguments.of("1"),
74 Arguments.of("12"),
75 Arguments.of("1234"),
76 Arguments.of("12345678"),
77 Arguments.of(StringUtils.repeat("A", 4095)),
78 Arguments.of(StringUtils.repeat("A", 4096)),
79 Arguments.of(StringUtils.repeat("A", 4097)),
80 Arguments.of(StringUtils.repeat("A", 8191)),
81 Arguments.of(StringUtils.repeat("A", 8192)),
82 Arguments.of(StringUtils.repeat("A", 8193)),
83 Arguments.of(StringUtils.repeat("A", 8192 * 4)));
84
85 }
86
87 @TestFactory
88 public DynamicTest[] bulkReadErrorHandlingTests() {
89 final QueueInputStream queueInputStream = new QueueInputStream();
90 return new DynamicTest[] {
91 dynamicTest("Offset too big", () ->
92 assertThrows(IndexOutOfBoundsException.class, () ->
93 queueInputStream.read(EMPTY_BYTE_ARRAY, 1, 0))),
94
95 dynamicTest("Offset negative", () ->
96 assertThrows(IndexOutOfBoundsException.class, () ->
97 queueInputStream.read(EMPTY_BYTE_ARRAY, -1, 0))),
98
99 dynamicTest("Length too big", () ->
100 assertThrows(IndexOutOfBoundsException.class, () ->
101 queueInputStream.read(EMPTY_BYTE_ARRAY, 0, 1))),
102
103 dynamicTest("Length negative", () ->
104 assertThrows(IndexOutOfBoundsException.class, () ->
105 queueInputStream.read(EMPTY_BYTE_ARRAY, 0, -1))),
106 };
107 }
108
109 private int defaultBufferSize() {
110 return 8192;
111 }
112
113 private void doTestReadLineByLine(final String inputData, final InputStream inputStream, final OutputStream outputStream) throws IOException {
114 final String[] lines = inputData.split("\n");
115 try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, UTF_8))) {
116 for (final String line : lines) {
117 outputStream.write(line.getBytes(UTF_8));
118 outputStream.write('\n');
119
120 final String actualLine = reader.readLine();
121 assertEquals(line, actualLine);
122 }
123 }
124 }
125
126 private String readUnbuffered(final InputStream inputStream) throws IOException {
127 return readUnbuffered(inputStream, Integer.MAX_VALUE);
128 }
129
130 private String readUnbuffered(final InputStream inputStream, final int maxBytes) throws IOException {
131 if (maxBytes == 0) {
132 return "";
133 }
134
135 final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
136 int n = -1;
137 while ((n = inputStream.read()) != -1) {
138 byteArrayOutputStream.write(n);
139 if (byteArrayOutputStream.size() >= maxBytes) {
140 break;
141 }
142 }
143 return byteArrayOutputStream.toString(StandardCharsets.UTF_8.name());
144 }
145
146 @SuppressWarnings("resource")
147 @ParameterizedTest(name = "inputData={0}")
148 @MethodSource("inputData")
149 void testAvailableAfterClose(final String inputData) throws IOException {
150 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
151 final InputStream shadow;
152 try (InputStream inputStream = new QueueInputStream(queue)) {
153 shadow = inputStream;
154 }
155 assertEquals(0, shadow.available());
156 }
157
158 @ParameterizedTest(name = "inputData={0}")
159 @MethodSource("inputData")
160 void testAvailableAfterOpen(final String inputData) throws IOException {
161 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
162 try (InputStream inputStream = new QueueInputStream(queue)) {
163
164 assertEquals(0, inputStream.available());
165 IOUtils.toString(inputStream, StandardCharsets.UTF_8);
166 assertEquals(0, inputStream.available());
167 }
168 }
169
170 @ParameterizedTest(name = "inputData={0}")
171 @MethodSource("inputData")
172 void testBufferedReads(final String inputData) throws IOException {
173 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
174 try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
175 QueueOutputStream outputStream = new QueueOutputStream(queue)) {
176 outputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
177 final String actualData = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
178 assertEquals(inputData, actualData);
179 }
180 }
181
182 @ParameterizedTest(name = "inputData={0}")
183 @MethodSource("inputData")
184 void testBufferedReadWrite(final String inputData) throws IOException {
185 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
186 try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
187 BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
188 outputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
189 outputStream.flush();
190 final String dataCopy = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
191 assertEquals(inputData, dataCopy);
192 }
193 }
194
195 @ParameterizedTest(name = "inputData={0}")
196 @MethodSource("inputData")
197 void testBufferedWrites(final String inputData) throws IOException {
198 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
199 try (QueueInputStream inputStream = new QueueInputStream(queue);
200 BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
201 outputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
202 outputStream.flush();
203 final String actualData = readUnbuffered(inputStream);
204 assertEquals(inputData, actualData);
205 }
206 }
207
208 @ParameterizedTest(name = "inputData={0}")
209 @MethodSource("inputData")
210 void testBulkReadWaiting(final String inputData) throws IOException {
211 assumeFalse(inputData.isEmpty());
212
213 final CountDownLatch onPollLatch = new CountDownLatch(1);
214 final CountDownLatch afterWriteLatch = new CountDownLatch(1);
215 final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>() {
216 @Override
217 public Integer poll(final long timeout, final TimeUnit unit) throws InterruptedException {
218 onPollLatch.countDown();
219 afterWriteLatch.await(1, TimeUnit.HOURS);
220 return super.poll(timeout, unit);
221 }
222 };
223
224
225
226 try (QueueInputStream queueInputStream = QueueInputStream.builder()
227 .setBlockingQueue(queue)
228 .setTimeout(Duration.ofHours(1))
229 .get()) {
230 final QueueOutputStream queueOutputStream = queueInputStream.newQueueOutputStream();
231 final CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
232 try {
233 onPollLatch.await(1, TimeUnit.HOURS);
234 queueOutputStream.write(inputData.getBytes(UTF_8));
235 afterWriteLatch.countDown();
236 } catch (final Exception e) {
237 throw new RuntimeException(e);
238 }
239 });
240
241 final byte[] data = new byte[inputData.length()];
242 final int read = queueInputStream.read(data, 0, data.length);
243 assertEquals(inputData.length(), read);
244 final String outputData = new String(data, 0, read, StandardCharsets.UTF_8);
245 assertEquals(inputData, outputData);
246 assertDoesNotThrow(() -> future.get());
247 }
248 }
249
250 @Test
251 void testBulkReadZeroLength() {
252 final QueueInputStream queueInputStream = new QueueInputStream();
253 final int read = queueInputStream.read(EMPTY_BYTE_ARRAY, 0, 0);
254 assertEquals(0, read);
255 }
256
257 @Test
258 void testInvalidArguments() {
259 assertThrows(IllegalArgumentException.class, () -> QueueInputStream.builder().setTimeout(Duration.ofMillis(-1)).get(), "waitTime must not be negative");
260 }
261
262 @SuppressWarnings("resource")
263 @ParameterizedTest(name = "inputData={0}")
264 @MethodSource("inputData")
265 void testReadAfterClose(final String inputData) throws IOException {
266 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
267 final InputStream shadow;
268 try (InputStream inputStream = new QueueInputStream(queue)) {
269 shadow = inputStream;
270 }
271 assertEquals(IOUtils.EOF, shadow.read());
272 }
273
274 @ParameterizedTest(name = "inputData={0}")
275 @MethodSource("inputData")
276 void testReadLineByLineFile(final String inputData) throws IOException {
277 final Path tempFile = Files.createTempFile(getClass().getSimpleName(), ".txt");
278 try (InputStream inputStream = Files.newInputStream(tempFile);
279 OutputStream outputStream = Files.newOutputStream(tempFile)) {
280
281 doTestReadLineByLine(inputData, inputStream, outputStream);
282 } finally {
283 Files.delete(tempFile);
284 }
285 }
286
287 @ParameterizedTest(name = "inputData={0}")
288 @MethodSource("inputData")
289 void testReadLineByLineQueue(final String inputData) throws IOException {
290 final String[] lines = inputData.split("\n");
291 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
292 try (QueueInputStream inputStream = QueueInputStream.builder()
293 .setBlockingQueue(queue)
294 .setTimeout(Duration.ofHours(1))
295 .get();
296 QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
297
298 doTestReadLineByLine(inputData, inputStream, outputStream);
299 }
300 }
301
302 @Test
303 void testResetArguments() throws IOException {
304 try (QueueInputStream queueInputStream = QueueInputStream.builder().setTimeout(null).get()) {
305 assertEquals(Duration.ZERO, queueInputStream.getTimeout());
306 assertEquals(0, queueInputStream.getBlockingQueue().size());
307 }
308 try (QueueInputStream queueInputStream = QueueInputStream.builder().setBlockingQueue(null).get()) {
309 assertEquals(Duration.ZERO, queueInputStream.getTimeout());
310 assertEquals(0, queueInputStream.getBlockingQueue().size());
311 }
312 }
313
314 @Test
315 @DisplayName("If read is interrupted while waiting, then exception is thrown")
316 void testTimeoutInterrupted() throws Exception {
317 try (QueueInputStream inputStream = QueueInputStream.builder().setTimeout(Duration.ofMinutes(2)).get();
318 QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
319
320
321 final AtomicBoolean result = new AtomicBoolean();
322 final CountDownLatch latch = new CountDownLatch(1);
323 final Thread thread = new Thread(() -> {
324
325 assertThrows(IllegalStateException.class, () -> readUnbuffered(inputStream, 3));
326 assertTrue(Thread.currentThread().isInterrupted());
327 result.set(true);
328 latch.countDown();
329 });
330 thread.setDaemon(true);
331 thread.start();
332
333
334 thread.interrupt();
335 latch.await(500, TimeUnit.MILLISECONDS);
336 assertTrue(result.get());
337 }
338 }
339
340 @Test
341 @DisplayName("If data is not available in queue, then read will wait until wait time elapses")
342 void testTimeoutUnavailableData() throws IOException {
343 try (QueueInputStream inputStream = QueueInputStream.builder().setTimeout(Duration.ofMillis(500)).get();
344 QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
345 final Stopwatch stopwatch = Stopwatch.createStarted();
346 final String actualData = assertTimeout(Duration.ofSeconds(1), () -> readUnbuffered(inputStream, 3));
347 stopwatch.stop();
348 assertEquals("", actualData);
349
350 assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) >= 500, () -> stopwatch.toString());
351 }
352 }
353
354 @ParameterizedTest(name = "inputData={0}")
355 @MethodSource("inputData")
356 void testUnbufferedReadWrite(final String inputData) throws IOException {
357 try (QueueInputStream inputStream = new QueueInputStream();
358 QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
359 writeUnbuffered(outputStream, inputData);
360 final String actualData = readUnbuffered(inputStream);
361 assertEquals(inputData, actualData);
362 }
363 }
364
365 @ParameterizedTest(name = "inputData={0}")
366 @MethodSource("inputData")
367 void testUnbufferedReadWriteWithTimeout(final String inputData) throws IOException {
368 final Duration timeout = Duration.ofMinutes(2);
369 try (QueueInputStream inputStream = QueueInputStream.builder().setTimeout(timeout).get();
370 QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
371 assertEquals(timeout, inputStream.getTimeout());
372 writeUnbuffered(outputStream, inputData);
373 final String actualData = assertTimeout(Duration.ofSeconds(1), () -> readUnbuffered(inputStream, inputData.length()));
374 assertEquals(inputData, actualData);
375 }
376 }
377
378 private void writeUnbuffered(final QueueOutputStream outputStream, final String inputData) throws IOException {
379 final byte[] bytes = inputData.getBytes(StandardCharsets.UTF_8);
380 outputStream.write(bytes, 0, bytes.length);
381 }
382 }