View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one or more
3    *  contributor license agreements.  See the NOTICE file distributed with
4    *  this work for additional information regarding copyright ownership.
5    *  The ASF licenses this file to You under the Apache License, Version 2.0
6    *  (the "License"); you may not use this file except in compliance with
7    *  the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software
12   *  distributed under the License is distributed on an "AS IS" BASIS,
13   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   *  See the License for the specific language governing permissions and
15   *  limitations under the License.
16   */
17  package org.apache.commons.compress.archivers.zip;
18  
19  import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;
20  
21  import java.io.IOException;
22  import java.io.UncheckedIOException;
23  import java.util.Deque;
24  import java.util.concurrent.Callable;
25  import java.util.concurrent.ConcurrentLinkedDeque;
26  import java.util.concurrent.ExecutionException;
27  import java.util.concurrent.ExecutorService;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.Future;
30  import java.util.concurrent.TimeUnit;
31  import java.util.zip.Deflater;
32  
33  import org.apache.commons.compress.parallel.InputStreamSupplier;
34  import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
35  import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
36  
37  /**
38   * Creates a ZIP in parallel by using multiple threadlocal {@link ScatterZipOutputStream} instances.
39   * <p>
40   * Note that until 1.18, this class generally made no guarantees about the order of things written to the output file. Things that needed to come in a specific
41   * order (manifests, directories) had to be handled by the client of this class, usually by writing these things to the {@link ZipArchiveOutputStream}
42   * <em>before</em> calling {@link #writeTo writeTo} on this class.
43   * </p>
44   * <p>
45   * The client can supply an {@link java.util.concurrent.ExecutorService}, but for reasons of memory model consistency, this will be shut down by this class
46   * prior to completion.
47   * </p>
48   *
49   * @since 1.10
50   */
51  public class ParallelScatterZipCreator {
52  
53      private final Deque<ScatterZipOutputStream> streams = new ConcurrentLinkedDeque<>();
54      private final ExecutorService executorService;
55      private final ScatterGatherBackingStoreSupplier backingStoreSupplier;
56  
57      private final Deque<Future<? extends ScatterZipOutputStream>> futures = new ConcurrentLinkedDeque<>();
58      private final long startedAt = System.currentTimeMillis();
59      private long compressionDoneAt;
60      private long scatterDoneAt;
61  
62      private final int compressionLevel;
63  
64      private final ThreadLocal<ScatterZipOutputStream> tlScatterStreams = new ThreadLocal<ScatterZipOutputStream>() {
65          @Override
66          protected ScatterZipOutputStream initialValue() {
67              try {
68                  final ScatterZipOutputStream scatterStream = createDeferred(backingStoreSupplier);
69                  streams.add(scatterStream);
70                  return scatterStream;
71              } catch (final IOException e) {
72                  throw new UncheckedIOException(e); // NOSONAR
73              }
74          }
75      };
76  
77      /**
78       * Constructs a ParallelScatterZipCreator with default threads, which is set to the number of available processors, as defined by
79       * {@link Runtime#availableProcessors}
80       */
81      public ParallelScatterZipCreator() {
82          this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
83      }
84  
85      /**
86       * Constructs a ParallelScatterZipCreator
87       *
88       * @param executorService The executorService to use for parallel scheduling. For technical reasons, this will be shut down by this class.
89       */
90      public ParallelScatterZipCreator(final ExecutorService executorService) {
91          this(executorService, new DefaultBackingStoreSupplier(null));
92      }
93  
94      /**
95       * Constructs a ParallelScatterZipCreator
96       *
97       * @param executorService      The executorService to use. For technical reasons, this will be shut down by this class.
98       * @param backingStoreSupplier The supplier of backing store which shall be used
99       */
100     public ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier) {
101         this(executorService, backingStoreSupplier, Deflater.DEFAULT_COMPRESSION);
102     }
103 
104     /**
105      * Constructs a ParallelScatterZipCreator
106      *
107      * @param executorService      The executorService to use. For technical reasons, this will be shut down by this class.
108      * @param backingStoreSupplier The supplier of backing store which shall be used
109      * @param compressionLevel     The compression level used in compression, this value should be -1(default level) or between 0~9.
110      * @throws IllegalArgumentException if the compression level is illegal
111      * @since 1.21
112      */
113     public ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier,
114             final int compressionLevel) throws IllegalArgumentException {
115         if ((compressionLevel < Deflater.NO_COMPRESSION || compressionLevel > Deflater.BEST_COMPRESSION) && compressionLevel != Deflater.DEFAULT_COMPRESSION) {
116             throw new IllegalArgumentException("Compression level is expected between -1~9");
117         }
118 
119         this.backingStoreSupplier = backingStoreSupplier;
120         this.executorService = executorService;
121         this.compressionLevel = compressionLevel;
122     }
123 
124     /**
125      * Adds an archive entry to this archive.
126      * <p>
127      * This method is expected to be called from a single client thread
128      * </p>
129      *
130      * @param zipArchiveEntry The entry to add.
131      * @param source          The source input stream supplier
132      */
133 
134     public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
135         submitStreamAwareCallable(createCallable(zipArchiveEntry, source));
136     }
137 
138     /**
139      * Adds an archive entry to this archive.
140      * <p>
141      * This method is expected to be called from a single client thread
142      * </p>
143      *
144      * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
145      * @since 1.13
146      */
147     public void addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
148         submitStreamAwareCallable(createCallable(zipArchiveEntryRequestSupplier));
149     }
150 
151     private void closeAll() {
152         for (final ScatterZipOutputStream scatterStream : streams) {
153             try {
154                 scatterStream.close();
155             } catch (final IOException ex) { // NOSONAR
156                 // no way to properly log this
157             }
158         }
159     }
160 
161     /**
162      * Creates a callable that will compress the given archive entry.
163      *
164      * <p>
165      * This method is expected to be called from a single client thread.
166      * </p>
167      *
168      * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and {@link #submitStreamAwareCallable submitStreamAwareCallable}. The
169      * most common use case for using {@link #createCallable createCallable} and {@link #submitStreamAwareCallable submitStreamAwareCallable} from a client is
170      * if you want to wrap the callable in something that can be prioritized by the supplied {@link ExecutorService}, for instance to process large or slow
171      * files first. Since the creation of the {@link ExecutorService} is handled by the client, all of this is up to the client.
172      *
173      * @param zipArchiveEntry The entry to add.
174      * @param source          The source input stream supplier
175      * @return A callable that should subsequently be passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The value of this callable is
176      *         not used, but any exceptions happening inside the compression will be propagated through the callable.
177      */
178 
179     public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
180         final int method = zipArchiveEntry.getMethod();
181         if (method == ZipMethod.UNKNOWN_CODE) {
182             throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry);
183         }
184         final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source);
185         return () -> {
186             final ScatterZipOutputStream scatterStream = tlScatterStreams.get();
187             scatterStream.addArchiveEntry(zipArchiveEntryRequest);
188             return scatterStream;
189         };
190     }
191 
192     /**
193      * Creates a callable that will compress archive entry supplied by {@link ZipArchiveEntryRequestSupplier}.
194      *
195      * <p>
196      * This method is expected to be called from a single client thread.
197      * </p>
198      *
199      * The same as {@link #createCallable(ZipArchiveEntry, InputStreamSupplier)}, but the archive entry to be added is supplied by a
200      * {@link ZipArchiveEntryRequestSupplier}.
201      *
202      * @see #createCallable(ZipArchiveEntry, InputStreamSupplier)
203      *
204      * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
205      * @return A callable that should subsequently be passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The value of this callable is
206      *         not used, but any exceptions happening inside the compression will be propagated through the callable.
207      * @since 1.13
208      */
209     public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
210         return () -> {
211             final ScatterZipOutputStream scatterStream = tlScatterStreams.get();
212             scatterStream.addArchiveEntry(zipArchiveEntryRequestSupplier.get());
213             return scatterStream;
214         };
215     }
216 
217     @SuppressWarnings("resource") // Caller closes
218     private ScatterZipOutputStream createDeferred(final ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier) throws IOException {
219         final ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
220         // lifecycle is bound to the ScatterZipOutputStream returned
221         final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); // NOSONAR
222         return new ScatterZipOutputStream(bs, sc);
223     }
224 
225     /**
226      * Gets a message describing the overall statistics of the compression run
227      *
228      * @return A string
229      */
230     public ScatterStatistics getStatisticsMessage() {
231         return new ScatterStatistics(compressionDoneAt - startedAt, scatterDoneAt - compressionDoneAt);
232     }
233 
234     /**
235      * Submits a callable for compression.
236      *
237      * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
238      *
239      * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
240      */
241     public final void submit(final Callable<? extends Object> callable) {
242         submitStreamAwareCallable(() -> {
243             callable.call();
244             return tlScatterStreams.get();
245         });
246     }
247 
248     /**
249      * Submits a callable for compression.
250      *
251      * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
252      *
253      * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
254      * @since 1.19
255      */
256     public final void submitStreamAwareCallable(final Callable<? extends ScatterZipOutputStream> callable) {
257         futures.add(executorService.submit(callable));
258     }
259 
260     /**
261      * Writes the contents this to the target {@link ZipArchiveOutputStream}.
262      * <p>
263      * It may be beneficial to write things like directories and manifest files to the targetStream before calling this method.
264      * </p>
265      * <p>
266      * Calling this method will shut down the {@link ExecutorService} used by this class. If any of the {@link Callable}s {@link #submitStreamAwareCallable
267      * submit}ted to this instance throws an exception, the archive can not be created properly and this method will throw an exception.
268      * </p>
269      *
270      * @param targetStream The {@link ZipArchiveOutputStream} to receive the contents of the scatter streams
271      * @throws IOException          If writing fails
272      * @throws InterruptedException If we get interrupted
273      * @throws ExecutionException   If something happens in the parallel execution
274      */
275     public void writeTo(final ZipArchiveOutputStream targetStream) throws IOException, InterruptedException, ExecutionException {
276 
277         try {
278             // Make sure we catch any exceptions from parallel phase
279             try {
280                 for (final Future<?> future : futures) {
281                     future.get();
282                 }
283             } finally {
284                 executorService.shutdown();
285             }
286 
287             executorService.awaitTermination(1000 * 60L, TimeUnit.SECONDS); // == Infinity. We really *must* wait for this to complete
288 
289             // It is important that all threads terminate before we go on, ensure happens-before relationship
290             compressionDoneAt = System.currentTimeMillis();
291 
292             for (final Future<? extends ScatterZipOutputStream> future : futures) {
293                 final ScatterZipOutputStream scatterStream = future.get();
294                 scatterStream.zipEntryWriter().writeNextZipEntry(targetStream);
295             }
296 
297             for (final ScatterZipOutputStream scatterStream : streams) {
298                 scatterStream.close();
299             }
300 
301             scatterDoneAt = System.currentTimeMillis();
302         } finally {
303             closeAll();
304         }
305     }
306 }