1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.commons.io;
18
19 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.Reader;
24 import java.io.StringReader;
25 import java.nio.charset.Charset;
26 import java.util.List;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.Future;
30 import java.util.stream.Collectors;
31 import java.util.stream.IntStream;
32 import java.util.stream.Stream;
33 import java.util.zip.CRC32;
34 import java.util.zip.Checksum;
35
36 import org.apache.commons.io.function.IOConsumer;
37 import org.apache.commons.io.input.ChecksumInputStream;
38 import org.junit.jupiter.params.ParameterizedTest;
39 import org.junit.jupiter.params.provider.MethodSource;
40
41
42
43
44 class IOUtilsConcurrentTest {
45
46 private static class ChecksumReader extends Reader {
47 private final CRC32 checksum;
48 private final long expectedChecksumValue;
49 private final Reader reader;
50
51 ChecksumReader(final Reader reader, final long expectedChecksumValue) {
52 this.reader = reader;
53 this.checksum = new CRC32();
54 this.expectedChecksumValue = expectedChecksumValue;
55 }
56
57 @Override
58 public void close() throws IOException {
59 reader.close();
60 }
61
62 public long getValue() {
63 return checksum.getValue();
64 }
65
66 @Override
67 public int read() throws IOException {
68 return super.read();
69 }
70
71 @Override
72 public int read(final char[] cbuf, final int off, final int len) throws IOException {
73 final int n = reader.read(cbuf, off, len);
74 if (n > 0) {
75 final byte[] bytes = new String(cbuf, off, n).getBytes(Charset.defaultCharset());
76 checksum.update(bytes, 0, bytes.length);
77 }
78 if (n == -1) {
79 final long actual = checksum.getValue();
80 if (actual != expectedChecksumValue) {
81 throw new IOException("Checksum mismatch: expected " + expectedChecksumValue + " but got " + actual);
82 }
83 }
84 return n;
85 }
86 }
87
88
89
90
91 private static final byte[][] BYTE_DATA;
92
93
94
95 private static final long[] BYTE_DATA_CHECKSUM;
96
97
98
99 private static final int RUNS_PER_THREAD = 16;
100
101
102
103 private static final int SIZE = IOUtils.DEFAULT_BUFFER_SIZE;
104
105
106
107 private static final String[] STRING_DATA;
108
109
110
111 private static final long[] STRING_DATA_CHECKSUM;
112
113
114
115 private static final int THREAD_COUNT = 16;
116
117
118
119 private static final int VARIANTS = 16;
120
121 static {
122 final Checksum checksum = new CRC32();
123
124 BYTE_DATA = new byte[VARIANTS][];
125 BYTE_DATA_CHECKSUM = new long[VARIANTS];
126 for (int variant = 0; variant < VARIANTS; variant++) {
127 final byte[] data = new byte[SIZE];
128 for (int i = 0; i < SIZE; i++) {
129 data[i] = (byte) ((i + variant) % 256);
130 }
131 BYTE_DATA[variant] = data;
132 checksum.reset();
133 checksum.update(data, 0 , data.length);
134 BYTE_DATA_CHECKSUM[variant] = checksum.getValue();
135 }
136
137 final char[] cdata = new char[SIZE];
138 STRING_DATA = new String[VARIANTS];
139 STRING_DATA_CHECKSUM = new long[VARIANTS];
140 for (int variant = 0; variant < VARIANTS; variant++) {
141 for (int i = 0; i < SIZE; i++) {
142 cdata[i] = (char) ((i + variant) % Character.MAX_VALUE);
143 }
144 STRING_DATA[variant] = new String(cdata);
145 checksum.reset();
146 final byte[] bytes = STRING_DATA[variant].getBytes(Charset.defaultCharset());
147 checksum.update(bytes, 0, bytes.length);
148 STRING_DATA_CHECKSUM[variant] = checksum.getValue();
149 }
150 }
151
152 static Stream<IOConsumer<InputStream>> testConcurrentInputStreamTasks() {
153 return Stream.of(
154 IOUtils::consume,
155 in -> IOUtils.skip(in, Long.MAX_VALUE),
156 in -> IOUtils.skipFully(in, SIZE),
157 IOUtils::toByteArray,
158 in -> IOUtils.toByteArray(in, SIZE),
159 in -> IOUtils.toByteArray(in, SIZE, 512)
160 );
161 }
162
163 static Stream<IOConsumer<Reader>> testConcurrentReaderTasks() {
164 return Stream.of(
165 IOUtils::consume,
166 reader -> IOUtils.skip(reader, Long.MAX_VALUE),
167 reader -> IOUtils.skipFully(reader, SIZE),
168 reader -> IOUtils.toByteArray(reader, Charset.defaultCharset())
169 );
170 }
171
172 @ParameterizedTest
173 @MethodSource
174 void testConcurrentInputStreamTasks(final IOConsumer<InputStream> consumer) throws InterruptedException {
175 final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
176 try {
177 final List<Future<Void>> futures = IntStream.range(0, THREAD_COUNT * RUNS_PER_THREAD)
178 .<Future<Void>>mapToObj(i -> threadPool.submit(() -> {
179 try (InputStream in = ChecksumInputStream
180 .builder()
181 .setByteArray(BYTE_DATA[i % VARIANTS])
182 .setChecksum(new CRC32())
183 .setExpectedChecksumValue(BYTE_DATA_CHECKSUM[i % VARIANTS])
184 .get()) {
185 consumer.accept(in);
186 }
187 return null;
188 })).collect(Collectors.toList());
189 futures.forEach(f -> assertDoesNotThrow(() -> f.get()));
190 } finally {
191 threadPool.shutdownNow();
192 }
193 }
194
195 @ParameterizedTest
196 @MethodSource
197 void testConcurrentReaderTasks(final IOConsumer<Reader> consumer) throws InterruptedException {
198 final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
199 try {
200 final List<Future<Void>> futures = IntStream.range(0, THREAD_COUNT * RUNS_PER_THREAD)
201 .<Future<Void>>mapToObj(i -> threadPool.submit(() -> {
202 try (Reader reader = new ChecksumReader(new StringReader(STRING_DATA[i % VARIANTS]), STRING_DATA_CHECKSUM[i % VARIANTS])) {
203 consumer.accept(reader);
204 }
205 return null;
206 })).collect(Collectors.toList());
207 futures.forEach(f -> assertDoesNotThrow(() -> f.get()));
208 } finally {
209 threadPool.shutdownNow();
210 }
211 }
212 }