001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   https://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.commons.compress.archivers.zip;
020
021import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;
022
023import java.io.IOException;
024import java.io.UncheckedIOException;
025import java.util.Deque;
026import java.util.concurrent.Callable;
027import java.util.concurrent.ConcurrentLinkedDeque;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Executors;
031import java.util.concurrent.Future;
032import java.util.concurrent.TimeUnit;
033import java.util.zip.Deflater;
034
035import org.apache.commons.compress.parallel.InputStreamSupplier;
036import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
037import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
038import org.apache.commons.io.IOUtils;
039
040/**
041 * Creates a ZIP in parallel by using multiple threadlocal {@link ScatterZipOutputStream} instances.
042 * <p>
043 * Note that until 1.18, this class generally made no guarantees about the order of things written to the output file. Things that needed to come in a specific
044 * order (manifests, directories) had to be handled by the client of this class, usually by writing these things to the {@link ZipArchiveOutputStream}
045 * <em>before</em> calling {@link #writeTo writeTo} on this class.
046 * </p>
047 * <p>
048 * The client can supply an {@link java.util.concurrent.ExecutorService}, but for reasons of memory model consistency, this will be shut down by this class
049 * prior to completion.
050 * </p>
051 *
052 * @since 1.10
053 */
054public class ParallelScatterZipCreator {
055
056    private final Deque<ScatterZipOutputStream> streams = new ConcurrentLinkedDeque<>();
057    private final ExecutorService executorService;
058    private final ScatterGatherBackingStoreSupplier backingStoreSupplier;
059
060    private final Deque<Future<? extends ScatterZipOutputStream>> futures = new ConcurrentLinkedDeque<>();
061    private final long startedAt = System.currentTimeMillis();
062    private long compressionDoneAt;
063    private long scatterDoneAt;
064
065    private final int compressionLevel;
066
067    private final ThreadLocal<ScatterZipOutputStream> tlScatterStreams = new ThreadLocal<ScatterZipOutputStream>() {
068        @Override
069        protected ScatterZipOutputStream initialValue() {
070            try {
071                final ScatterZipOutputStream scatterStream = createDeferred(backingStoreSupplier);
072                streams.add(scatterStream);
073                return scatterStream;
074            } catch (final IOException e) {
075                throw new UncheckedIOException(e); // NOSONAR
076            }
077        }
078    };
079
080    /**
081     * Constructs a ParallelScatterZipCreator with default threads, which is set to the number of available processors, as defined by
082     * {@link Runtime#availableProcessors}
083     */
084    public ParallelScatterZipCreator() {
085        this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
086    }
087
088    /**
089     * Constructs a ParallelScatterZipCreator
090     *
091     * @param executorService The executorService to use for parallel scheduling. For technical reasons, this will be shut down by this class.
092     */
093    public ParallelScatterZipCreator(final ExecutorService executorService) {
094        this(executorService, new DefaultBackingStoreSupplier(null));
095    }
096
097    /**
098     * Constructs a ParallelScatterZipCreator
099     *
100     * @param executorService      The executorService to use. For technical reasons, this will be shut down by this class.
101     * @param backingStoreSupplier The supplier of backing store which shall be used
102     */
103    public ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier) {
104        this(executorService, backingStoreSupplier, Deflater.DEFAULT_COMPRESSION);
105    }
106
107    /**
108     * Constructs a ParallelScatterZipCreator
109     *
110     * @param executorService      The executorService to use. For technical reasons, this will be shut down by this class.
111     * @param backingStoreSupplier The supplier of backing store which shall be used
112     * @param compressionLevel     The compression level used in compression, this value should be -1(default level) or between 0~9.
113     * @throws IllegalArgumentException if the compression level is illegal
114     * @since 1.21
115     */
116    public ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier,
117            final int compressionLevel) throws IllegalArgumentException {
118        if ((compressionLevel < Deflater.NO_COMPRESSION || compressionLevel > Deflater.BEST_COMPRESSION) && compressionLevel != Deflater.DEFAULT_COMPRESSION) {
119            throw new IllegalArgumentException("Compression level is expected between -1~9");
120        }
121
122        this.backingStoreSupplier = backingStoreSupplier;
123        this.executorService = executorService;
124        this.compressionLevel = compressionLevel;
125    }
126
127    /**
128     * Adds an archive entry to this archive.
129     * <p>
130     * This method is expected to be called from a single client thread
131     * </p>
132     *
133     * @param zipArchiveEntry The entry to add.
134     * @param source          The source input stream supplier
135     */
136
137    public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
138        submitStreamAwareCallable(createCallable(zipArchiveEntry, source));
139    }
140
141    /**
142     * Adds an archive entry to this archive.
143     * <p>
144     * This method is expected to be called from a single client thread
145     * </p>
146     *
147     * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
148     * @since 1.13
149     */
150    public void addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
151        submitStreamAwareCallable(createCallable(zipArchiveEntryRequestSupplier));
152    }
153
154    private void closeAll() {
155        for (final ScatterZipOutputStream scatterStream : streams) {
156            IOUtils.closeQuietly(scatterStream);
157        }
158    }
159
160    /**
161     * Creates a callable that will compress the given archive entry.
162     *
163     * <p>
164     * This method is expected to be called from a single client thread.
165     * </p>
166     *
167     * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and {@link #submitStreamAwareCallable submitStreamAwareCallable}. The
168     * most common use case for using {@link #createCallable createCallable} and {@link #submitStreamAwareCallable submitStreamAwareCallable} from a client is
169     * if you want to wrap the callable in something that can be prioritized by the supplied {@link ExecutorService}, for instance to process large or slow
170     * files first. Since the creation of the {@link ExecutorService} is handled by the client, all of this is up to the client.
171     *
172     * @param zipArchiveEntry The entry to add.
173     * @param source          The source input stream supplier
174     * @return A callable that should subsequently be passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The value of this callable is
175     *         not used, but any exceptions happening inside the compression will be propagated through the callable.
176     */
177
178    public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
179        final int method = zipArchiveEntry.getMethod();
180        if (method == ZipMethod.UNKNOWN_CODE) {
181            throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry);
182        }
183        final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source);
184        return () -> {
185            final ScatterZipOutputStream scatterStream = tlScatterStreams.get();
186            scatterStream.addArchiveEntry(zipArchiveEntryRequest);
187            return scatterStream;
188        };
189    }
190
191    /**
192     * Creates a callable that will compress archive entry supplied by {@link ZipArchiveEntryRequestSupplier}.
193     *
194     * <p>
195     * This method is expected to be called from a single client thread.
196     * </p>
197     *
198     * The same as {@link #createCallable(ZipArchiveEntry, InputStreamSupplier)}, but the archive entry to be added is supplied by a
199     * {@link ZipArchiveEntryRequestSupplier}.
200     *
201     * @see #createCallable(ZipArchiveEntry, InputStreamSupplier)
202     * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
203     * @return A callable that should subsequently be passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The value of this callable is
204     *         not used, but any exceptions happening inside the compression will be propagated through the callable.
205     * @since 1.13
206     */
207    public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
208        return () -> {
209            final ScatterZipOutputStream scatterStream = tlScatterStreams.get();
210            scatterStream.addArchiveEntry(zipArchiveEntryRequestSupplier.get());
211            return scatterStream;
212        };
213    }
214
215    @SuppressWarnings("resource") // Caller closes
216    private ScatterZipOutputStream createDeferred(final ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier) throws IOException {
217        final ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
218        // lifecycle is bound to the ScatterZipOutputStream returned
219        final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); // NOSONAR
220        return new ScatterZipOutputStream(bs, sc);
221    }
222
223    /**
224     * Gets a message describing the overall statistics of the compression run
225     *
226     * @return A string
227     */
228    public ScatterStatistics getStatisticsMessage() {
229        return new ScatterStatistics(compressionDoneAt - startedAt, scatterDoneAt - compressionDoneAt);
230    }
231
232    /**
233     * Submits a callable for compression.
234     *
235     * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
236     * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
237     */
238    public final void submit(final Callable<? extends Object> callable) {
239        submitStreamAwareCallable(() -> {
240            callable.call();
241            return tlScatterStreams.get();
242        });
243    }
244
245    /**
246     * Submits a callable for compression.
247     *
248     * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
249     * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
250     * @since 1.19
251     */
252    public final void submitStreamAwareCallable(final Callable<? extends ScatterZipOutputStream> callable) {
253        futures.add(executorService.submit(callable));
254    }
255
256    /**
257     * Writes the contents this to the target {@link ZipArchiveOutputStream}.
258     * <p>
259     * It may be beneficial to write things like directories and manifest files to the targetStream before calling this method.
260     * </p>
261     * <p>
262     * Calling this method will shut down the {@link ExecutorService} used by this class. If any of the {@link Callable}s {@link #submitStreamAwareCallable
263     * submit}ted to this instance throws an exception, the archive cannot be created properly and this method will throw an exception.
264     * </p>
265     *
266     * @param targetStream The {@link ZipArchiveOutputStream} to receive the contents of the scatter streams
267     * @throws IOException          If writing fails
268     * @throws InterruptedException If we get interrupted
269     * @throws ExecutionException   If something happens in the parallel execution
270     */
271    public void writeTo(final ZipArchiveOutputStream targetStream) throws IOException, InterruptedException, ExecutionException {
272
273        try {
274            // Make sure we catch any exceptions from parallel phase
275            try {
276                for (final Future<?> future : futures) {
277                    future.get();
278                }
279            } finally {
280                executorService.shutdown();
281            }
282
283            executorService.awaitTermination(1000 * 60L, TimeUnit.SECONDS); // == Infinity. We really *must* wait for this to complete
284
285            // It is important that all threads terminate before we go on, ensure happens-before relationship
286            compressionDoneAt = System.currentTimeMillis();
287
288            for (final Future<? extends ScatterZipOutputStream> future : futures) {
289                final ScatterZipOutputStream scatterStream = future.get();
290                scatterStream.zipEntryWriter().writeNextZipEntry(targetStream);
291            }
292
293            for (final ScatterZipOutputStream scatterStream : streams) {
294                scatterStream.close();
295            }
296
297            scatterDoneAt = System.currentTimeMillis();
298        } finally {
299            closeAll();
300        }
301    }
302}