001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one or more
003 *  contributor license agreements.  See the NOTICE file distributed with
004 *  this work for additional information regarding copyright ownership.
005 *  The ASF licenses this file to You under the Apache License, Version 2.0
006 *  (the "License"); you may not use this file except in compliance with
007 *  the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 *  Unless required by applicable law or agreed to in writing, software
012 *  distributed under the License is distributed on an "AS IS" BASIS,
013 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 *  See the License for the specific language governing permissions and
015 *  limitations under the License.
016 *
017 */
018package org.apache.commons.compress.archivers.zip;
019
import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore;
import org.apache.commons.compress.parallel.InputStreamSupplier;
import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Deque;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Deflater;

import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;
039
040/**
041 * Creates a zip in parallel by using multiple threadlocal {@link ScatterZipOutputStream} instances.
042 * <p>
043 * Note that until 1.18, this class generally made no guarantees about the order of things written to
044 * the output file. Things that needed to come in a specific order (manifests, directories)
045 * had to be handled by the client of this class, usually by writing these things to the
046 * {@link ZipArchiveOutputStream} <em>before</em> calling {@link #writeTo writeTo} on this class.</p>
047 * <p>
048 * The client can supply an {@link java.util.concurrent.ExecutorService}, but for reasons of
049 * memory model consistency, this will be shut down by this class prior to completion.
050 * </p>
051 * @since 1.10
052 */
053public class ParallelScatterZipCreator {
054    private final Deque<ScatterZipOutputStream> streams = new ConcurrentLinkedDeque<>();
055    private final ExecutorService es;
056    private final ScatterGatherBackingStoreSupplier backingStoreSupplier;
057    private final Deque<Future<? extends ScatterZipOutputStream>> futures = new ConcurrentLinkedDeque<>();
058
059    private final long startedAt = System.currentTimeMillis();
060    private long compressionDoneAt;
061    private long scatterDoneAt;
062    private final int compressionLevel;
063
064    private static class DefaultBackingStoreSupplier implements ScatterGatherBackingStoreSupplier {
065        final AtomicInteger storeNum = new AtomicInteger(0);
066
067        @Override
068        public ScatterGatherBackingStore get() throws IOException {
069            final File tempFile = File.createTempFile("parallelscatter", "n" + storeNum.incrementAndGet());
070            return new FileBasedScatterGatherBackingStore(tempFile);
071        }
072    }
073
074    private ScatterZipOutputStream createDeferred(final ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier)
075            throws IOException {
076        final ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
077        // lifecycle is bound to the ScatterZipOutputStream returned
078        final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); //NOSONAR
079        return new ScatterZipOutputStream(bs, sc);
080    }
081
082    private final ThreadLocal<ScatterZipOutputStream> tlScatterStreams = new ThreadLocal<ScatterZipOutputStream>() {
083        @Override
084        protected ScatterZipOutputStream initialValue() {
085            try {
086                final ScatterZipOutputStream scatterStream = createDeferred(backingStoreSupplier);
087                streams.add(scatterStream);
088                return scatterStream;
089            } catch (final IOException e) {
090                throw new RuntimeException(e); //NOSONAR
091            }
092        }
093    };
094
095    /**
096     * Create a ParallelScatterZipCreator with default threads, which is set to the number of available
097     * processors, as defined by {@link java.lang.Runtime#availableProcessors}
098     */
099    public ParallelScatterZipCreator() {
100        this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
101    }
102
103    /**
104     * Create a ParallelScatterZipCreator
105     *
106     * @param executorService The executorService to use for parallel scheduling. For technical reasons,
107     *                        this will be shut down by this class.
108     */
109    public ParallelScatterZipCreator(final ExecutorService executorService) {
110        this(executorService, new DefaultBackingStoreSupplier());
111    }
112
113    /**
114     * Create a ParallelScatterZipCreator
115     *
116     * @param executorService The executorService to use. For technical reasons, this will be shut down
117     *                        by this class.
118     * @param backingStoreSupplier The supplier of backing store which shall be used
119     */
120    public ParallelScatterZipCreator(final ExecutorService executorService,
121                                     final ScatterGatherBackingStoreSupplier backingStoreSupplier) {
122        this(executorService, backingStoreSupplier, Deflater.DEFAULT_COMPRESSION);
123    }
124
125    /**
126     * Create a ParallelScatterZipCreator
127     *
128     * @param executorService      The executorService to use. For technical reasons, this will be shut down
129     *                             by this class.
130     * @param backingStoreSupplier The supplier of backing store which shall be used
131     * @param compressionLevel     The compression level used in compression, this value should be
132     *                             -1(default level) or between 0~9.
133     * @throws IllegalArgumentException if the compression level is illegal
134     * @since 1.21
135     */
136    public ParallelScatterZipCreator(final ExecutorService executorService,
137                                     final ScatterGatherBackingStoreSupplier backingStoreSupplier,
138                                     final int compressionLevel) throws IllegalArgumentException {
139        if ((compressionLevel < Deflater.NO_COMPRESSION || compressionLevel > Deflater.BEST_COMPRESSION)
140                && compressionLevel != Deflater.DEFAULT_COMPRESSION) {
141            throw new IllegalArgumentException("Compression level is expected between -1~9");
142        }
143
144        this.backingStoreSupplier = backingStoreSupplier;
145        es = executorService;
146        this.compressionLevel = compressionLevel;
147    }
148
149    /**
150     * Adds an archive entry to this archive.
151     * <p>
152     * This method is expected to be called from a single client thread
153     * </p>
154     *
155     * @param zipArchiveEntry The entry to add.
156     * @param source          The source input stream supplier
157     */
158
159    public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
160        submitStreamAwareCallable(createCallable(zipArchiveEntry, source));
161    }
162
163    /**
164     * Adds an archive entry to this archive.
165     * <p>
166     * This method is expected to be called from a single client thread
167     * </p>
168     *
169     * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
170     * @since 1.13
171     */
172    public void addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
173        submitStreamAwareCallable(createCallable(zipArchiveEntryRequestSupplier));
174    }
175
176    /**
177     * Submit a callable for compression.
178     *
179     * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
180     *
181     * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
182     */
183    public final void submit(final Callable<? extends Object> callable) {
184        submitStreamAwareCallable(() -> {
185            callable.call();
186            return tlScatterStreams.get();
187        });
188    }
189
190    /**
191     * Submit a callable for compression.
192     *
193     * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
194     *
195     * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
196     * @since 1.19
197     */
198    public final void submitStreamAwareCallable(final Callable<? extends ScatterZipOutputStream> callable) {
199        futures.add(es.submit(callable));
200    }
201
202    /**
203     * Create a callable that will compress the given archive entry.
204     *
205     * <p>This method is expected to be called from a single client thread.</p>
206     *
207     * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and {@link #submitStreamAwareCallable submitStreamAwareCallable}.
208     * The most common use case for using {@link #createCallable createCallable} and {@link #submitStreamAwareCallable submitStreamAwareCallable} from a
209     * client is if you want to wrap the callable in something that can be prioritized by the supplied
210     * {@link ExecutorService}, for instance to process large or slow files first.
211     * Since the creation of the {@link ExecutorService} is handled by the client, all of this is up to the client.
212     *
213     * @param zipArchiveEntry The entry to add.
214     * @param source          The source input stream supplier
215     * @return A callable that should subsequently passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The
216     * value of this callable is not used, but any exceptions happening inside the compression
217     * will be propagated through the callable.
218     */
219
220    public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry,
221        final InputStreamSupplier source) {
222        final int method = zipArchiveEntry.getMethod();
223        if (method == ZipMethod.UNKNOWN_CODE) {
224            throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry);
225        }
226        final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source);
227        return () -> {
228            final ScatterZipOutputStream scatterStream = tlScatterStreams.get();
229            scatterStream.addArchiveEntry(zipArchiveEntryRequest);
230            return scatterStream;
231        };
232    }
233
234    /**
235     * Create a callable that will compress archive entry supplied by {@link ZipArchiveEntryRequestSupplier}.
236     *
237     * <p>This method is expected to be called from a single client thread.</p>
238     *
239     * The same as {@link #createCallable(ZipArchiveEntry, InputStreamSupplier)}, but the archive entry
240     * to be added is supplied by a {@link ZipArchiveEntryRequestSupplier}.
241     *
242     * @see #createCallable(ZipArchiveEntry, InputStreamSupplier)
243     *
244     * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
245     * @return A callable that should subsequently passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The
246     * value of this callable is not used, but any exceptions happening inside the compression
247     * will be propagated through the callable.
248     * @since 1.13
249     */
250    public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
251        return () -> {
252            final ScatterZipOutputStream scatterStream = tlScatterStreams.get();
253            scatterStream.addArchiveEntry(zipArchiveEntryRequestSupplier.get());
254            return scatterStream;
255        };
256    }
257
258    /**
259     * Write the contents this to the target {@link ZipArchiveOutputStream}.
260     * <p>
261     * It may be beneficial to write things like directories and manifest files to the targetStream
262     * before calling this method.
263     * </p>
264     *
265     * <p>Calling this method will shut down the {@link ExecutorService} used by this class. If any of the {@link
266     * Callable}s {@link #submitStreamAwareCallable submit}ted to this instance throws an exception, the archive can not be created properly and
267     * this method will throw an exception.</p>
268     *
269     * @param targetStream The {@link ZipArchiveOutputStream} to receive the contents of the scatter streams
270     * @throws IOException          If writing fails
271     * @throws InterruptedException If we get interrupted
272     * @throws ExecutionException   If something happens in the parallel execution
273     */
274    public void writeTo(final ZipArchiveOutputStream targetStream)
275            throws IOException, InterruptedException, ExecutionException {
276
277        try {
278            // Make sure we catch any exceptions from parallel phase
279            try {
280                for (final Future<?> future : futures) {
281                    future.get();
282                }
283            } finally {
284                es.shutdown();
285            }
286
287            es.awaitTermination(1000 * 60L, TimeUnit.SECONDS);  // == Infinity. We really *must* wait for this to complete
288
289            // It is important that all threads terminate before we go on, ensure happens-before relationship
290            compressionDoneAt = System.currentTimeMillis();
291
292            for (final Future<? extends ScatterZipOutputStream> future : futures) {
293                final ScatterZipOutputStream scatterStream = future.get();
294                scatterStream.zipEntryWriter().writeNextZipEntry(targetStream);
295            }
296
297            for (final ScatterZipOutputStream scatterStream : streams) {
298                scatterStream.close();
299            }
300
301            scatterDoneAt = System.currentTimeMillis();
302        } finally {
303            closeAll();
304        }
305    }
306
307    /**
308     * Returns a message describing the overall statistics of the compression run
309     *
310     * @return A string
311     */
312    public ScatterStatistics getStatisticsMessage() {
313        return new ScatterStatistics(compressionDoneAt - startedAt, scatterDoneAt - compressionDoneAt);
314    }
315
316    private void closeAll() {
317        for (final ScatterZipOutputStream scatterStream : streams) {
318            try {
319                scatterStream.close();
320            } catch (final IOException ex) { //NOSONAR
321                // no way to properly log this
322            }
323        }
324    }
325}
326