1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.commons.io.output;
18
19 import static java.nio.charset.StandardCharsets.UTF_8;
20 import static java.util.concurrent.TimeUnit.SECONDS;
21 import static org.junit.jupiter.api.Assertions.assertEquals;
22 import static org.junit.jupiter.api.Assertions.assertNotNull;
23 import static org.junit.jupiter.api.Assertions.assertThrows;
24
25 import java.io.InterruptedIOException;
26 import java.nio.charset.StandardCharsets;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.Exchanger;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.LinkedBlockingQueue;
32
33 import org.apache.commons.io.IOUtils;
34 import org.apache.commons.io.input.QueueInputStream;
35 import org.apache.commons.io.input.QueueInputStreamTest;
36 import org.junit.jupiter.api.AfterAll;
37 import org.junit.jupiter.api.Test;
38
39
40
41
42
43
44 public class QueueOutputStreamTest {
45
46 private static final ExecutorService executorService = Executors.newFixedThreadPool(5);
47
48 @AfterAll
49 public static void afterAll() {
50 executorService.shutdown();
51 }
52
53 private static <T> T callInThrowAwayThread(final Callable<T> callable) throws Exception {
54 final Exchanger<T> exchanger = new Exchanger<>();
55 executorService.submit(() -> {
56 final T value = callable.call();
57 exchanger.exchange(value);
58 return null;
59 });
60 return exchanger.exchange(null);
61 }
62
63 @Test
64 public void testNullArgument() {
65 assertThrows(NullPointerException.class, () -> new QueueOutputStream(null), "queue is required");
66 }
67
68 @Test
69 public void testWriteInterrupted() throws Exception {
70 try (QueueOutputStream outputStream = new QueueOutputStream(new LinkedBlockingQueue<>(1));
71 final QueueInputStream inputStream = outputStream.newQueueInputStream()) {
72
73 final int timeout = 1;
74 final Exchanger<Thread> writerThreadExchanger = new Exchanger<>();
75 final Exchanger<Exception> exceptionExchanger = new Exchanger<>();
76 executorService.submit(() -> {
77 final Thread writerThread = writerThreadExchanger.exchange(null, timeout, SECONDS);
78 writerThread.interrupt();
79 return null;
80 });
81
82 executorService.submit(() -> {
83 try {
84 writerThreadExchanger.exchange(Thread.currentThread(), timeout, SECONDS);
85 outputStream.write("ABC".getBytes(StandardCharsets.UTF_8));
86 } catch (final Exception e) {
87 Thread.interrupted();
88 exceptionExchanger.exchange(e, timeout, SECONDS);
89 }
90 return null;
91 });
92
93 final Exception exception = exceptionExchanger.exchange(null, timeout, SECONDS);
94 assertNotNull(exception);
95 assertEquals(exception.getClass(), InterruptedIOException.class);
96 }
97 }
98
99 @Test
100 public void testWriteString() throws Exception {
101 try (QueueOutputStream outputStream = new QueueOutputStream();
102 final QueueInputStream inputStream = outputStream.newQueueInputStream()) {
103 outputStream.write("ABC".getBytes(UTF_8));
104 final String value = IOUtils.toString(inputStream, UTF_8);
105 assertEquals("ABC", value);
106 }
107 }
108
109 @Test
110 public void testWriteStringMultiThread() throws Exception {
111 try (QueueOutputStream outputStream = callInThrowAwayThread(QueueOutputStream::new);
112 final QueueInputStream inputStream = callInThrowAwayThread(outputStream::newQueueInputStream)) {
113 callInThrowAwayThread(() -> {
114 outputStream.write("ABC".getBytes(UTF_8));
115 return null;
116 });
117
118 final String value = callInThrowAwayThread(() -> IOUtils.toString(inputStream, UTF_8));
119 assertEquals("ABC", value);
120 }
121 }
122 }