View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   https://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.commons.compress.archivers.zip;
20  
21  import static org.junit.jupiter.api.Assertions.assertArrayEquals;
22  import static org.junit.jupiter.api.Assertions.assertEquals;
23  import static org.junit.jupiter.api.Assertions.assertNotNull;
24  import static org.junit.jupiter.api.Assertions.assertThrows;
25  import static org.junit.jupiter.api.Assertions.assertTrue;
26  
27  import java.io.ByteArrayInputStream;
28  import java.io.ByteArrayOutputStream;
29  import java.io.File;
30  import java.io.IOException;
31  import java.io.InputStream;
32  import java.nio.charset.StandardCharsets;
33  import java.nio.file.Files;
34  import java.nio.file.Path;
35  import java.nio.file.Paths;
36  import java.util.Enumeration;
37  import java.util.HashMap;
38  import java.util.LinkedList;
39  import java.util.Map;
40  import java.util.concurrent.Callable;
41  import java.util.concurrent.ExecutionException;
42  import java.util.concurrent.ExecutorService;
43  import java.util.concurrent.Executors;
44  import java.util.function.Consumer;
45  import java.util.function.Function;
46  import java.util.zip.Deflater;
47  import java.util.zip.ZipEntry;
48  
49  import org.apache.commons.compress.AbstractTempDirTest;
50  import org.apache.commons.compress.AbstractTest;
51  import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore;
52  import org.apache.commons.compress.parallel.InputStreamSupplier;
53  import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
54  import org.apache.commons.io.IOUtils;
55  import org.junit.jupiter.api.Disabled;
56  import org.junit.jupiter.api.Test;
57  
58  class ParallelScatterZipCreatorTest extends AbstractTempDirTest {
59  
60      private interface CallableConsumer extends Consumer<Callable<? extends ScatterZipOutputStream>> {
61          // empty
62      }
63  
64      private interface CallableConsumerSupplier extends Function<ParallelScatterZipCreator, CallableConsumer> {
65          // empty
66      }
67  
68      private static final long EXPECTED_FILE_SIZE = 1024 * 1024; // 1MB
69  
70      private static final int EXPECTED_FILES_NUMBER = 50;
71      private final int NUMITEMS = 5000;
72  
73      private void callableApi(final CallableConsumerSupplier consumerSupplier, final File result) throws Exception {
74          callableApi(consumerSupplier, Deflater.DEFAULT_COMPRESSION, result);
75      }
76  
77      private void callableApi(final CallableConsumerSupplier consumerSupplier, final int compressionLevel, final File result) throws Exception {
78          final Map<String, byte[]> entries;
79          final ParallelScatterZipCreator zipCreator;
80          try (ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result)) {
81              zos.setEncoding(StandardCharsets.UTF_8.name());
82              final ExecutorService es = Executors.newFixedThreadPool(1);
83  
84              final ScatterGatherBackingStoreSupplier supp = () -> new FileBasedScatterGatherBackingStore(createTempFile("parallelscatter", "n1"));
85  
86              zipCreator = new ParallelScatterZipCreator(es, supp, compressionLevel);
87              entries = writeEntriesAsCallable(zipCreator, consumerSupplier.apply(zipCreator));
88              zipCreator.writeTo(zos);
89          }
90  
91          removeEntriesFoundInZipFile(result, entries);
92          assertTrue(entries.isEmpty());
93          assertNotNull(zipCreator.getStatisticsMessage());
94      }
95  
96      private void callableApiWithTestFiles(final CallableConsumerSupplier consumerSupplier, final int compressionLevel, final File result) throws Exception {
97          final ParallelScatterZipCreator zipCreator;
98          final Map<String, byte[]> entries;
99          try (ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result)) {
100             zos.setEncoding(StandardCharsets.UTF_8.name());
101             final ExecutorService es = Executors.newFixedThreadPool(1);
102 
103             final ScatterGatherBackingStoreSupplier supp = () -> new FileBasedScatterGatherBackingStore(createTempFile("parallelscatter", "n1"));
104 
105             zipCreator = new ParallelScatterZipCreator(es, supp, compressionLevel);
106             entries = writeTestFilesAsCallable(zipCreator, consumerSupplier.apply(zipCreator));
107             zipCreator.writeTo(zos);
108         }
109 
110         // validate the content of the compressed files
111         try (ZipFile zf = ZipFile.builder().setFile(result).get()) {
112             final Enumeration<ZipArchiveEntry> entriesInPhysicalOrder = zf.getEntriesInPhysicalOrder();
113             while (entriesInPhysicalOrder.hasMoreElements()) {
114                 final ZipArchiveEntry zipArchiveEntry = entriesInPhysicalOrder.nextElement();
115                 try (InputStream inputStream = zf.getInputStream(zipArchiveEntry)) {
116                     final byte[] actual = IOUtils.toByteArray(inputStream);
117                     final byte[] expected = entries.remove(zipArchiveEntry.getName());
118                     assertArrayEquals(expected, actual, "For " + zipArchiveEntry.getName());
119                 }
120             }
121         }
122         assertNotNull(zipCreator.getStatisticsMessage());
123     }
124 
125     private ZipArchiveEntry createZipArchiveEntry(final Map<String, byte[]> entries, final int i, final byte[] payloadBytes) {
126         final ZipArchiveEntry za = new ZipArchiveEntry("file" + i);
127         entries.put(za.getName(), payloadBytes);
128         za.setMethod(ZipEntry.DEFLATED);
129         za.setSize(payloadBytes.length);
130         za.setUnixMode(UnixStat.FILE_FLAG | 0664);
131         return za;
132     }
133 
134     private void removeEntriesFoundInZipFile(final File result, final Map<String, byte[]> entries) throws IOException {
135         try (ZipFile zf = ZipFile.builder().setFile(result).get()) {
136             final Enumeration<ZipArchiveEntry> entriesInPhysicalOrder = zf.getEntriesInPhysicalOrder();
137             int i = 0;
138             while (entriesInPhysicalOrder.hasMoreElements()) {
139                 final ZipArchiveEntry zipArchiveEntry = entriesInPhysicalOrder.nextElement();
140                 try (InputStream inputStream = zf.getInputStream(zipArchiveEntry)) {
141                     final byte[] actual = IOUtils.toByteArray(inputStream);
142                     final byte[] expected = entries.remove(zipArchiveEntry.getName());
143                     assertArrayEquals(expected, actual, "For " + zipArchiveEntry.getName());
144                 }
145                 // check order of ZIP entries vs order of addition to the parallel ZIP creator
146                 assertEquals("file" + i++, zipArchiveEntry.getName(), "For " + zipArchiveEntry.getName());
147             }
148         }
149     }
150 
151     @Test
152     @Disabled("[COMPRESS-639]")
153     public void sameZipArchiveEntryNullPointerException() throws IOException, ExecutionException, InterruptedException {
154         final ByteArrayOutputStream testOutputStream = new ByteArrayOutputStream();
155 
156         final String fileContent = "A";
157         final int NUM_OF_FILES = 100;
158         final LinkedList<InputStream> inputStreams = new LinkedList<>();
159         for (int i = 0; i < NUM_OF_FILES; i++) {
160             inputStreams.add(new ByteArrayInputStream(fileContent.getBytes(StandardCharsets.UTF_8)));
161         }
162 
163         final ParallelScatterZipCreator zipCreator = new ParallelScatterZipCreator();
164         try (ZipArchiveOutputStream zipArchiveOutputStream = new ZipArchiveOutputStream(testOutputStream)) {
165             zipArchiveOutputStream.setUseZip64(Zip64Mode.Always);
166 
167             for (final InputStream inputStream : inputStreams) {
168                 final ZipArchiveEntry zipArchiveEntry = new ZipArchiveEntry("./dir/myfile.txt");
169                 zipArchiveEntry.setMethod(ZipEntry.DEFLATED);
170                 zipCreator.addArchiveEntry(zipArchiveEntry, () -> inputStream);
171             }
172 
173             zipCreator.writeTo(zipArchiveOutputStream);
174         } // Throws NullPointerException on close()
175     }
176 
177     @Test
178     void testCallableApiUsingSubmit() throws Exception {
179         final File result = createTempFile("parallelScatterGather2", "");
180         callableApi(zipCreator -> zipCreator::submit, result);
181     }
182 
183     @Test
184     void testCallableApiUsingSubmitStreamAwareCallable() throws Exception {
185         final File result = createTempFile("parallelScatterGather3", "");
186         callableApi(zipCreator -> zipCreator::submitStreamAwareCallable, result);
187     }
188 
189     @Test
190     void testCallableApiWithHighestLevelUsingSubmitStreamAwareCallable() throws Exception {
191         final File result = createTempFile("parallelScatterGather5", "");
192         callableApiWithTestFiles(zipCreator -> zipCreator::submitStreamAwareCallable, Deflater.BEST_COMPRESSION, result);
193     }
194 
195     @Test
196     void testCallableWithLowestLevelApiUsingSubmit() throws Exception {
197         final File result = createTempFile("parallelScatterGather4", "");
198         callableApiWithTestFiles(zipCreator -> zipCreator::submit, Deflater.NO_COMPRESSION, result);
199     }
200 
201     @Test
202     void testConcurrentCustomTempFolder() throws Exception {
203         final File result = createTempFile("parallelScatterGather1", "");
204         final ParallelScatterZipCreator zipCreator;
205         final Map<String, byte[]> entries;
206         try (ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result)) {
207             zos.setEncoding(StandardCharsets.UTF_8.name());
208 
209             // Formatter:off
210             final Path dir = Paths.get("target/custom-temp-dir");
211             Files.createDirectories(dir);
212             zipCreator = new ParallelScatterZipCreator(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()),
213                     new DefaultBackingStoreSupplier(dir));
214             // Formatter:on
215 
216             entries = writeEntries(zipCreator);
217             zipCreator.writeTo(zos);
218         }
219         removeEntriesFoundInZipFile(result, entries);
220         assertTrue(entries.isEmpty());
221         assertNotNull(zipCreator.getStatisticsMessage());
222     }
223 
224     @Test
225     void testConcurrentDefaultTempFolder() throws Exception {
226         final File result = createTempFile("parallelScatterGather1", "");
227         final ParallelScatterZipCreator zipCreator;
228         final Map<String, byte[]> entries;
229         try (ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result)) {
230             zos.setEncoding(StandardCharsets.UTF_8.name());
231             zipCreator = new ParallelScatterZipCreator();
232 
233             entries = writeEntries(zipCreator);
234             zipCreator.writeTo(zos);
235         }
236         removeEntriesFoundInZipFile(result, entries);
237         assertTrue(entries.isEmpty());
238         assertNotNull(zipCreator.getStatisticsMessage());
239     }
240 
241     @Test
242     void testThrowsExceptionWithCompressionLevelTooBig() {
243         final int compressLevelTooBig = Deflater.BEST_COMPRESSION + 1;
244         final ExecutorService es = Executors.newFixedThreadPool(1);
245         assertThrows(IllegalArgumentException.class, () -> new ParallelScatterZipCreator(es,
246                 () -> new FileBasedScatterGatherBackingStore(createTempFile("parallelscatter", "n1")), compressLevelTooBig));
247         es.shutdownNow();
248     }
249 
250     @Test
251     void testThrowsExceptionWithCompressionLevelTooSmall() {
252         final int compressLevelTooSmall = Deflater.DEFAULT_COMPRESSION - 1;
253         final ExecutorService es = Executors.newFixedThreadPool(1);
254         assertThrows(IllegalArgumentException.class, () -> new ParallelScatterZipCreator(es,
255                 () -> new FileBasedScatterGatherBackingStore(createTempFile("parallelscatter", "n1")), compressLevelTooSmall));
256         es.shutdownNow();
257     }
258 
259     private Map<String, byte[]> writeEntries(final ParallelScatterZipCreator zipCreator) {
260         final Map<String, byte[]> entries = new HashMap<>();
261         for (int i = 0; i < NUMITEMS; i++) {
262             final byte[] payloadBytes = ("content" + i).getBytes();
263             final ZipArchiveEntry za = createZipArchiveEntry(entries, i, payloadBytes);
264             final InputStreamSupplier iss = () -> new ByteArrayInputStream(payloadBytes);
265             if (i % 2 == 0) {
266                 zipCreator.addArchiveEntry(za, iss);
267             } else {
268                 final ZipArchiveEntryRequestSupplier zaSupplier = () -> ZipArchiveEntryRequest.createZipArchiveEntryRequest(za, iss);
269                 zipCreator.addArchiveEntry(zaSupplier);
270             }
271         }
272         return entries;
273     }
274 
275     private Map<String, byte[]> writeEntriesAsCallable(final ParallelScatterZipCreator zipCreator, final CallableConsumer consumer) {
276         final Map<String, byte[]> entries = new HashMap<>();
277         for (int i = 0; i < NUMITEMS; i++) {
278             final byte[] payloadBytes = ("content" + i).getBytes();
279             final ZipArchiveEntry za = createZipArchiveEntry(entries, i, payloadBytes);
280             final InputStreamSupplier iss = () -> new ByteArrayInputStream(payloadBytes);
281             final Callable<ScatterZipOutputStream> callable;
282             if (i % 2 == 0) {
283                 callable = zipCreator.createCallable(za, iss);
284             } else {
285                 final ZipArchiveEntryRequestSupplier zaSupplier = () -> ZipArchiveEntryRequest.createZipArchiveEntryRequest(za, iss);
286                 callable = zipCreator.createCallable(zaSupplier);
287             }
288 
289             consumer.accept(callable);
290         }
291         return entries;
292     }
293 
294     /**
295      * Try to compress the files in src/test/resources with size no bigger than {@value #EXPECTED_FILES_NUMBER} and with a mount of files no bigger than
296      * {@value #EXPECTED_FILES_NUMBER}
297      *
298      * @param zipCreator The ParallelScatterZipCreator
299      * @param consumer   The parallel consumer
300      * @return A map using file name as key and file content as value
301      * @throws IOException if exceptions occur when opening files
302      */
303     private Map<String, byte[]> writeTestFilesAsCallable(final ParallelScatterZipCreator zipCreator, final CallableConsumer consumer) throws IOException {
304         final Map<String, byte[]> entries = new HashMap<>();
305         final File baseDir = AbstractTest.getFile("");
306         int filesCount = 0;
307         for (final File file : baseDir.listFiles()) {
308             // do not compress too many files
309             if (filesCount >= EXPECTED_FILES_NUMBER) {
310                 break;
311             }
312 
313             // skip files that are too large
314             if (file.isDirectory() || file.length() > EXPECTED_FILE_SIZE) {
315                 continue;
316             }
317 
318             entries.put(file.getName(), Files.readAllBytes(file.toPath()));
319 
320             final ZipArchiveEntry zipArchiveEntry = new ZipArchiveEntry(file.getName());
321             zipArchiveEntry.setMethod(ZipEntry.DEFLATED);
322             zipArchiveEntry.setSize(file.length());
323             zipArchiveEntry.setUnixMode(UnixStat.FILE_FLAG | 0664);
324 
325             final InputStreamSupplier iss = () -> {
326                 try {
327                     return Files.newInputStream(file.toPath());
328                 } catch (final IOException e) {
329                     return null;
330                 }
331             };
332 
333             final Callable<ScatterZipOutputStream> callable;
334             if (filesCount % 2 == 0) {
335                 callable = zipCreator.createCallable(zipArchiveEntry, iss);
336             } else {
337                 final ZipArchiveEntryRequestSupplier zaSupplier = () -> ZipArchiveEntryRequest.createZipArchiveEntryRequest(zipArchiveEntry, iss);
338                 callable = zipCreator.createCallable(zaSupplier);
339             }
340 
341             consumer.accept(callable);
342             filesCount++;
343         }
344         return entries;
345     }
346 }