View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.io.input;
18  
19  import static org.junit.jupiter.api.Assertions.assertEquals;
20  import static org.junit.jupiter.api.Assertions.assertThrows;
21  import static org.junit.jupiter.api.Assertions.assertTimeout;
22  import static org.junit.jupiter.api.Assertions.assertTrue;
23  
24  import java.io.BufferedInputStream;
25  import java.io.BufferedOutputStream;
26  import java.io.ByteArrayOutputStream;
27  import java.io.IOException;
28  import java.io.InputStream;
29  import java.nio.charset.StandardCharsets;
30  import java.time.Duration;
31  import java.util.concurrent.BlockingQueue;
32  import java.util.concurrent.CountDownLatch;
33  import java.util.concurrent.LinkedBlockingQueue;
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  import java.util.stream.Stream;
37  
38  import org.apache.commons.io.IOUtils;
39  import org.apache.commons.io.output.QueueOutputStream;
40  import org.apache.commons.io.output.QueueOutputStreamTest;
41  import org.apache.commons.lang3.StringUtils;
42  import org.junit.jupiter.api.DisplayName;
43  import org.junit.jupiter.api.Test;
44  import org.junit.jupiter.params.ParameterizedTest;
45  import org.junit.jupiter.params.provider.Arguments;
46  import org.junit.jupiter.params.provider.MethodSource;
47  
48  import com.google.common.base.Stopwatch;
49  
50  /**
51   * Test {@link QueueInputStream}.
52   *
53   * @see QueueOutputStreamTest
54   */
55  public class QueueInputStreamTest {
56  
57      public static Stream<Arguments> inputData() {
58          // @formatter:off
59          return Stream.of(Arguments.of(""),
60                  Arguments.of("1"),
61                  Arguments.of("12"),
62                  Arguments.of("1234"),
63                  Arguments.of("12345678"),
64                  Arguments.of(StringUtils.repeat("A", 4095)),
65                  Arguments.of(StringUtils.repeat("A", 4096)),
66                  Arguments.of(StringUtils.repeat("A", 4097)),
67                  Arguments.of(StringUtils.repeat("A", 8191)),
68                  Arguments.of(StringUtils.repeat("A", 8192)),
69                  Arguments.of(StringUtils.repeat("A", 8193)),
70                  Arguments.of(StringUtils.repeat("A", 8192 * 4)));
71          // @formatter:on
72      }
73  
74      private int defaultBufferSize() {
75          return 8192;
76      }
77  
78      private String readUnbuffered(final InputStream inputStream) throws IOException {
79          return readUnbuffered(inputStream, Integer.MAX_VALUE);
80      }
81  
82      private String readUnbuffered(final InputStream inputStream, final int maxBytes) throws IOException {
83          if (maxBytes == 0) {
84              return "";
85          }
86  
87          final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
88          int n = -1;
89          while ((n = inputStream.read()) != -1) {
90              byteArrayOutputStream.write(n);
91              if (byteArrayOutputStream.size() >= maxBytes) {
92                  break;
93              }
94          }
95          return byteArrayOutputStream.toString(StandardCharsets.UTF_8.name());
96      }
97  
98      @ParameterizedTest(name = "inputData={0}")
99      @MethodSource("inputData")
100     public void testBufferedReads(final String inputData) throws IOException {
101         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
102         try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
103                 final QueueOutputStream outputStream = new QueueOutputStream(queue)) {
104             outputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
105             final String actualData = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
106             assertEquals(inputData, actualData);
107         }
108     }
109 
110     @ParameterizedTest(name = "inputData={0}")
111     @MethodSource("inputData")
112     public void testBufferedReadWrite(final String inputData) throws IOException {
113         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
114         try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
115                 final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
116             outputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
117             outputStream.flush();
118             final String dataCopy = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
119             assertEquals(inputData, dataCopy);
120         }
121     }
122 
123     @ParameterizedTest(name = "inputData={0}")
124     @MethodSource("inputData")
125     public void testBufferedWrites(final String inputData) throws IOException {
126         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
127         try (QueueInputStream inputStream = new QueueInputStream(queue);
128                 final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
129             outputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
130             outputStream.flush();
131             final String actualData = readUnbuffered(inputStream);
132             assertEquals(inputData, actualData);
133         }
134     }
135 
136     @Test
137     public void testInvalidArguments() {
138         assertThrows(NullPointerException.class, () -> new QueueInputStream(null), "queue is required");
139         assertThrows(IllegalArgumentException.class, () -> QueueInputStream.builder().setTimeout(Duration.ofMillis(-1)).get(), "waitTime must not be negative");
140     }
141 
142     @Test
143     public void testResetArguments() throws IOException {
144         try (QueueInputStream queueInputStream = QueueInputStream.builder().setTimeout(null).get()) {
145             assertEquals(Duration.ZERO, queueInputStream.getTimeout());
146             assertEquals(0, queueInputStream.getBlockingQueue().size());
147         }
148         try (QueueInputStream queueInputStream = QueueInputStream.builder().setBlockingQueue(null).get()) {
149             assertEquals(Duration.ZERO, queueInputStream.getTimeout());
150             assertEquals(0, queueInputStream.getBlockingQueue().size());
151         }
152     }
153 
154     @Test
155     @DisplayName("If read is interrupted while waiting, then exception is thrown")
156     public void testTimeoutInterrupted() throws Exception {
157         try (QueueInputStream inputStream = QueueInputStream.builder().setTimeout(Duration.ofMinutes(2)).get();
158                 final QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
159 
160             // read in a background thread
161             final AtomicBoolean result = new AtomicBoolean();
162             final CountDownLatch latch = new CountDownLatch(1);
163             final Thread thread = new Thread(() -> {
164                 // when thread is interrupted, verify ...
165                 assertThrows(IllegalStateException.class, () -> readUnbuffered(inputStream, 3));
166                 assertTrue(Thread.currentThread().isInterrupted());
167                 result.set(true);
168                 latch.countDown();
169             });
170             thread.setDaemon(true);
171             thread.start();
172 
173             // interrupt and check that verification completed
174             thread.interrupt();
175             latch.await(500, TimeUnit.MILLISECONDS);
176             assertTrue(result.get());
177         }
178     }
179 
180     @Test
181     @DisplayName("If data is not available in queue, then read will wait until wait time elapses")
182     public void testTimeoutUnavailableData() throws IOException {
183         try (QueueInputStream inputStream = QueueInputStream.builder().setTimeout(Duration.ofMillis(500)).get();
184                 final QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
185             final Stopwatch stopwatch = Stopwatch.createStarted();
186             final String actualData = assertTimeout(Duration.ofSeconds(1), () -> readUnbuffered(inputStream, 3));
187             stopwatch.stop();
188             assertEquals("", actualData);
189 
190             assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) >= 500, () -> stopwatch.toString());
191         }
192     }
193 
194     @ParameterizedTest(name = "inputData={0}")
195     @MethodSource("inputData")
196     public void testUnbufferedReadWrite(final String inputData) throws IOException {
197         try (QueueInputStream inputStream = new QueueInputStream();
198                 final QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
199             writeUnbuffered(outputStream, inputData);
200             final String actualData = readUnbuffered(inputStream);
201             assertEquals(inputData, actualData);
202         }
203     }
204 
205     @ParameterizedTest(name = "inputData={0}")
206     @MethodSource("inputData")
207     public void testUnbufferedReadWriteWithTimeout(final String inputData) throws IOException {
208         final Duration timeout = Duration.ofMinutes(2);
209         try (QueueInputStream inputStream = QueueInputStream.builder().setTimeout(timeout).get();
210                 final QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
211             assertEquals(timeout, inputStream.getTimeout());
212             writeUnbuffered(outputStream, inputData);
213             final String actualData = assertTimeout(Duration.ofSeconds(1), () -> readUnbuffered(inputStream, inputData.length()));
214             assertEquals(inputData, actualData);
215         }
216     }
217 
218     private void writeUnbuffered(final QueueOutputStream outputStream, final String inputData) throws IOException {
219         final byte[] bytes = inputData.getBytes(StandardCharsets.UTF_8);
220         outputStream.write(bytes, 0, bytes.length);
221     }
222 }