1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.commons.io.input;
18
19 import static org.junit.jupiter.api.Assertions.assertEquals;
20 import static org.junit.jupiter.api.Assertions.assertNotEquals;
21 import static org.junit.jupiter.api.Assertions.assertNull;
22 import static org.junit.jupiter.api.Assertions.assertThrows;
23 import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
24 import static org.junit.jupiter.api.Assertions.assertTrue;
25
26 import java.io.ByteArrayInputStream;
27 import java.io.IOException;
28 import java.io.InputStream;
29 import java.nio.file.Files;
30 import java.nio.file.Paths;
31 import java.util.concurrent.atomic.AtomicBoolean;
32
33 import org.apache.commons.io.IOUtils;
34 import org.apache.commons.io.input.ObservableInputStream.Observer;
35 import org.apache.commons.io.output.NullOutputStream;
36 import org.apache.commons.io.test.CustomIOException;
37 import org.junit.jupiter.api.Test;
38
39
40
41
42 class ObservableInputStreamTest {
43
44 private static final class DataViewObserver extends MethodCountObserver {
45 private byte[] buffer;
46 private int lastValue = -1;
47 private int length = -1;
48 private int offset = -1;
49
50 @Override
51 public void data(final byte[] buffer, final int offset, final int length) throws IOException {
52 this.buffer = buffer;
53 this.offset = offset;
54 this.length = length;
55 }
56
57 @Override
58 public void data(final int value) throws IOException {
59 super.data(value);
60 lastValue = value;
61 }
62 }
63
64 private static final class LengthObserver extends Observer {
65 private long total;
66
67 @Override
68 public void data(final byte[] buffer, final int offset, final int length) throws IOException {
69 this.total += length;
70 }
71
72 @Override
73 public void data(final int value) throws IOException {
74 total++;
75 }
76
77 public long getTotal() {
78 return total;
79 }
80 }
81
82 private static class MethodCountObserver extends Observer {
83 private long closedCount;
84 private long dataBufferCount;
85 private long dataCount;
86 private long errorCount;
87 private long finishedCount;
88
89 @Override
90 public void closed() throws IOException {
91 closedCount++;
92 }
93
94 @Override
95 public void data(final byte[] buffer, final int offset, final int length) throws IOException {
96 dataBufferCount++;
97 }
98
99 @Override
100 public void data(final int value) throws IOException {
101 dataCount++;
102 }
103
104 @Override
105 public void error(final IOException exception) throws IOException {
106 errorCount++;
107 }
108
109 @Override
110 public void finished() throws IOException {
111 finishedCount++;
112 }
113
114 public long getClosedCount() {
115 return closedCount;
116 }
117
118 public long getDataBufferCount() {
119 return dataBufferCount;
120 }
121
122 public long getDataCount() {
123 return dataCount;
124 }
125
126 public long getErrorCount() {
127 return errorCount;
128 }
129
130 public long getFinishedCount() {
131 return finishedCount;
132 }
133
134 }
135
136 private ObservableInputStream brokenObservableInputStream() {
137 return new ObservableInputStream(BrokenInputStream.INSTANCE);
138 }
139
140 private InputStream createInputStream() {
141 final byte[] buffer = MessageDigestInputStreamTest.generateRandomByteStream(IOUtils.DEFAULT_BUFFER_SIZE);
142 return createInputStream(new ByteArrayInputStream(buffer));
143 }
144
145 private ObservableInputStream createInputStream(final InputStream origin) {
146 return new ObservableInputStream(origin);
147 }
148
149 @Test
150 void testAfterReadConsumer() throws Exception {
151 final AtomicBoolean boolRef = new AtomicBoolean();
152
153 try (InputStream bounded = new ObservableInputStream.Builder()
154 .setCharSequence("Hi")
155 .setAfterRead(i -> boolRef.set(true))
156 .get()) {
157 IOUtils.consume(bounded);
158 }
159
160 assertTrue(boolRef.get());
161
162 final String message = "test exception message";
163
164 try (InputStream bounded = new ObservableInputStream.Builder()
165 .setCharSequence("Hi")
166 .setAfterRead(i -> {
167 throw new CustomIOException(message);
168 })
169 .get()) {
170 assertEquals(message, assertThrowsExactly(CustomIOException.class, () -> IOUtils.consume(bounded)).getMessage());
171 }
172
173 }
174
175 @SuppressWarnings("resource")
176 @Test
177 void testAvailableAfterClose() throws Exception {
178 final InputStream shadow;
179 try (InputStream in = createInputStream()) {
180 assertTrue(in.available() > 0);
181 shadow = in;
182 }
183 assertEquals(0, shadow.available());
184 }
185
186 @Test
187 void testAvailableAfterOpen() throws Exception {
188 try (InputStream in = createInputStream()) {
189 assertTrue(in.available() > 0);
190 assertNotEquals(IOUtils.EOF, in.read());
191 assertTrue(in.available() > 0);
192 }
193 }
194
195 @Test
196 void testBrokenInputStreamRead() throws IOException {
197 try (ObservableInputStream ois = brokenObservableInputStream()) {
198 assertThrows(IOException.class, ois::read);
199 }
200 }
201
202 @Test
203 void testBrokenInputStreamReadBuffer() throws IOException {
204 try (ObservableInputStream ois = brokenObservableInputStream()) {
205 assertThrows(IOException.class, () -> ois.read(new byte[1]));
206 }
207 }
208
209 @Test
210 void testBrokenInputStreamReadSubBuffer() throws IOException {
211 try (ObservableInputStream ois = brokenObservableInputStream()) {
212 assertThrows(IOException.class, () -> ois.read(new byte[2], 0, 1));
213 }
214 }
215
216
217
218
219 @Test
220 void testDataByteCalled_add() throws Exception {
221 final byte[] buffer = MessageDigestInputStreamTest.generateRandomByteStream(IOUtils.DEFAULT_BUFFER_SIZE);
222 final DataViewObserver lko = new DataViewObserver();
223 try (ObservableInputStream ois = new ObservableInputStream(new ByteArrayInputStream(buffer))) {
224 assertEquals(-1, lko.lastValue);
225 ois.read();
226 assertEquals(-1, lko.lastValue);
227 assertEquals(0, lko.getFinishedCount());
228 assertEquals(0, lko.getClosedCount());
229 ois.add(lko);
230 for (int i = 1; i < buffer.length; i++) {
231 final int result = ois.read();
232 assertEquals((byte) result, buffer[i]);
233 assertEquals(result, lko.lastValue);
234 assertEquals(0, lko.getFinishedCount());
235 assertEquals(0, lko.getClosedCount());
236 }
237 final int result = ois.read();
238 assertEquals(-1, result);
239 assertEquals(1, lko.getFinishedCount());
240 assertEquals(0, lko.getClosedCount());
241 ois.close();
242 assertEquals(1, lko.getFinishedCount());
243 assertEquals(1, lko.getClosedCount());
244 }
245 }
246
247
248
249
250 @Test
251 void testDataByteCalled_ctor() throws Exception {
252 final byte[] buffer = MessageDigestInputStreamTest.generateRandomByteStream(IOUtils.DEFAULT_BUFFER_SIZE);
253 final DataViewObserver lko = new DataViewObserver();
254 try (ObservableInputStream ois = new ObservableInputStream(new ByteArrayInputStream(buffer), lko)) {
255 assertEquals(-1, lko.lastValue);
256 ois.read();
257 assertNotEquals(-1, lko.lastValue);
258 assertEquals(0, lko.getFinishedCount());
259 assertEquals(0, lko.getClosedCount());
260 for (int i = 1; i < buffer.length; i++) {
261 final int result = ois.read();
262 assertEquals((byte) result, buffer[i]);
263 assertEquals(result, lko.lastValue);
264 assertEquals(0, lko.getFinishedCount());
265 assertEquals(0, lko.getClosedCount());
266 }
267 final int result = ois.read();
268 assertEquals(-1, result);
269 assertEquals(1, lko.getFinishedCount());
270 assertEquals(0, lko.getClosedCount());
271 ois.close();
272 assertEquals(1, lko.getFinishedCount());
273 assertEquals(1, lko.getClosedCount());
274 }
275 }
276
277
278
279
280 @Test
281 void testDataBytesCalled() throws Exception {
282 final byte[] buffer = MessageDigestInputStreamTest.generateRandomByteStream(IOUtils.DEFAULT_BUFFER_SIZE);
283 try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
284 ObservableInputStream ois = createInputStream(bais)) {
285 final DataViewObserver observer = new DataViewObserver();
286 final byte[] readBuffer = new byte[23];
287 assertNull(observer.buffer);
288 ois.read(readBuffer);
289 assertNull(observer.buffer);
290 ois.add(observer);
291 for (;;) {
292 if (bais.available() >= 2048) {
293 final int result = ois.read(readBuffer);
294 if (result == -1) {
295 ois.close();
296 break;
297 }
298 assertEquals(readBuffer, observer.buffer);
299 assertEquals(0, observer.offset);
300 assertEquals(readBuffer.length, observer.length);
301 } else {
302 final int res = Math.min(11, bais.available());
303 final int result = ois.read(readBuffer, 1, 11);
304 if (result == -1) {
305 ois.close();
306 break;
307 }
308 assertEquals(readBuffer, observer.buffer);
309 assertEquals(1, observer.offset);
310 assertEquals(res, observer.length);
311 }
312 }
313 }
314 }
315
316 @Test
317 void testGetObservers0() throws IOException {
318 try (ObservableInputStream ois = new ObservableInputStream(new NullInputStream())) {
319 assertTrue(ois.getObservers().isEmpty());
320 }
321 }
322
323 @Test
324 void testGetObservers1() throws IOException {
325 final DataViewObserver observer0 = new DataViewObserver();
326 try (ObservableInputStream ois = new ObservableInputStream(new NullInputStream(), observer0)) {
327 assertEquals(observer0, ois.getObservers().get(0));
328 }
329 }
330
331 @Test
332 void testGetObserversOrder() throws IOException {
333 final DataViewObserver observer0 = new DataViewObserver();
334 final DataViewObserver observer1 = new DataViewObserver();
335 try (ObservableInputStream ois = new ObservableInputStream(new NullInputStream(), observer0, observer1)) {
336 assertEquals(observer0, ois.getObservers().get(0));
337 assertEquals(observer1, ois.getObservers().get(1));
338 }
339 }
340
341 private void testNotificationCallbacks(final int bufferSize) throws IOException {
342 final byte[] buffer = IOUtils.byteArray();
343 final LengthObserver lengthObserver = new LengthObserver();
344 final MethodCountObserver methodCountObserver = new MethodCountObserver();
345 try (ObservableInputStream ois = new ObservableInputStream(new ByteArrayInputStream(buffer), lengthObserver, methodCountObserver)) {
346 assertEquals(IOUtils.DEFAULT_BUFFER_SIZE, IOUtils.copy(ois, NullOutputStream.INSTANCE, bufferSize));
347 }
348 assertEquals(IOUtils.DEFAULT_BUFFER_SIZE, lengthObserver.getTotal());
349 assertEquals(1, methodCountObserver.getClosedCount());
350 assertEquals(1, methodCountObserver.getFinishedCount());
351 assertEquals(0, methodCountObserver.getErrorCount());
352 assertEquals(0, methodCountObserver.getDataCount());
353 assertEquals(buffer.length / bufferSize, methodCountObserver.getDataBufferCount());
354 }
355
356 @Test
357 void testNotificationCallbacksBufferSize1() throws Exception {
358 testNotificationCallbacks(1);
359 }
360
361 @Test
362 void testNotificationCallbacksBufferSize2() throws Exception {
363 testNotificationCallbacks(2);
364 }
365
366 @Test
367 void testNotificationCallbacksBufferSizeDefault() throws Exception {
368 testNotificationCallbacks(IOUtils.DEFAULT_BUFFER_SIZE);
369 }
370
371 @Test
372 void testReadAfterClose_ByteArrayInputStream() throws Exception {
373 try (InputStream in = createInputStream()) {
374 in.close();
375 assertNotEquals(IOUtils.EOF, in.read());
376 }
377 }
378
379 @SuppressWarnings("resource")
380 @Test
381 void testReadAfterClose_ChannelInputStream() throws Exception {
382 try (InputStream in = createInputStream(Files.newInputStream(Paths.get("src/test/resources/org/apache/commons/io/abitmorethan16k.txt")))) {
383 in.close();
384
385 assertThrows(IOException.class, in::read);
386 }
387 }
388
389 }