1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * https://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.commons.compress.archivers.zip;
20
import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.zip.Deflater;

import org.apache.commons.compress.parallel.InputStreamSupplier;
import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
import org.apache.commons.io.IOUtils;
39
40 /**
41 * Creates a ZIP in parallel by using multiple threadlocal {@link ScatterZipOutputStream} instances.
42 * <p>
43 * Note that until 1.18, this class generally made no guarantees about the order of things written to the output file. Things that needed to come in a specific
44 * order (manifests, directories) had to be handled by the client of this class, usually by writing these things to the {@link ZipArchiveOutputStream}
45 * <em>before</em> calling {@link #writeTo writeTo} on this class.
46 * </p>
47 * <p>
48 * The client can supply an {@link java.util.concurrent.ExecutorService}, but for reasons of memory model consistency, this will be shut down by this class
49 * prior to completion.
50 * </p>
51 *
52 * @since 1.10
53 */
54 public class ParallelScatterZipCreator {
55
56 private final Deque<ScatterZipOutputStream> streams = new ConcurrentLinkedDeque<>();
57 private final ExecutorService executorService;
58 private final ScatterGatherBackingStoreSupplier backingStoreSupplier;
59
60 private final Deque<Future<? extends ScatterZipOutputStream>> futures = new ConcurrentLinkedDeque<>();
61 private final long startedAt = System.currentTimeMillis();
62 private long compressionDoneAt;
63 private long scatterDoneAt;
64
65 private final int compressionLevel;
66
67 private final ThreadLocal<ScatterZipOutputStream> tlScatterStreams = new ThreadLocal<ScatterZipOutputStream>() {
68 @Override
69 protected ScatterZipOutputStream initialValue() {
70 try {
71 final ScatterZipOutputStream scatterStream = createDeferred(backingStoreSupplier);
72 streams.add(scatterStream);
73 return scatterStream;
74 } catch (final IOException e) {
75 throw new UncheckedIOException(e); // NOSONAR
76 }
77 }
78 };
79
80 /**
81 * Constructs a ParallelScatterZipCreator with default threads, which is set to the number of available processors, as defined by
82 * {@link Runtime#availableProcessors}
83 */
84 public ParallelScatterZipCreator() {
85 this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
86 }
87
88 /**
89 * Constructs a ParallelScatterZipCreator
90 *
91 * @param executorService The executorService to use for parallel scheduling. For technical reasons, this will be shut down by this class.
92 */
93 public ParallelScatterZipCreator(final ExecutorService executorService) {
94 this(executorService, new DefaultBackingStoreSupplier(null));
95 }
96
97 /**
98 * Constructs a ParallelScatterZipCreator
99 *
100 * @param executorService The executorService to use. For technical reasons, this will be shut down by this class.
101 * @param backingStoreSupplier The supplier of backing store which shall be used
102 */
103 public ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier) {
104 this(executorService, backingStoreSupplier, Deflater.DEFAULT_COMPRESSION);
105 }
106
107 /**
108 * Constructs a ParallelScatterZipCreator
109 *
110 * @param executorService The executorService to use. For technical reasons, this will be shut down by this class.
111 * @param backingStoreSupplier The supplier of backing store which shall be used
112 * @param compressionLevel The compression level used in compression, this value should be -1(default level) or between 0~9.
113 * @throws IllegalArgumentException if the compression level is illegal
114 * @since 1.21
115 */
116 public ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier,
117 final int compressionLevel) throws IllegalArgumentException {
118 if ((compressionLevel < Deflater.NO_COMPRESSION || compressionLevel > Deflater.BEST_COMPRESSION) && compressionLevel != Deflater.DEFAULT_COMPRESSION) {
119 throw new IllegalArgumentException("Compression level is expected between -1~9");
120 }
121
122 this.backingStoreSupplier = backingStoreSupplier;
123 this.executorService = executorService;
124 this.compressionLevel = compressionLevel;
125 }
126
127 /**
128 * Adds an archive entry to this archive.
129 * <p>
130 * This method is expected to be called from a single client thread
131 * </p>
132 *
133 * @param zipArchiveEntry The entry to add.
134 * @param source The source input stream supplier
135 */
136
137 public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
138 submitStreamAwareCallable(createCallable(zipArchiveEntry, source));
139 }
140
141 /**
142 * Adds an archive entry to this archive.
143 * <p>
144 * This method is expected to be called from a single client thread
145 * </p>
146 *
147 * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
148 * @since 1.13
149 */
150 public void addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
151 submitStreamAwareCallable(createCallable(zipArchiveEntryRequestSupplier));
152 }
153
154 private void closeAll() {
155 for (final ScatterZipOutputStream scatterStream : streams) {
156 IOUtils.closeQuietly(scatterStream);
157 }
158 }
159
160 /**
161 * Creates a callable that will compress the given archive entry.
162 *
163 * <p>
164 * This method is expected to be called from a single client thread.
165 * </p>
166 *
167 * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and {@link #submitStreamAwareCallable submitStreamAwareCallable}. The
168 * most common use case for using {@link #createCallable createCallable} and {@link #submitStreamAwareCallable submitStreamAwareCallable} from a client is
169 * if you want to wrap the callable in something that can be prioritized by the supplied {@link ExecutorService}, for instance to process large or slow
170 * files first. Since the creation of the {@link ExecutorService} is handled by the client, all of this is up to the client.
171 *
172 * @param zipArchiveEntry The entry to add.
173 * @param source The source input stream supplier
174 * @return A callable that should subsequently be passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The value of this callable is
175 * not used, but any exceptions happening inside the compression will be propagated through the callable.
176 */
177
178 public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
179 final int method = zipArchiveEntry.getMethod();
180 if (method == ZipMethod.UNKNOWN_CODE) {
181 throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry);
182 }
183 final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source);
184 return () -> {
185 final ScatterZipOutputStream scatterStream = tlScatterStreams.get();
186 scatterStream.addArchiveEntry(zipArchiveEntryRequest);
187 return scatterStream;
188 };
189 }
190
191 /**
192 * Creates a callable that will compress archive entry supplied by {@link ZipArchiveEntryRequestSupplier}.
193 *
194 * <p>
195 * This method is expected to be called from a single client thread.
196 * </p>
197 *
198 * The same as {@link #createCallable(ZipArchiveEntry, InputStreamSupplier)}, but the archive entry to be added is supplied by a
199 * {@link ZipArchiveEntryRequestSupplier}.
200 *
201 * @see #createCallable(ZipArchiveEntry, InputStreamSupplier)
202 * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
203 * @return A callable that should subsequently be passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The value of this callable is
204 * not used, but any exceptions happening inside the compression will be propagated through the callable.
205 * @since 1.13
206 */
207 public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
208 return () -> {
209 final ScatterZipOutputStream scatterStream = tlScatterStreams.get();
210 scatterStream.addArchiveEntry(zipArchiveEntryRequestSupplier.get());
211 return scatterStream;
212 };
213 }
214
215 @SuppressWarnings("resource") // Caller closes
216 private ScatterZipOutputStream createDeferred(final ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier) throws IOException {
217 final ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
218 // lifecycle is bound to the ScatterZipOutputStream returned
219 final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); // NOSONAR
220 return new ScatterZipOutputStream(bs, sc);
221 }
222
223 /**
224 * Gets a message describing the overall statistics of the compression run
225 *
226 * @return A string
227 */
228 public ScatterStatistics getStatisticsMessage() {
229 return new ScatterStatistics(compressionDoneAt - startedAt, scatterDoneAt - compressionDoneAt);
230 }
231
232 /**
233 * Submits a callable for compression.
234 *
235 * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
236 * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
237 */
238 public final void submit(final Callable<? extends Object> callable) {
239 submitStreamAwareCallable(() -> {
240 callable.call();
241 return tlScatterStreams.get();
242 });
243 }
244
245 /**
246 * Submits a callable for compression.
247 *
248 * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
249 * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
250 * @since 1.19
251 */
252 public final void submitStreamAwareCallable(final Callable<? extends ScatterZipOutputStream> callable) {
253 futures.add(executorService.submit(callable));
254 }
255
256 /**
257 * Writes the contents this to the target {@link ZipArchiveOutputStream}.
258 * <p>
259 * It may be beneficial to write things like directories and manifest files to the targetStream before calling this method.
260 * </p>
261 * <p>
262 * Calling this method will shut down the {@link ExecutorService} used by this class. If any of the {@link Callable}s {@link #submitStreamAwareCallable
263 * submit}ted to this instance throws an exception, the archive cannot be created properly and this method will throw an exception.
264 * </p>
265 *
266 * @param targetStream The {@link ZipArchiveOutputStream} to receive the contents of the scatter streams
267 * @throws IOException If writing fails
268 * @throws InterruptedException If we get interrupted
269 * @throws ExecutionException If something happens in the parallel execution
270 */
271 public void writeTo(final ZipArchiveOutputStream targetStream) throws IOException, InterruptedException, ExecutionException {
272
273 try {
274 // Make sure we catch any exceptions from parallel phase
275 try {
276 for (final Future<?> future : futures) {
277 future.get();
278 }
279 } finally {
280 executorService.shutdown();
281 }
282
283 executorService.awaitTermination(1000 * 60L, TimeUnit.SECONDS); // == Infinity. We really *must* wait for this to complete
284
285 // It is important that all threads terminate before we go on, ensure happens-before relationship
286 compressionDoneAt = System.currentTimeMillis();
287
288 for (final Future<? extends ScatterZipOutputStream> future : futures) {
289 final ScatterZipOutputStream scatterStream = future.get();
290 scatterStream.zipEntryWriter().writeNextZipEntry(targetStream);
291 }
292
293 for (final ScatterZipOutputStream scatterStream : streams) {
294 scatterStream.close();
295 }
296
297 scatterDoneAt = System.currentTimeMillis();
298 } finally {
299 closeAll();
300 }
301 }
302 }