View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   https://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.commons.compress.archivers.zip;
20  
import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.zip.Deflater;

import org.apache.commons.compress.parallel.InputStreamSupplier;
import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
import org.apache.commons.io.IOUtils;
39  
40  /**
41   * Creates a ZIP in parallel by using multiple threadlocal {@link ScatterZipOutputStream} instances.
42   * <p>
43   * Note that until 1.18, this class generally made no guarantees about the order of things written to the output file. Things that needed to come in a specific
44   * order (manifests, directories) had to be handled by the client of this class, usually by writing these things to the {@link ZipArchiveOutputStream}
45   * <em>before</em> calling {@link #writeTo writeTo} on this class.
46   * </p>
47   * <p>
48   * The client can supply an {@link java.util.concurrent.ExecutorService}, but for reasons of memory model consistency, this will be shut down by this class
49   * prior to completion.
50   * </p>
51   *
52   * @since 1.10
53   */
54  public class ParallelScatterZipCreator {
55  
56      private final Deque<ScatterZipOutputStream> streams = new ConcurrentLinkedDeque<>();
57      private final ExecutorService executorService;
58      private final ScatterGatherBackingStoreSupplier backingStoreSupplier;
59  
60      private final Deque<Future<? extends ScatterZipOutputStream>> futures = new ConcurrentLinkedDeque<>();
61      private final long startedAt = System.currentTimeMillis();
62      private long compressionDoneAt;
63      private long scatterDoneAt;
64  
65      private final int compressionLevel;
66  
67      private final ThreadLocal<ScatterZipOutputStream> tlScatterStreams = new ThreadLocal<ScatterZipOutputStream>() {
68          @Override
69          protected ScatterZipOutputStream initialValue() {
70              try {
71                  final ScatterZipOutputStream scatterStream = createDeferred(backingStoreSupplier);
72                  streams.add(scatterStream);
73                  return scatterStream;
74              } catch (final IOException e) {
75                  throw new UncheckedIOException(e); // NOSONAR
76              }
77          }
78      };
79  
80      /**
81       * Constructs a ParallelScatterZipCreator with default threads, which is set to the number of available processors, as defined by
82       * {@link Runtime#availableProcessors}
83       */
84      public ParallelScatterZipCreator() {
85          this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
86      }
87  
88      /**
89       * Constructs a ParallelScatterZipCreator
90       *
91       * @param executorService The executorService to use for parallel scheduling. For technical reasons, this will be shut down by this class.
92       */
93      public ParallelScatterZipCreator(final ExecutorService executorService) {
94          this(executorService, new DefaultBackingStoreSupplier(null));
95      }
96  
97      /**
98       * Constructs a ParallelScatterZipCreator
99       *
100      * @param executorService      The executorService to use. For technical reasons, this will be shut down by this class.
101      * @param backingStoreSupplier The supplier of backing store which shall be used
102      */
103     public ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier) {
104         this(executorService, backingStoreSupplier, Deflater.DEFAULT_COMPRESSION);
105     }
106 
107     /**
108      * Constructs a ParallelScatterZipCreator
109      *
110      * @param executorService      The executorService to use. For technical reasons, this will be shut down by this class.
111      * @param backingStoreSupplier The supplier of backing store which shall be used
112      * @param compressionLevel     The compression level used in compression, this value should be -1(default level) or between 0~9.
113      * @throws IllegalArgumentException if the compression level is illegal
114      * @since 1.21
115      */
116     public ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier,
117             final int compressionLevel) throws IllegalArgumentException {
118         if ((compressionLevel < Deflater.NO_COMPRESSION || compressionLevel > Deflater.BEST_COMPRESSION) && compressionLevel != Deflater.DEFAULT_COMPRESSION) {
119             throw new IllegalArgumentException("Compression level is expected between -1~9");
120         }
121 
122         this.backingStoreSupplier = backingStoreSupplier;
123         this.executorService = executorService;
124         this.compressionLevel = compressionLevel;
125     }
126 
127     /**
128      * Adds an archive entry to this archive.
129      * <p>
130      * This method is expected to be called from a single client thread
131      * </p>
132      *
133      * @param zipArchiveEntry The entry to add.
134      * @param source          The source input stream supplier
135      */
136 
137     public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
138         submitStreamAwareCallable(createCallable(zipArchiveEntry, source));
139     }
140 
141     /**
142      * Adds an archive entry to this archive.
143      * <p>
144      * This method is expected to be called from a single client thread
145      * </p>
146      *
147      * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
148      * @since 1.13
149      */
150     public void addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
151         submitStreamAwareCallable(createCallable(zipArchiveEntryRequestSupplier));
152     }
153 
154     private void closeAll() {
155         for (final ScatterZipOutputStream scatterStream : streams) {
156             IOUtils.closeQuietly(scatterStream);
157         }
158     }
159 
160     /**
161      * Creates a callable that will compress the given archive entry.
162      *
163      * <p>
164      * This method is expected to be called from a single client thread.
165      * </p>
166      *
167      * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and {@link #submitStreamAwareCallable submitStreamAwareCallable}. The
168      * most common use case for using {@link #createCallable createCallable} and {@link #submitStreamAwareCallable submitStreamAwareCallable} from a client is
169      * if you want to wrap the callable in something that can be prioritized by the supplied {@link ExecutorService}, for instance to process large or slow
170      * files first. Since the creation of the {@link ExecutorService} is handled by the client, all of this is up to the client.
171      *
172      * @param zipArchiveEntry The entry to add.
173      * @param source          The source input stream supplier
174      * @return A callable that should subsequently be passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The value of this callable is
175      *         not used, but any exceptions happening inside the compression will be propagated through the callable.
176      */
177 
178     public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
179         final int method = zipArchiveEntry.getMethod();
180         if (method == ZipMethod.UNKNOWN_CODE) {
181             throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry);
182         }
183         final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source);
184         return () -> {
185             final ScatterZipOutputStream scatterStream = tlScatterStreams.get();
186             scatterStream.addArchiveEntry(zipArchiveEntryRequest);
187             return scatterStream;
188         };
189     }
190 
191     /**
192      * Creates a callable that will compress archive entry supplied by {@link ZipArchiveEntryRequestSupplier}.
193      *
194      * <p>
195      * This method is expected to be called from a single client thread.
196      * </p>
197      *
198      * The same as {@link #createCallable(ZipArchiveEntry, InputStreamSupplier)}, but the archive entry to be added is supplied by a
199      * {@link ZipArchiveEntryRequestSupplier}.
200      *
201      * @see #createCallable(ZipArchiveEntry, InputStreamSupplier)
202      * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
203      * @return A callable that should subsequently be passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The value of this callable is
204      *         not used, but any exceptions happening inside the compression will be propagated through the callable.
205      * @since 1.13
206      */
207     public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
208         return () -> {
209             final ScatterZipOutputStream scatterStream = tlScatterStreams.get();
210             scatterStream.addArchiveEntry(zipArchiveEntryRequestSupplier.get());
211             return scatterStream;
212         };
213     }
214 
215     @SuppressWarnings("resource") // Caller closes
216     private ScatterZipOutputStream createDeferred(final ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier) throws IOException {
217         final ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
218         // lifecycle is bound to the ScatterZipOutputStream returned
219         final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); // NOSONAR
220         return new ScatterZipOutputStream(bs, sc);
221     }
222 
223     /**
224      * Gets a message describing the overall statistics of the compression run
225      *
226      * @return A string
227      */
228     public ScatterStatistics getStatisticsMessage() {
229         return new ScatterStatistics(compressionDoneAt - startedAt, scatterDoneAt - compressionDoneAt);
230     }
231 
232     /**
233      * Submits a callable for compression.
234      *
235      * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
236      * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
237      */
238     public final void submit(final Callable<? extends Object> callable) {
239         submitStreamAwareCallable(() -> {
240             callable.call();
241             return tlScatterStreams.get();
242         });
243     }
244 
245     /**
246      * Submits a callable for compression.
247      *
248      * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
249      * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
250      * @since 1.19
251      */
252     public final void submitStreamAwareCallable(final Callable<? extends ScatterZipOutputStream> callable) {
253         futures.add(executorService.submit(callable));
254     }
255 
256     /**
257      * Writes the contents this to the target {@link ZipArchiveOutputStream}.
258      * <p>
259      * It may be beneficial to write things like directories and manifest files to the targetStream before calling this method.
260      * </p>
261      * <p>
262      * Calling this method will shut down the {@link ExecutorService} used by this class. If any of the {@link Callable}s {@link #submitStreamAwareCallable
263      * submit}ted to this instance throws an exception, the archive cannot be created properly and this method will throw an exception.
264      * </p>
265      *
266      * @param targetStream The {@link ZipArchiveOutputStream} to receive the contents of the scatter streams
267      * @throws IOException          If writing fails
268      * @throws InterruptedException If we get interrupted
269      * @throws ExecutionException   If something happens in the parallel execution
270      */
271     public void writeTo(final ZipArchiveOutputStream targetStream) throws IOException, InterruptedException, ExecutionException {
272 
273         try {
274             // Make sure we catch any exceptions from parallel phase
275             try {
276                 for (final Future<?> future : futures) {
277                     future.get();
278                 }
279             } finally {
280                 executorService.shutdown();
281             }
282 
283             executorService.awaitTermination(1000 * 60L, TimeUnit.SECONDS); // == Infinity. We really *must* wait for this to complete
284 
285             // It is important that all threads terminate before we go on, ensure happens-before relationship
286             compressionDoneAt = System.currentTimeMillis();
287 
288             for (final Future<? extends ScatterZipOutputStream> future : futures) {
289                 final ScatterZipOutputStream scatterStream = future.get();
290                 scatterStream.zipEntryWriter().writeNextZipEntry(targetStream);
291             }
292 
293             for (final ScatterZipOutputStream scatterStream : streams) {
294                 scatterStream.close();
295             }
296 
297             scatterDoneAt = System.currentTimeMillis();
298         } finally {
299             closeAll();
300         }
301     }
302 }