View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      https://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.io.input;
18  
19  import static org.junit.jupiter.api.Assertions.assertEquals;
20  import static org.junit.jupiter.api.Assertions.assertNotEquals;
21  import static org.junit.jupiter.api.Assertions.assertNull;
22  import static org.junit.jupiter.api.Assertions.assertThrows;
23  import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
24  import static org.junit.jupiter.api.Assertions.assertTrue;
25  
26  import java.io.ByteArrayInputStream;
27  import java.io.IOException;
28  import java.io.InputStream;
29  import java.nio.file.Files;
30  import java.nio.file.Paths;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  
33  import org.apache.commons.io.IOUtils;
34  import org.apache.commons.io.input.ObservableInputStream.Observer;
35  import org.apache.commons.io.output.NullOutputStream;
36  import org.apache.commons.io.test.CustomIOException;
37  import org.junit.jupiter.api.Test;
38  
39  /**
40   * Tests {@link ObservableInputStream}.
41   */
42  class ObservableInputStreamTest {
43  
44      private static final class DataViewObserver extends MethodCountObserver {
45          private byte[] buffer;
46          private int lastValue = -1;
47          private int length = -1;
48          private int offset = -1;
49  
50          @Override
51          public void data(final byte[] buffer, final int offset, final int length) throws IOException {
52              this.buffer = buffer;
53              this.offset = offset;
54              this.length = length;
55          }
56  
57          @Override
58          public void data(final int value) throws IOException {
59              super.data(value);
60              lastValue = value;
61          }
62      }
63  
64      private static final class LengthObserver extends Observer {
65          private long total;
66  
67          @Override
68          public void data(final byte[] buffer, final int offset, final int length) throws IOException {
69              this.total += length;
70          }
71  
72          @Override
73          public void data(final int value) throws IOException {
74              total++;
75          }
76  
77          public long getTotal() {
78              return total;
79          }
80      }
81  
82      private static class MethodCountObserver extends Observer {
83          private long closedCount;
84          private long dataBufferCount;
85          private long dataCount;
86          private long errorCount;
87          private long finishedCount;
88  
89          @Override
90          public void closed() throws IOException {
91              closedCount++;
92          }
93  
94          @Override
95          public void data(final byte[] buffer, final int offset, final int length) throws IOException {
96              dataBufferCount++;
97          }
98  
99          @Override
100         public void data(final int value) throws IOException {
101             dataCount++;
102         }
103 
104         @Override
105         public void error(final IOException exception) throws IOException {
106             errorCount++;
107         }
108 
109         @Override
110         public void finished() throws IOException {
111             finishedCount++;
112         }
113 
114         public long getClosedCount() {
115             return closedCount;
116         }
117 
118         public long getDataBufferCount() {
119             return dataBufferCount;
120         }
121 
122         public long getDataCount() {
123             return dataCount;
124         }
125 
126         public long getErrorCount() {
127             return errorCount;
128         }
129 
130         public long getFinishedCount() {
131             return finishedCount;
132         }
133 
134     }
135 
136     private ObservableInputStream brokenObservableInputStream() {
137         return new ObservableInputStream(BrokenInputStream.INSTANCE);
138     }
139 
140     private InputStream createInputStream() {
141         final byte[] buffer = MessageDigestInputStreamTest.generateRandomByteStream(IOUtils.DEFAULT_BUFFER_SIZE);
142         return createInputStream(new ByteArrayInputStream(buffer));
143     }
144 
145     private ObservableInputStream createInputStream(final InputStream origin) {
146         return new ObservableInputStream(origin);
147     }
148 
149     @Test
150     void testAfterReadConsumer() throws Exception {
151         final AtomicBoolean boolRef = new AtomicBoolean();
152         // @formatter:off
153         try (InputStream bounded = new ObservableInputStream.Builder()
154                 .setCharSequence("Hi")
155                 .setAfterRead(i -> boolRef.set(true))
156                 .get()) {
157             IOUtils.consume(bounded);
158         }
159         // @formatter:on
160         assertTrue(boolRef.get());
161         // Throwing
162         final String message = "test exception message";
163         // @formatter:off
164         try (InputStream bounded = new ObservableInputStream.Builder()
165                 .setCharSequence("Hi")
166                 .setAfterRead(i -> {
167                     throw new CustomIOException(message);
168                 })
169                 .get()) {
170             assertEquals(message, assertThrowsExactly(CustomIOException.class, () -> IOUtils.consume(bounded)).getMessage());
171         }
172         // @formatter:on
173     }
174 
175     @SuppressWarnings("resource")
176     @Test
177     void testAvailableAfterClose() throws Exception {
178         final InputStream shadow;
179         try (InputStream in = createInputStream()) {
180             assertTrue(in.available() > 0);
181             shadow = in;
182         }
183         assertEquals(0, shadow.available());
184     }
185 
186     @Test
187     void testAvailableAfterOpen() throws Exception {
188         try (InputStream in = createInputStream()) {
189             assertTrue(in.available() > 0);
190             assertNotEquals(IOUtils.EOF, in.read());
191             assertTrue(in.available() > 0);
192         }
193     }
194 
195     @Test
196     void testBrokenInputStreamRead() throws IOException {
197         try (ObservableInputStream ois = brokenObservableInputStream()) {
198             assertThrows(IOException.class, ois::read);
199         }
200     }
201 
202     @Test
203     void testBrokenInputStreamReadBuffer() throws IOException {
204         try (ObservableInputStream ois = brokenObservableInputStream()) {
205             assertThrows(IOException.class, () -> ois.read(new byte[1]));
206         }
207     }
208 
209     @Test
210     void testBrokenInputStreamReadSubBuffer() throws IOException {
211         try (ObservableInputStream ois = brokenObservableInputStream()) {
212             assertThrows(IOException.class, () -> ois.read(new byte[2], 0, 1));
213         }
214     }
215 
216     /**
217      * Tests that {@link Observer#data(int)} is called.
218      */
219     @Test
220     void testDataByteCalled_add() throws Exception {
221         final byte[] buffer = MessageDigestInputStreamTest.generateRandomByteStream(IOUtils.DEFAULT_BUFFER_SIZE);
222         final DataViewObserver lko = new DataViewObserver();
223         try (ObservableInputStream ois = new ObservableInputStream(new ByteArrayInputStream(buffer))) {
224             assertEquals(-1, lko.lastValue);
225             ois.read();
226             assertEquals(-1, lko.lastValue);
227             assertEquals(0, lko.getFinishedCount());
228             assertEquals(0, lko.getClosedCount());
229             ois.add(lko);
230             for (int i = 1; i < buffer.length; i++) {
231                 final int result = ois.read();
232                 assertEquals((byte) result, buffer[i]);
233                 assertEquals(result, lko.lastValue);
234                 assertEquals(0, lko.getFinishedCount());
235                 assertEquals(0, lko.getClosedCount());
236             }
237             final int result = ois.read();
238             assertEquals(-1, result);
239             assertEquals(1, lko.getFinishedCount());
240             assertEquals(0, lko.getClosedCount());
241             ois.close();
242             assertEquals(1, lko.getFinishedCount());
243             assertEquals(1, lko.getClosedCount());
244         }
245     }
246 
247     /**
248      * Tests that {@link Observer#data(int)} is called.
249      */
250     @Test
251     void testDataByteCalled_ctor() throws Exception {
252         final byte[] buffer = MessageDigestInputStreamTest.generateRandomByteStream(IOUtils.DEFAULT_BUFFER_SIZE);
253         final DataViewObserver lko = new DataViewObserver();
254         try (ObservableInputStream ois = new ObservableInputStream(new ByteArrayInputStream(buffer), lko)) {
255             assertEquals(-1, lko.lastValue);
256             ois.read();
257             assertNotEquals(-1, lko.lastValue);
258             assertEquals(0, lko.getFinishedCount());
259             assertEquals(0, lko.getClosedCount());
260             for (int i = 1; i < buffer.length; i++) {
261                 final int result = ois.read();
262                 assertEquals((byte) result, buffer[i]);
263                 assertEquals(result, lko.lastValue);
264                 assertEquals(0, lko.getFinishedCount());
265                 assertEquals(0, lko.getClosedCount());
266             }
267             final int result = ois.read();
268             assertEquals(-1, result);
269             assertEquals(1, lko.getFinishedCount());
270             assertEquals(0, lko.getClosedCount());
271             ois.close();
272             assertEquals(1, lko.getFinishedCount());
273             assertEquals(1, lko.getClosedCount());
274         }
275     }
276 
277     /**
278      * Tests that {@link Observer#data(byte[],int,int)} is called.
279      */
280     @Test
281     void testDataBytesCalled() throws Exception {
282         final byte[] buffer = MessageDigestInputStreamTest.generateRandomByteStream(IOUtils.DEFAULT_BUFFER_SIZE);
283         try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
284                 ObservableInputStream ois = createInputStream(bais)) {
285             final DataViewObserver observer = new DataViewObserver();
286             final byte[] readBuffer = new byte[23];
287             assertNull(observer.buffer);
288             ois.read(readBuffer);
289             assertNull(observer.buffer);
290             ois.add(observer);
291             for (;;) {
292                 if (bais.available() >= 2048) {
293                     final int result = ois.read(readBuffer);
294                     if (result == -1) {
295                         ois.close();
296                         break;
297                     }
298                     assertEquals(readBuffer, observer.buffer);
299                     assertEquals(0, observer.offset);
300                     assertEquals(readBuffer.length, observer.length);
301                 } else {
302                     final int res = Math.min(11, bais.available());
303                     final int result = ois.read(readBuffer, 1, 11);
304                     if (result == -1) {
305                         ois.close();
306                         break;
307                     }
308                     assertEquals(readBuffer, observer.buffer);
309                     assertEquals(1, observer.offset);
310                     assertEquals(res, observer.length);
311                 }
312             }
313         }
314     }
315 
316     @Test
317     void testGetObservers0() throws IOException {
318         try (ObservableInputStream ois = new ObservableInputStream(new NullInputStream())) {
319             assertTrue(ois.getObservers().isEmpty());
320         }
321     }
322 
323     @Test
324     void testGetObservers1() throws IOException {
325         final DataViewObserver observer0 = new DataViewObserver();
326         try (ObservableInputStream ois = new ObservableInputStream(new NullInputStream(), observer0)) {
327             assertEquals(observer0, ois.getObservers().get(0));
328         }
329     }
330 
331     @Test
332     void testGetObserversOrder() throws IOException {
333         final DataViewObserver observer0 = new DataViewObserver();
334         final DataViewObserver observer1 = new DataViewObserver();
335         try (ObservableInputStream ois = new ObservableInputStream(new NullInputStream(), observer0, observer1)) {
336             assertEquals(observer0, ois.getObservers().get(0));
337             assertEquals(observer1, ois.getObservers().get(1));
338         }
339     }
340 
341     private void testNotificationCallbacks(final int bufferSize) throws IOException {
342         final byte[] buffer = IOUtils.byteArray();
343         final LengthObserver lengthObserver = new LengthObserver();
344         final MethodCountObserver methodCountObserver = new MethodCountObserver();
345         try (ObservableInputStream ois = new ObservableInputStream(new ByteArrayInputStream(buffer), lengthObserver, methodCountObserver)) {
346             assertEquals(IOUtils.DEFAULT_BUFFER_SIZE, IOUtils.copy(ois, NullOutputStream.INSTANCE, bufferSize));
347         }
348         assertEquals(IOUtils.DEFAULT_BUFFER_SIZE, lengthObserver.getTotal());
349         assertEquals(1, methodCountObserver.getClosedCount());
350         assertEquals(1, methodCountObserver.getFinishedCount());
351         assertEquals(0, methodCountObserver.getErrorCount());
352         assertEquals(0, methodCountObserver.getDataCount());
353         assertEquals(buffer.length / bufferSize, methodCountObserver.getDataBufferCount());
354     }
355 
356     @Test
357     void testNotificationCallbacksBufferSize1() throws Exception {
358         testNotificationCallbacks(1);
359     }
360 
361     @Test
362     void testNotificationCallbacksBufferSize2() throws Exception {
363         testNotificationCallbacks(2);
364     }
365 
366     @Test
367     void testNotificationCallbacksBufferSizeDefault() throws Exception {
368         testNotificationCallbacks(IOUtils.DEFAULT_BUFFER_SIZE);
369     }
370 
371     @Test
372     void testReadAfterClose_ByteArrayInputStream() throws Exception {
373         try (InputStream in = createInputStream()) {
374             in.close();
375             assertNotEquals(IOUtils.EOF, in.read());
376         }
377     }
378 
379     @SuppressWarnings("resource")
380     @Test
381     void testReadAfterClose_ChannelInputStream() throws Exception {
382         try (InputStream in = createInputStream(Files.newInputStream(Paths.get("src/test/resources/org/apache/commons/io/abitmorethan16k.txt")))) {
383             in.close();
384             // ChannelInputStream throws when closed
385             assertThrows(IOException.class, in::read);
386         }
387     }
388 
389 }