1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.commons.io;
18
19 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.Reader;
24 import java.io.StringReader;
25 import java.nio.charset.Charset;
26 import java.util.List;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.Future;
30 import java.util.stream.Collectors;
31 import java.util.stream.IntStream;
32 import java.util.stream.Stream;
33 import java.util.zip.CRC32;
34 import java.util.zip.Checksum;
35
36 import org.apache.commons.io.function.IOConsumer;
37 import org.apache.commons.io.input.ChecksumInputStream;
38 import org.junit.jupiter.params.ParameterizedTest;
39 import org.junit.jupiter.params.provider.MethodSource;
40
41
42
43
44 class IOUtilsConcurrentTest {
45
46 private static class ChecksumReader extends Reader {
47 private final CRC32 checksum;
48 private final long expectedChecksumValue;
49 private final Reader reader;
50
51 ChecksumReader(final Reader reader, final long expectedChecksumValue) {
52 this.reader = reader;
53 this.checksum = new CRC32();
54 this.expectedChecksumValue = expectedChecksumValue;
55 }
56
57 @Override
58 public void close() throws IOException {
59 reader.close();
60 }
61
62 public long getValue() {
63 return checksum.getValue();
64 }
65
66 @Override
67 public int read() throws IOException {
68 return super.read();
69 }
70
71 @Override
72 public int read(final char[] cbuf, final int off, final int len) throws IOException {
73 final int n = reader.read(cbuf, off, len);
74 if (n > 0) {
75 final byte[] bytes = new String(cbuf, off, n).getBytes(Charset.defaultCharset());
76 checksum.update(bytes, 0, bytes.length);
77 }
78 if (n == -1) {
79 final long actual = checksum.getValue();
80 if (actual != expectedChecksumValue) {
81 throw new IOException("Checksum mismatch: expected " + expectedChecksumValue + " but got " + actual);
82 }
83 }
84 return n;
85 }
86 }
87
88
89
90
91 private static final byte[][] BYTE_DATA;
92
93
94
95
96 private static final long[] BYTE_DATA_CHECKSUM;
97
98
99
100
101 private static final int RUNS_PER_THREAD = 16;
102
103
104
105
106 private static final int SIZE = IOUtils.DEFAULT_BUFFER_SIZE;
107
108
109
110
111 private static final String[] STRING_DATA;
112
113
114
115
116 private static final long[] STRING_DATA_CHECKSUM;
117
118
119
120
121 private static final int THREAD_COUNT = 16;
122
123
124
125
126 private static final int VARIANTS = 16;
127
128 static {
129 final Checksum checksum = new CRC32();
130
131 BYTE_DATA = new byte[VARIANTS][];
132 BYTE_DATA_CHECKSUM = new long[VARIANTS];
133 for (int variant = 0; variant < VARIANTS; variant++) {
134 final byte[] data = new byte[SIZE];
135 for (int i = 0; i < SIZE; i++) {
136 data[i] = (byte) ((i + variant) % 256);
137 }
138 BYTE_DATA[variant] = data;
139 checksum.reset();
140 checksum.update(data, 0 , data.length);
141 BYTE_DATA_CHECKSUM[variant] = checksum.getValue();
142 }
143
144 final char[] cdata = new char[SIZE];
145 STRING_DATA = new String[VARIANTS];
146 STRING_DATA_CHECKSUM = new long[VARIANTS];
147 for (int variant = 0; variant < VARIANTS; variant++) {
148 for (int i = 0; i < SIZE; i++) {
149 cdata[i] = (char) ((i + variant) % Character.MAX_VALUE);
150 }
151 STRING_DATA[variant] = new String(cdata);
152 checksum.reset();
153 final byte[] bytes = STRING_DATA[variant].getBytes(Charset.defaultCharset());
154 checksum.update(bytes, 0, bytes.length);
155 STRING_DATA_CHECKSUM[variant] = checksum.getValue();
156 }
157 }
158
159 static Stream<IOConsumer<InputStream>> testConcurrentInputStreamTasks() {
160 return Stream.of(
161 IOUtils::consume,
162 in -> IOUtils.skip(in, Long.MAX_VALUE),
163 in -> IOUtils.skipFully(in, SIZE),
164 IOUtils::toByteArray,
165 in -> IOUtils.toByteArray(in, SIZE),
166 in -> IOUtils.toByteArray(in, SIZE, 512)
167 );
168 }
169
170 static Stream<IOConsumer<Reader>> testConcurrentReaderTasks() {
171 return Stream.of(
172 IOUtils::consume,
173 reader -> IOUtils.skip(reader, Long.MAX_VALUE),
174 reader -> IOUtils.skipFully(reader, SIZE),
175 reader -> IOUtils.toByteArray(reader, Charset.defaultCharset())
176 );
177 }
178
179 @ParameterizedTest
180 @MethodSource
181 void testConcurrentInputStreamTasks(final IOConsumer<InputStream> consumer) throws InterruptedException {
182 final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
183 try {
184 final List<Future<Void>> futures = IntStream.range(0, THREAD_COUNT * RUNS_PER_THREAD)
185 .<Future<Void>>mapToObj(i -> threadPool.submit(() -> {
186 try (InputStream in = ChecksumInputStream
187 .builder()
188 .setByteArray(BYTE_DATA[i % VARIANTS])
189 .setChecksum(new CRC32())
190 .setExpectedChecksumValue(BYTE_DATA_CHECKSUM[i % VARIANTS])
191 .get()) {
192 consumer.accept(in);
193 }
194 return null;
195 })).collect(Collectors.toList());
196 futures.forEach(f -> assertDoesNotThrow(() -> f.get()));
197 } finally {
198 threadPool.shutdownNow();
199 }
200 }
201
202 @ParameterizedTest
203 @MethodSource
204 void testConcurrentReaderTasks(final IOConsumer<Reader> consumer) throws InterruptedException {
205 final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
206 try {
207 final List<Future<Void>> futures = IntStream.range(0, THREAD_COUNT * RUNS_PER_THREAD)
208 .<Future<Void>>mapToObj(i -> threadPool.submit(() -> {
209 try (Reader reader = new ChecksumReader(new StringReader(STRING_DATA[i % VARIANTS]), STRING_DATA_CHECKSUM[i % VARIANTS])) {
210 consumer.accept(reader);
211 }
212 return null;
213 })).collect(Collectors.toList());
214 futures.forEach(f -> assertDoesNotThrow(() -> f.get()));
215 } finally {
216 threadPool.shutdownNow();
217 }
218 }
219 }