1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.commons.compress.archivers.zip;
20
21 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
22 import static org.junit.jupiter.api.Assertions.assertEquals;
23 import static org.junit.jupiter.api.Assertions.assertNotNull;
24 import static org.junit.jupiter.api.Assertions.assertThrows;
25 import static org.junit.jupiter.api.Assertions.assertTrue;
26
27 import java.io.ByteArrayInputStream;
28 import java.io.ByteArrayOutputStream;
29 import java.io.File;
30 import java.io.IOException;
31 import java.io.InputStream;
32 import java.nio.charset.StandardCharsets;
33 import java.nio.file.Files;
34 import java.nio.file.Path;
35 import java.nio.file.Paths;
36 import java.util.Enumeration;
37 import java.util.HashMap;
38 import java.util.LinkedList;
39 import java.util.Map;
40 import java.util.concurrent.Callable;
41 import java.util.concurrent.ExecutionException;
42 import java.util.concurrent.ExecutorService;
43 import java.util.concurrent.Executors;
44 import java.util.function.Consumer;
45 import java.util.function.Function;
46 import java.util.zip.Deflater;
47 import java.util.zip.ZipEntry;
48
49 import org.apache.commons.compress.AbstractTempDirTest;
50 import org.apache.commons.compress.AbstractTest;
51 import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore;
52 import org.apache.commons.compress.parallel.InputStreamSupplier;
53 import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
54 import org.apache.commons.io.IOUtils;
55 import org.junit.jupiter.api.Disabled;
56 import org.junit.jupiter.api.Test;
57
58 class ParallelScatterZipCreatorTest extends AbstractTempDirTest {
59
60 private interface CallableConsumer extends Consumer<Callable<? extends ScatterZipOutputStream>> {
61
62 }
63
64 private interface CallableConsumerSupplier extends Function<ParallelScatterZipCreator, CallableConsumer> {
65
66 }
67
68 private static final long EXPECTED_FILE_SIZE = 1024 * 1024;
69
70 private static final int EXPECTED_FILES_NUMBER = 50;
71 private final int NUMITEMS = 5000;
72
73 private void callableApi(final CallableConsumerSupplier consumerSupplier, final File result) throws Exception {
74 callableApi(consumerSupplier, Deflater.DEFAULT_COMPRESSION, result);
75 }
76
77 private void callableApi(final CallableConsumerSupplier consumerSupplier, final int compressionLevel, final File result) throws Exception {
78 final Map<String, byte[]> entries;
79 final ParallelScatterZipCreator zipCreator;
80 try (ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result)) {
81 zos.setEncoding(StandardCharsets.UTF_8.name());
82 final ExecutorService es = Executors.newFixedThreadPool(1);
83
84 final ScatterGatherBackingStoreSupplier supp = () -> new FileBasedScatterGatherBackingStore(createTempFile("parallelscatter", "n1"));
85
86 zipCreator = new ParallelScatterZipCreator(es, supp, compressionLevel);
87 entries = writeEntriesAsCallable(zipCreator, consumerSupplier.apply(zipCreator));
88 zipCreator.writeTo(zos);
89 }
90
91 removeEntriesFoundInZipFile(result, entries);
92 assertTrue(entries.isEmpty());
93 assertNotNull(zipCreator.getStatisticsMessage());
94 }
95
96 private void callableApiWithTestFiles(final CallableConsumerSupplier consumerSupplier, final int compressionLevel, final File result) throws Exception {
97 final ParallelScatterZipCreator zipCreator;
98 final Map<String, byte[]> entries;
99 try (ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result)) {
100 zos.setEncoding(StandardCharsets.UTF_8.name());
101 final ExecutorService es = Executors.newFixedThreadPool(1);
102
103 final ScatterGatherBackingStoreSupplier supp = () -> new FileBasedScatterGatherBackingStore(createTempFile("parallelscatter", "n1"));
104
105 zipCreator = new ParallelScatterZipCreator(es, supp, compressionLevel);
106 entries = writeTestFilesAsCallable(zipCreator, consumerSupplier.apply(zipCreator));
107 zipCreator.writeTo(zos);
108 }
109
110
111 try (ZipFile zf = ZipFile.builder().setFile(result).get()) {
112 final Enumeration<ZipArchiveEntry> entriesInPhysicalOrder = zf.getEntriesInPhysicalOrder();
113 while (entriesInPhysicalOrder.hasMoreElements()) {
114 final ZipArchiveEntry zipArchiveEntry = entriesInPhysicalOrder.nextElement();
115 try (InputStream inputStream = zf.getInputStream(zipArchiveEntry)) {
116 final byte[] actual = IOUtils.toByteArray(inputStream);
117 final byte[] expected = entries.remove(zipArchiveEntry.getName());
118 assertArrayEquals(expected, actual, "For " + zipArchiveEntry.getName());
119 }
120 }
121 }
122 assertNotNull(zipCreator.getStatisticsMessage());
123 }
124
125 private ZipArchiveEntry createZipArchiveEntry(final Map<String, byte[]> entries, final int i, final byte[] payloadBytes) {
126 final ZipArchiveEntry za = new ZipArchiveEntry("file" + i);
127 entries.put(za.getName(), payloadBytes);
128 za.setMethod(ZipEntry.DEFLATED);
129 za.setSize(payloadBytes.length);
130 za.setUnixMode(UnixStat.FILE_FLAG | 0664);
131 return za;
132 }
133
134 private void removeEntriesFoundInZipFile(final File result, final Map<String, byte[]> entries) throws IOException {
135 try (ZipFile zf = ZipFile.builder().setFile(result).get()) {
136 final Enumeration<ZipArchiveEntry> entriesInPhysicalOrder = zf.getEntriesInPhysicalOrder();
137 int i = 0;
138 while (entriesInPhysicalOrder.hasMoreElements()) {
139 final ZipArchiveEntry zipArchiveEntry = entriesInPhysicalOrder.nextElement();
140 try (InputStream inputStream = zf.getInputStream(zipArchiveEntry)) {
141 final byte[] actual = IOUtils.toByteArray(inputStream);
142 final byte[] expected = entries.remove(zipArchiveEntry.getName());
143 assertArrayEquals(expected, actual, "For " + zipArchiveEntry.getName());
144 }
145
146 assertEquals("file" + i++, zipArchiveEntry.getName(), "For " + zipArchiveEntry.getName());
147 }
148 }
149 }
150
151 @Test
152 @Disabled("[COMPRESS-639]")
153 public void sameZipArchiveEntryNullPointerException() throws IOException, ExecutionException, InterruptedException {
154 final ByteArrayOutputStream testOutputStream = new ByteArrayOutputStream();
155
156 final String fileContent = "A";
157 final int NUM_OF_FILES = 100;
158 final LinkedList<InputStream> inputStreams = new LinkedList<>();
159 for (int i = 0; i < NUM_OF_FILES; i++) {
160 inputStreams.add(new ByteArrayInputStream(fileContent.getBytes(StandardCharsets.UTF_8)));
161 }
162
163 final ParallelScatterZipCreator zipCreator = new ParallelScatterZipCreator();
164 try (ZipArchiveOutputStream zipArchiveOutputStream = new ZipArchiveOutputStream(testOutputStream)) {
165 zipArchiveOutputStream.setUseZip64(Zip64Mode.Always);
166
167 for (final InputStream inputStream : inputStreams) {
168 final ZipArchiveEntry zipArchiveEntry = new ZipArchiveEntry("./dir/myfile.txt");
169 zipArchiveEntry.setMethod(ZipEntry.DEFLATED);
170 zipCreator.addArchiveEntry(zipArchiveEntry, () -> inputStream);
171 }
172
173 zipCreator.writeTo(zipArchiveOutputStream);
174 }
175 }
176
177 @Test
178 void testCallableApiUsingSubmit() throws Exception {
179 final File result = createTempFile("parallelScatterGather2", "");
180 callableApi(zipCreator -> zipCreator::submit, result);
181 }
182
183 @Test
184 void testCallableApiUsingSubmitStreamAwareCallable() throws Exception {
185 final File result = createTempFile("parallelScatterGather3", "");
186 callableApi(zipCreator -> zipCreator::submitStreamAwareCallable, result);
187 }
188
189 @Test
190 void testCallableApiWithHighestLevelUsingSubmitStreamAwareCallable() throws Exception {
191 final File result = createTempFile("parallelScatterGather5", "");
192 callableApiWithTestFiles(zipCreator -> zipCreator::submitStreamAwareCallable, Deflater.BEST_COMPRESSION, result);
193 }
194
195 @Test
196 void testCallableWithLowestLevelApiUsingSubmit() throws Exception {
197 final File result = createTempFile("parallelScatterGather4", "");
198 callableApiWithTestFiles(zipCreator -> zipCreator::submit, Deflater.NO_COMPRESSION, result);
199 }
200
201 @Test
202 void testConcurrentCustomTempFolder() throws Exception {
203 final File result = createTempFile("parallelScatterGather1", "");
204 final ParallelScatterZipCreator zipCreator;
205 final Map<String, byte[]> entries;
206 try (ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result)) {
207 zos.setEncoding(StandardCharsets.UTF_8.name());
208
209
210 final Path dir = Paths.get("target/custom-temp-dir");
211 Files.createDirectories(dir);
212 zipCreator = new ParallelScatterZipCreator(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()),
213 new DefaultBackingStoreSupplier(dir));
214
215
216 entries = writeEntries(zipCreator);
217 zipCreator.writeTo(zos);
218 }
219 removeEntriesFoundInZipFile(result, entries);
220 assertTrue(entries.isEmpty());
221 assertNotNull(zipCreator.getStatisticsMessage());
222 }
223
224 @Test
225 void testConcurrentDefaultTempFolder() throws Exception {
226 final File result = createTempFile("parallelScatterGather1", "");
227 final ParallelScatterZipCreator zipCreator;
228 final Map<String, byte[]> entries;
229 try (ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result)) {
230 zos.setEncoding(StandardCharsets.UTF_8.name());
231 zipCreator = new ParallelScatterZipCreator();
232
233 entries = writeEntries(zipCreator);
234 zipCreator.writeTo(zos);
235 }
236 removeEntriesFoundInZipFile(result, entries);
237 assertTrue(entries.isEmpty());
238 assertNotNull(zipCreator.getStatisticsMessage());
239 }
240
241 @Test
242 void testThrowsExceptionWithCompressionLevelTooBig() {
243 final int compressLevelTooBig = Deflater.BEST_COMPRESSION + 1;
244 final ExecutorService es = Executors.newFixedThreadPool(1);
245 assertThrows(IllegalArgumentException.class, () -> new ParallelScatterZipCreator(es,
246 () -> new FileBasedScatterGatherBackingStore(createTempFile("parallelscatter", "n1")), compressLevelTooBig));
247 es.shutdownNow();
248 }
249
250 @Test
251 void testThrowsExceptionWithCompressionLevelTooSmall() {
252 final int compressLevelTooSmall = Deflater.DEFAULT_COMPRESSION - 1;
253 final ExecutorService es = Executors.newFixedThreadPool(1);
254 assertThrows(IllegalArgumentException.class, () -> new ParallelScatterZipCreator(es,
255 () -> new FileBasedScatterGatherBackingStore(createTempFile("parallelscatter", "n1")), compressLevelTooSmall));
256 es.shutdownNow();
257 }
258
259 private Map<String, byte[]> writeEntries(final ParallelScatterZipCreator zipCreator) {
260 final Map<String, byte[]> entries = new HashMap<>();
261 for (int i = 0; i < NUMITEMS; i++) {
262 final byte[] payloadBytes = ("content" + i).getBytes();
263 final ZipArchiveEntry za = createZipArchiveEntry(entries, i, payloadBytes);
264 final InputStreamSupplier iss = () -> new ByteArrayInputStream(payloadBytes);
265 if (i % 2 == 0) {
266 zipCreator.addArchiveEntry(za, iss);
267 } else {
268 final ZipArchiveEntryRequestSupplier zaSupplier = () -> ZipArchiveEntryRequest.createZipArchiveEntryRequest(za, iss);
269 zipCreator.addArchiveEntry(zaSupplier);
270 }
271 }
272 return entries;
273 }
274
275 private Map<String, byte[]> writeEntriesAsCallable(final ParallelScatterZipCreator zipCreator, final CallableConsumer consumer) {
276 final Map<String, byte[]> entries = new HashMap<>();
277 for (int i = 0; i < NUMITEMS; i++) {
278 final byte[] payloadBytes = ("content" + i).getBytes();
279 final ZipArchiveEntry za = createZipArchiveEntry(entries, i, payloadBytes);
280 final InputStreamSupplier iss = () -> new ByteArrayInputStream(payloadBytes);
281 final Callable<ScatterZipOutputStream> callable;
282 if (i % 2 == 0) {
283 callable = zipCreator.createCallable(za, iss);
284 } else {
285 final ZipArchiveEntryRequestSupplier zaSupplier = () -> ZipArchiveEntryRequest.createZipArchiveEntryRequest(za, iss);
286 callable = zipCreator.createCallable(zaSupplier);
287 }
288
289 consumer.accept(callable);
290 }
291 return entries;
292 }
293
294
295
296
297
298
299
300
301
302
303 private Map<String, byte[]> writeTestFilesAsCallable(final ParallelScatterZipCreator zipCreator, final CallableConsumer consumer) throws IOException {
304 final Map<String, byte[]> entries = new HashMap<>();
305 final File baseDir = AbstractTest.getFile("");
306 int filesCount = 0;
307 for (final File file : baseDir.listFiles()) {
308
309 if (filesCount >= EXPECTED_FILES_NUMBER) {
310 break;
311 }
312
313
314 if (file.isDirectory() || file.length() > EXPECTED_FILE_SIZE) {
315 continue;
316 }
317
318 entries.put(file.getName(), Files.readAllBytes(file.toPath()));
319
320 final ZipArchiveEntry zipArchiveEntry = new ZipArchiveEntry(file.getName());
321 zipArchiveEntry.setMethod(ZipEntry.DEFLATED);
322 zipArchiveEntry.setSize(file.length());
323 zipArchiveEntry.setUnixMode(UnixStat.FILE_FLAG | 0664);
324
325 final InputStreamSupplier iss = () -> {
326 try {
327 return Files.newInputStream(file.toPath());
328 } catch (final IOException e) {
329 return null;
330 }
331 };
332
333 final Callable<ScatterZipOutputStream> callable;
334 if (filesCount % 2 == 0) {
335 callable = zipCreator.createCallable(zipArchiveEntry, iss);
336 } else {
337 final ZipArchiveEntryRequestSupplier zaSupplier = () -> ZipArchiveEntryRequest.createZipArchiveEntryRequest(zipArchiveEntry, iss);
338 callable = zipCreator.createCallable(zaSupplier);
339 }
340
341 consumer.accept(callable);
342 filesCount++;
343 }
344 return entries;
345 }
346 }