View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one or more
3    *  contributor license agreements.  See the NOTICE file distributed with
4    *  this work for additional information regarding copyright ownership.
5    *  The ASF licenses this file to You under the Apache License, Version 2.0
6    *  (the "License"); you may not use this file except in compliance with
7    *  the License.  You may obtain a copy of the License at
8    *
9    *      https://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software
12   *  distributed under the License is distributed on an "AS IS" BASIS,
13   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   *  See the License for the specific language governing permissions and
15   *  limitations under the License.
16   */
17  package org.apache.commons.io;
18  
19  import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.Reader;
24  import java.io.StringReader;
25  import java.nio.charset.Charset;
26  import java.util.List;
27  import java.util.concurrent.ExecutorService;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.Future;
30  import java.util.stream.Collectors;
31  import java.util.stream.IntStream;
32  import java.util.stream.Stream;
33  import java.util.zip.CRC32;
34  import java.util.zip.Checksum;
35  
36  import org.apache.commons.io.function.IOConsumer;
37  import org.apache.commons.io.input.ChecksumInputStream;
38  import org.junit.jupiter.params.ParameterizedTest;
39  import org.junit.jupiter.params.provider.MethodSource;
40  
41  /**
42   * Tests {@link IOUtils} methods in a concurrent environment.
43   */
44  class IOUtilsConcurrentTest {
45  
46      private static class ChecksumReader extends Reader {
47          private final CRC32 checksum;
48          private final long expectedChecksumValue;
49          private final Reader reader;
50  
51          ChecksumReader(final Reader reader, final long expectedChecksumValue) {
52              this.reader = reader;
53              this.checksum = new CRC32();
54              this.expectedChecksumValue = expectedChecksumValue;
55          }
56  
57          @Override
58          public void close() throws IOException {
59              reader.close();
60          }
61  
62          public long getValue() {
63              return checksum.getValue();
64          }
65  
66          @Override
67          public int read() throws IOException {
68              return super.read();
69          }
70  
71          @Override
72          public int read(final char[] cbuf, final int off, final int len) throws IOException {
73              final int n = reader.read(cbuf, off, len);
74              if (n > 0) {
75                  final byte[] bytes = new String(cbuf, off, n).getBytes(Charset.defaultCharset());
76                  checksum.update(bytes, 0, bytes.length);
77              }
78              if (n == -1) {
79                  final long actual = checksum.getValue();
80                  if (actual != expectedChecksumValue) {
81                      throw new IOException("Checksum mismatch: expected " + expectedChecksumValue + " but got " + actual);
82                  }
83              }
84              return n;
85          }
86      }
87  
88      /**
89       * Test data for InputStream tests.
90       */
91      private static final byte[][] BYTE_DATA;
92  
93      /**
94       * Checksum values for {@link #BYTE_DATA}.
95       */
96      private static final long[] BYTE_DATA_CHECKSUM;
97  
98      /**
99       * Number of runs per thread (to increase the chance of collisions).
100      */
101     private static final int RUNS_PER_THREAD = 16;
102 
103     /**
104      * Size of test data.
105      */
106     private static final int SIZE = IOUtils.DEFAULT_BUFFER_SIZE;
107 
108     /**
109      * Test data for Reader tests.
110      */
111     private static final String[] STRING_DATA;
112 
113     /**
114      * Checksum values for {@link #STRING_DATA}.
115      */
116     private static final long[] STRING_DATA_CHECKSUM;
117 
118     /**
119      * Number of threads to use.
120      */
121     private static final int THREAD_COUNT = 16;
122 
123     /**
124      * Number of data variants (to increase the chance of collisions).
125      */
126     private static final int VARIANTS = 16;
127 
128     static {
129         final Checksum checksum = new CRC32();
130         // Byte data
131         BYTE_DATA = new byte[VARIANTS][];
132         BYTE_DATA_CHECKSUM = new long[VARIANTS];
133         for (int variant = 0; variant < VARIANTS; variant++) {
134             final byte[] data = new byte[SIZE];
135             for (int i = 0; i < SIZE; i++) {
136                 data[i] = (byte) ((i + variant) % 256);
137             }
138             BYTE_DATA[variant] = data;
139             checksum.reset();
140             checksum.update(data, 0 , data.length);
141             BYTE_DATA_CHECKSUM[variant] = checksum.getValue();
142         }
143         // Char data
144         final char[] cdata = new char[SIZE];
145         STRING_DATA = new String[VARIANTS];
146         STRING_DATA_CHECKSUM = new long[VARIANTS];
147         for (int variant = 0; variant < VARIANTS; variant++) {
148             for (int i = 0; i < SIZE; i++) {
149                 cdata[i] = (char) ((i + variant) % Character.MAX_VALUE);
150             }
151             STRING_DATA[variant] = new String(cdata);
152             checksum.reset();
153             final byte[] bytes = STRING_DATA[variant].getBytes(Charset.defaultCharset());
154             checksum.update(bytes, 0, bytes.length);
155             STRING_DATA_CHECKSUM[variant] = checksum.getValue();
156         }
157     }
158 
159     static Stream<IOConsumer<InputStream>> testConcurrentInputStreamTasks() {
160         return Stream.of(
161                 IOUtils::consume,
162                 in -> IOUtils.skip(in, Long.MAX_VALUE),
163                 in -> IOUtils.skipFully(in, SIZE),
164                 IOUtils::toByteArray,
165                 in -> IOUtils.toByteArray(in, SIZE),
166                 in -> IOUtils.toByteArray(in, SIZE, 512)
167         );
168     }
169 
170     static Stream<IOConsumer<Reader>> testConcurrentReaderTasks() {
171         return Stream.of(
172                 IOUtils::consume,
173                 reader -> IOUtils.skip(reader, Long.MAX_VALUE),
174                 reader -> IOUtils.skipFully(reader, SIZE),
175                 reader -> IOUtils.toByteArray(reader, Charset.defaultCharset())
176         );
177     }
178 
179     @ParameterizedTest
180     @MethodSource
181     void testConcurrentInputStreamTasks(final IOConsumer<InputStream> consumer) throws InterruptedException {
182         final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
183         try {
184             final List<Future<Void>> futures = IntStream.range(0, THREAD_COUNT * RUNS_PER_THREAD)
185                     .<Future<Void>>mapToObj(i -> threadPool.submit(() -> {
186                         try (InputStream in = ChecksumInputStream
187                                 .builder()
188                                 .setByteArray(BYTE_DATA[i % VARIANTS])
189                                 .setChecksum(new CRC32())
190                                 .setExpectedChecksumValue(BYTE_DATA_CHECKSUM[i % VARIANTS])
191                                 .get()) {
192                             consumer.accept(in);
193                         }
194                         return null;
195                     })).collect(Collectors.toList());
196             futures.forEach(f -> assertDoesNotThrow(() -> f.get()));
197         } finally {
198             threadPool.shutdownNow();
199         }
200     }
201 
202     @ParameterizedTest
203     @MethodSource
204     void testConcurrentReaderTasks(final IOConsumer<Reader> consumer) throws InterruptedException {
205         final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
206         try {
207             final List<Future<Void>> futures = IntStream.range(0, THREAD_COUNT * RUNS_PER_THREAD)
208                     .<Future<Void>>mapToObj(i -> threadPool.submit(() -> {
209                         try (Reader reader = new ChecksumReader(new StringReader(STRING_DATA[i % VARIANTS]), STRING_DATA_CHECKSUM[i % VARIANTS])) {
210                             consumer.accept(reader);
211                         }
212                         return null;
213                     })).collect(Collectors.toList());
214             futures.forEach(f -> assertDoesNotThrow(() -> f.get()));
215         } finally {
216             threadPool.shutdownNow();
217         }
218     }
219 }