View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one or more
3    *  contributor license agreements.  See the NOTICE file distributed with
4    *  this work for additional information regarding copyright ownership.
5    *  The ASF licenses this file to You under the Apache License, Version 2.0
6    *  (the "License"); you may not use this file except in compliance with
7    *  the License.  You may obtain a copy of the License at
8    *
9    *      https://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software
12   *  distributed under the License is distributed on an "AS IS" BASIS,
13   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   *  See the License for the specific language governing permissions and
15   *  limitations under the License.
16   */
17  package org.apache.commons.io;
18  
19  import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.Reader;
24  import java.io.StringReader;
25  import java.nio.charset.Charset;
26  import java.util.List;
27  import java.util.concurrent.ExecutorService;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.Future;
30  import java.util.stream.Collectors;
31  import java.util.stream.IntStream;
32  import java.util.stream.Stream;
33  import java.util.zip.CRC32;
34  import java.util.zip.Checksum;
35  
36  import org.apache.commons.io.function.IOConsumer;
37  import org.apache.commons.io.input.ChecksumInputStream;
38  import org.junit.jupiter.params.ParameterizedTest;
39  import org.junit.jupiter.params.provider.MethodSource;
40  
41  /**
42   * Tests {@link IOUtils} methods in a concurrent environment.
43   */
44  class IOUtilsConcurrentTest {
45  
46      private static class ChecksumReader extends Reader {
47          private final CRC32 checksum;
48          private final long expectedChecksumValue;
49          private final Reader reader;
50  
51          ChecksumReader(final Reader reader, final long expectedChecksumValue) {
52              this.reader = reader;
53              this.checksum = new CRC32();
54              this.expectedChecksumValue = expectedChecksumValue;
55          }
56  
57          @Override
58          public void close() throws IOException {
59              reader.close();
60          }
61  
62          public long getValue() {
63              return checksum.getValue();
64          }
65  
66          @Override
67          public int read() throws IOException {
68              return super.read();
69          }
70  
71          @Override
72          public int read(final char[] cbuf, final int off, final int len) throws IOException {
73              final int n = reader.read(cbuf, off, len);
74              if (n > 0) {
75                  final byte[] bytes = new String(cbuf, off, n).getBytes(Charset.defaultCharset());
76                  checksum.update(bytes, 0, bytes.length);
77              }
78              if (n == -1) {
79                  final long actual = checksum.getValue();
80                  if (actual != expectedChecksumValue) {
81                      throw new IOException("Checksum mismatch: expected " + expectedChecksumValue + " but got " + actual);
82                  }
83              }
84              return n;
85          }
86      }
87  
88      /**
89       * Test data for InputStream tests.
90       */
91      private static final byte[][] BYTE_DATA;
92      /**
93       * Checksum values for {@link #BYTE_DATA}.
94       */
95      private static final long[] BYTE_DATA_CHECKSUM;
96      /**
97       * Number of runs per thread (to increase the chance of collisions).
98       */
99      private static final int RUNS_PER_THREAD = 16;
100     /**
101      * Size of test data.
102      */
103     private static final int SIZE = IOUtils.DEFAULT_BUFFER_SIZE;
104     /**
105      * Test data for Reader tests.
106      */
107     private static final String[] STRING_DATA;
108     /**
109      * Checksum values for {@link #STRING_DATA}.
110      */
111     private static final long[] STRING_DATA_CHECKSUM;
112     /**
113      * Number of threads to use.
114      */
115     private static final int THREAD_COUNT = 16;
116     /**
117      * Number of data variants (to increase the chance of collisions).
118      */
119     private static final int VARIANTS = 16;
120 
121     static {
122         final Checksum checksum = new CRC32();
123         // Byte data
124         BYTE_DATA = new byte[VARIANTS][];
125         BYTE_DATA_CHECKSUM = new long[VARIANTS];
126         for (int variant = 0; variant < VARIANTS; variant++) {
127             final byte[] data = new byte[SIZE];
128             for (int i = 0; i < SIZE; i++) {
129                 data[i] = (byte) ((i + variant) % 256);
130             }
131             BYTE_DATA[variant] = data;
132             checksum.reset();
133             checksum.update(data, 0 , data.length);
134             BYTE_DATA_CHECKSUM[variant] = checksum.getValue();
135         }
136         // Char data
137         final char[] cdata = new char[SIZE];
138         STRING_DATA = new String[VARIANTS];
139         STRING_DATA_CHECKSUM = new long[VARIANTS];
140         for (int variant = 0; variant < VARIANTS; variant++) {
141             for (int i = 0; i < SIZE; i++) {
142                 cdata[i] = (char) ((i + variant) % Character.MAX_VALUE);
143             }
144             STRING_DATA[variant] = new String(cdata);
145             checksum.reset();
146             final byte[] bytes = STRING_DATA[variant].getBytes(Charset.defaultCharset());
147             checksum.update(bytes, 0, bytes.length);
148             STRING_DATA_CHECKSUM[variant] = checksum.getValue();
149         }
150     }
151 
152     static Stream<IOConsumer<InputStream>> testConcurrentInputStreamTasks() {
153         return Stream.of(
154                 IOUtils::consume,
155                 in -> IOUtils.skip(in, Long.MAX_VALUE),
156                 in -> IOUtils.skipFully(in, SIZE),
157                 IOUtils::toByteArray,
158                 in -> IOUtils.toByteArray(in, SIZE),
159                 in -> IOUtils.toByteArray(in, SIZE, 512)
160         );
161     }
162 
163     static Stream<IOConsumer<Reader>> testConcurrentReaderTasks() {
164         return Stream.of(
165                 IOUtils::consume,
166                 reader -> IOUtils.skip(reader, Long.MAX_VALUE),
167                 reader -> IOUtils.skipFully(reader, SIZE),
168                 reader -> IOUtils.toByteArray(reader, Charset.defaultCharset())
169         );
170     }
171 
172     @ParameterizedTest
173     @MethodSource
174     void testConcurrentInputStreamTasks(final IOConsumer<InputStream> consumer) throws InterruptedException {
175         final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
176         try {
177             final List<Future<Void>> futures = IntStream.range(0, THREAD_COUNT * RUNS_PER_THREAD)
178                     .<Future<Void>>mapToObj(i -> threadPool.submit(() -> {
179                         try (InputStream in = ChecksumInputStream
180                                 .builder()
181                                 .setByteArray(BYTE_DATA[i % VARIANTS])
182                                 .setChecksum(new CRC32())
183                                 .setExpectedChecksumValue(BYTE_DATA_CHECKSUM[i % VARIANTS])
184                                 .get()) {
185                             consumer.accept(in);
186                         }
187                         return null;
188                     })).collect(Collectors.toList());
189             futures.forEach(f -> assertDoesNotThrow(() -> f.get()));
190         } finally {
191             threadPool.shutdownNow();
192         }
193     }
194 
195     @ParameterizedTest
196     @MethodSource
197     void testConcurrentReaderTasks(final IOConsumer<Reader> consumer) throws InterruptedException {
198         final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
199         try {
200             final List<Future<Void>> futures = IntStream.range(0, THREAD_COUNT * RUNS_PER_THREAD)
201                     .<Future<Void>>mapToObj(i -> threadPool.submit(() -> {
202                         try (Reader reader = new ChecksumReader(new StringReader(STRING_DATA[i % VARIANTS]), STRING_DATA_CHECKSUM[i % VARIANTS])) {
203                             consumer.accept(reader);
204                         }
205                         return null;
206                     })).collect(Collectors.toList());
207             futures.forEach(f -> assertDoesNotThrow(() -> f.get()));
208         } finally {
209             threadPool.shutdownNow();
210         }
211     }
212 }