001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one or more
003 *  contributor license agreements.  See the NOTICE file distributed with
004 *  this work for additional information regarding copyright ownership.
005 *  The ASF licenses this file to You under the Apache License, Version 2.0
006 *  (the "License"); you may not use this file except in compliance with
007 *  the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 *  Unless required by applicable law or agreed to in writing, software
012 *  distributed under the License is distributed on an "AS IS" BASIS,
013 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 *  See the License for the specific language governing permissions and
015 *  limitations under the License.
016 */
017package org.apache.commons.compress.archivers.zip;
018
019import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;
020
021import java.io.IOException;
022import java.io.UncheckedIOException;
023import java.util.Deque;
024import java.util.concurrent.Callable;
025import java.util.concurrent.ConcurrentLinkedDeque;
026import java.util.concurrent.ExecutionException;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.Executors;
029import java.util.concurrent.Future;
030import java.util.concurrent.TimeUnit;
031import java.util.zip.Deflater;
032
033import org.apache.commons.compress.parallel.InputStreamSupplier;
034import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
035import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
036
037/**
038 * Creates a ZIP in parallel by using multiple threadlocal {@link ScatterZipOutputStream} instances.
039 * <p>
040 * Note that until 1.18, this class generally made no guarantees about the order of things written to the output file. Things that needed to come in a specific
041 * order (manifests, directories) had to be handled by the client of this class, usually by writing these things to the {@link ZipArchiveOutputStream}
042 * <em>before</em> calling {@link #writeTo writeTo} on this class.
043 * </p>
044 * <p>
045 * The client can supply an {@link java.util.concurrent.ExecutorService}, but for reasons of memory model consistency, this will be shut down by this class
046 * prior to completion.
047 * </p>
048 *
049 * @since 1.10
050 */
051public class ParallelScatterZipCreator {
052
053    private final Deque<ScatterZipOutputStream> streams = new ConcurrentLinkedDeque<>();
054    private final ExecutorService executorService;
055    private final ScatterGatherBackingStoreSupplier backingStoreSupplier;
056
057    private final Deque<Future<? extends ScatterZipOutputStream>> futures = new ConcurrentLinkedDeque<>();
058    private final long startedAt = System.currentTimeMillis();
059    private long compressionDoneAt;
060    private long scatterDoneAt;
061
062    private final int compressionLevel;
063
064    private final ThreadLocal<ScatterZipOutputStream> tlScatterStreams = new ThreadLocal<ScatterZipOutputStream>() {
065        @Override
066        protected ScatterZipOutputStream initialValue() {
067            try {
068                final ScatterZipOutputStream scatterStream = createDeferred(backingStoreSupplier);
069                streams.add(scatterStream);
070                return scatterStream;
071            } catch (final IOException e) {
072                throw new UncheckedIOException(e); // NOSONAR
073            }
074        }
075    };
076
077    /**
078     * Constructs a ParallelScatterZipCreator with default threads, which is set to the number of available processors, as defined by
079     * {@link Runtime#availableProcessors}
080     */
081    public ParallelScatterZipCreator() {
082        this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
083    }
084
085    /**
086     * Constructs a ParallelScatterZipCreator
087     *
088     * @param executorService The executorService to use for parallel scheduling. For technical reasons, this will be shut down by this class.
089     */
090    public ParallelScatterZipCreator(final ExecutorService executorService) {
091        this(executorService, new DefaultBackingStoreSupplier(null));
092    }
093
094    /**
095     * Constructs a ParallelScatterZipCreator
096     *
097     * @param executorService      The executorService to use. For technical reasons, this will be shut down by this class.
098     * @param backingStoreSupplier The supplier of backing store which shall be used
099     */
100    public ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier) {
101        this(executorService, backingStoreSupplier, Deflater.DEFAULT_COMPRESSION);
102    }
103
104    /**
105     * Constructs a ParallelScatterZipCreator
106     *
107     * @param executorService      The executorService to use. For technical reasons, this will be shut down by this class.
108     * @param backingStoreSupplier The supplier of backing store which shall be used
109     * @param compressionLevel     The compression level used in compression, this value should be -1(default level) or between 0~9.
110     * @throws IllegalArgumentException if the compression level is illegal
111     * @since 1.21
112     */
113    public ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier,
114            final int compressionLevel) throws IllegalArgumentException {
115        if ((compressionLevel < Deflater.NO_COMPRESSION || compressionLevel > Deflater.BEST_COMPRESSION) && compressionLevel != Deflater.DEFAULT_COMPRESSION) {
116            throw new IllegalArgumentException("Compression level is expected between -1~9");
117        }
118
119        this.backingStoreSupplier = backingStoreSupplier;
120        this.executorService = executorService;
121        this.compressionLevel = compressionLevel;
122    }
123
124    /**
125     * Adds an archive entry to this archive.
126     * <p>
127     * This method is expected to be called from a single client thread
128     * </p>
129     *
130     * @param zipArchiveEntry The entry to add.
131     * @param source          The source input stream supplier
132     */
133
134    public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
135        submitStreamAwareCallable(createCallable(zipArchiveEntry, source));
136    }
137
138    /**
139     * Adds an archive entry to this archive.
140     * <p>
141     * This method is expected to be called from a single client thread
142     * </p>
143     *
144     * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
145     * @since 1.13
146     */
147    public void addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
148        submitStreamAwareCallable(createCallable(zipArchiveEntryRequestSupplier));
149    }
150
151    private void closeAll() {
152        for (final ScatterZipOutputStream scatterStream : streams) {
153            try {
154                scatterStream.close();
155            } catch (final IOException ex) { // NOSONAR
156                // no way to properly log this
157            }
158        }
159    }
160
161    /**
162     * Creates a callable that will compress the given archive entry.
163     *
164     * <p>
165     * This method is expected to be called from a single client thread.
166     * </p>
167     *
168     * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and {@link #submitStreamAwareCallable submitStreamAwareCallable}. The
169     * most common use case for using {@link #createCallable createCallable} and {@link #submitStreamAwareCallable submitStreamAwareCallable} from a client is
170     * if you want to wrap the callable in something that can be prioritized by the supplied {@link ExecutorService}, for instance to process large or slow
171     * files first. Since the creation of the {@link ExecutorService} is handled by the client, all of this is up to the client.
172     *
173     * @param zipArchiveEntry The entry to add.
174     * @param source          The source input stream supplier
175     * @return A callable that should subsequently be passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The value of this callable is
176     *         not used, but any exceptions happening inside the compression will be propagated through the callable.
177     */
178
179    public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
180        final int method = zipArchiveEntry.getMethod();
181        if (method == ZipMethod.UNKNOWN_CODE) {
182            throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry);
183        }
184        final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source);
185        return () -> {
186            final ScatterZipOutputStream scatterStream = tlScatterStreams.get();
187            scatterStream.addArchiveEntry(zipArchiveEntryRequest);
188            return scatterStream;
189        };
190    }
191
192    /**
193     * Creates a callable that will compress archive entry supplied by {@link ZipArchiveEntryRequestSupplier}.
194     *
195     * <p>
196     * This method is expected to be called from a single client thread.
197     * </p>
198     *
199     * The same as {@link #createCallable(ZipArchiveEntry, InputStreamSupplier)}, but the archive entry to be added is supplied by a
200     * {@link ZipArchiveEntryRequestSupplier}.
201     *
202     * @see #createCallable(ZipArchiveEntry, InputStreamSupplier)
203     *
204     * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
205     * @return A callable that should subsequently be passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The value of this callable is
206     *         not used, but any exceptions happening inside the compression will be propagated through the callable.
207     * @since 1.13
208     */
209    public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
210        return () -> {
211            final ScatterZipOutputStream scatterStream = tlScatterStreams.get();
212            scatterStream.addArchiveEntry(zipArchiveEntryRequestSupplier.get());
213            return scatterStream;
214        };
215    }
216
217    @SuppressWarnings("resource") // Caller closes
218    private ScatterZipOutputStream createDeferred(final ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier) throws IOException {
219        final ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
220        // lifecycle is bound to the ScatterZipOutputStream returned
221        final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); // NOSONAR
222        return new ScatterZipOutputStream(bs, sc);
223    }
224
225    /**
226     * Gets a message describing the overall statistics of the compression run
227     *
228     * @return A string
229     */
230    public ScatterStatistics getStatisticsMessage() {
231        return new ScatterStatistics(compressionDoneAt - startedAt, scatterDoneAt - compressionDoneAt);
232    }
233
234    /**
235     * Submits a callable for compression.
236     *
237     * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
238     *
239     * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
240     */
241    public final void submit(final Callable<? extends Object> callable) {
242        submitStreamAwareCallable(() -> {
243            callable.call();
244            return tlScatterStreams.get();
245        });
246    }
247
248    /**
249     * Submits a callable for compression.
250     *
251     * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
252     *
253     * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
254     * @since 1.19
255     */
256    public final void submitStreamAwareCallable(final Callable<? extends ScatterZipOutputStream> callable) {
257        futures.add(executorService.submit(callable));
258    }
259
260    /**
261     * Writes the contents this to the target {@link ZipArchiveOutputStream}.
262     * <p>
263     * It may be beneficial to write things like directories and manifest files to the targetStream before calling this method.
264     * </p>
265     * <p>
266     * Calling this method will shut down the {@link ExecutorService} used by this class. If any of the {@link Callable}s {@link #submitStreamAwareCallable
267     * submit}ted to this instance throws an exception, the archive can not be created properly and this method will throw an exception.
268     * </p>
269     *
270     * @param targetStream The {@link ZipArchiveOutputStream} to receive the contents of the scatter streams
271     * @throws IOException          If writing fails
272     * @throws InterruptedException If we get interrupted
273     * @throws ExecutionException   If something happens in the parallel execution
274     */
275    public void writeTo(final ZipArchiveOutputStream targetStream) throws IOException, InterruptedException, ExecutionException {
276
277        try {
278            // Make sure we catch any exceptions from parallel phase
279            try {
280                for (final Future<?> future : futures) {
281                    future.get();
282                }
283            } finally {
284                executorService.shutdown();
285            }
286
287            executorService.awaitTermination(1000 * 60L, TimeUnit.SECONDS); // == Infinity. We really *must* wait for this to complete
288
289            // It is important that all threads terminate before we go on, ensure happens-before relationship
290            compressionDoneAt = System.currentTimeMillis();
291
292            for (final Future<? extends ScatterZipOutputStream> future : futures) {
293                final ScatterZipOutputStream scatterStream = future.get();
294                scatterStream.zipEntryWriter().writeNextZipEntry(targetStream);
295            }
296
297            for (final ScatterZipOutputStream scatterStream : streams) {
298                scatterStream.close();
299            }
300
301            scatterDoneAt = System.currentTimeMillis();
302        } finally {
303            closeAll();
304        }
305    }
306}