/*
 *  Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  See the NOTICE file distributed with
 *  this work for additional information regarding copyright ownership.
 *  The ASF licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */
018package org.apache.commons.compress.archivers.zip;
019
020import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore;
021import org.apache.commons.compress.parallel.InputStreamSupplier;
022import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
023import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
024
025import java.io.File;
026import java.io.IOException;
027import java.util.Deque;
028import java.util.concurrent.Callable;
029import java.util.concurrent.ConcurrentLinkedDeque;
030import java.util.concurrent.ExecutionException;
031import java.util.concurrent.ExecutorService;
032import java.util.concurrent.Executors;
033import java.util.concurrent.Future;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.zip.Deflater;
037
038import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;
039
040/**
041 * Creates a zip in parallel by using multiple threadlocal {@link ScatterZipOutputStream} instances.
042 * <p>
043 * Note that until 1.18, this class generally made no guarantees about the order of things written to
044 * the output file. Things that needed to come in a specific order (manifests, directories)
045 * had to be handled by the client of this class, usually by writing these things to the
046 * {@link ZipArchiveOutputStream} <em>before</em> calling {@link #writeTo writeTo} on this class.</p>
047 * <p>
048 * The client can supply an {@link java.util.concurrent.ExecutorService}, but for reasons of
049 * memory model consistency, this will be shut down by this class prior to completion.
050 * </p>
051 * @since 1.10
052 */
053public class ParallelScatterZipCreator {
054    private final Deque<ScatterZipOutputStream> streams = new ConcurrentLinkedDeque<>();
055    private final ExecutorService es;
056    private final ScatterGatherBackingStoreSupplier backingStoreSupplier;
057    private final Deque<Future<? extends ScatterZipOutputStream>> futures = new ConcurrentLinkedDeque<>();
058
059    private final long startedAt = System.currentTimeMillis();
060    private long compressionDoneAt = 0;
061    private long scatterDoneAt;
062
063    private static class DefaultBackingStoreSupplier implements ScatterGatherBackingStoreSupplier {
064        final AtomicInteger storeNum = new AtomicInteger(0);
065
066        @Override
067        public ScatterGatherBackingStore get() throws IOException {
068            final File tempFile = File.createTempFile("parallelscatter", "n" + storeNum.incrementAndGet());
069            return new FileBasedScatterGatherBackingStore(tempFile);
070        }
071    }
072
073    private ScatterZipOutputStream createDeferred(final ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier)
074            throws IOException {
075        final ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
076        // lifecycle is bound to the ScatterZipOutputStream returned
077        final StreamCompressor sc = StreamCompressor.create(Deflater.DEFAULT_COMPRESSION, bs); //NOSONAR
078        return new ScatterZipOutputStream(bs, sc);
079    }
080
081    private final ThreadLocal<ScatterZipOutputStream> tlScatterStreams = new ThreadLocal<ScatterZipOutputStream>() {
082        @Override
083        protected ScatterZipOutputStream initialValue() {
084            try {
085                final ScatterZipOutputStream scatterStream = createDeferred(backingStoreSupplier);
086                streams.add(scatterStream);
087                return scatterStream;
088            } catch (final IOException e) {
089                throw new RuntimeException(e); //NOSONAR
090            }
091        }
092    };
093
094    /**
095     * Create a ParallelScatterZipCreator with default threads, which is set to the number of available
096     * processors, as defined by {@link java.lang.Runtime#availableProcessors}
097     */
098    public ParallelScatterZipCreator() {
099        this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
100    }
101
102    /**
103     * Create a ParallelScatterZipCreator
104     *
105     * @param executorService The executorService to use for parallel scheduling. For technical reasons,
106     *                        this will be shut down by this class.
107     */
108    public ParallelScatterZipCreator(final ExecutorService executorService) {
109        this(executorService, new DefaultBackingStoreSupplier());
110    }
111
112    /**
113     * Create a ParallelScatterZipCreator
114     *
115     * @param executorService The executorService to use. For technical reasons, this will be shut down
116     *                        by this class.
117     * @param backingStoreSupplier The supplier of backing store which shall be used
118     */
119    public ParallelScatterZipCreator(final ExecutorService executorService,
120                                     final ScatterGatherBackingStoreSupplier backingStoreSupplier) {
121        this.backingStoreSupplier = backingStoreSupplier;
122        es = executorService;
123    }
124
125    /**
126     * Adds an archive entry to this archive.
127     * <p>
128     * This method is expected to be called from a single client thread
129     * </p>
130     *
131     * @param zipArchiveEntry The entry to add.
132     * @param source          The source input stream supplier
133     */
134
135    public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
136        submitStreamAwareCallable(createCallable(zipArchiveEntry, source));
137    }
138
139    /**
140     * Adds an archive entry to this archive.
141     * <p>
142     * This method is expected to be called from a single client thread
143     * </p>
144     *
145     * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
146     * @since 1.13
147     */
148    public void addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
149        submitStreamAwareCallable(createCallable(zipArchiveEntryRequestSupplier));
150    }
151
152    /**
153     * Submit a callable for compression.
154     *
155     * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
156     *
157     * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
158     */
159    public final void submit(final Callable<? extends Object> callable) {
160        submitStreamAwareCallable(new Callable<ScatterZipOutputStream>() {
161            @Override
162            public ScatterZipOutputStream call() throws Exception {
163                callable.call();
164                return tlScatterStreams.get();
165            }
166        });
167    }
168
169    /**
170     * Submit a callable for compression.
171     *
172     * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
173     *
174     * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
175     * @since 1.19
176     */
177    public final void submitStreamAwareCallable(final Callable<? extends ScatterZipOutputStream> callable) {
178        futures.add(es.submit(callable));
179    }
180
181    /**
182     * Create a callable that will compress the given archive entry.
183     *
184     * <p>This method is expected to be called from a single client thread.</p>
185     *
186     * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and {@link #submitStreamAwareCallable submitStreamAwareCallable}.
187     * The most common use case for using {@link #createCallable createCallable} and {@link #submitStreamAwareCallable submitStreamAwareCallable} from a
188     * client is if you want to wrap the callable in something that can be prioritized by the supplied
189     * {@link ExecutorService}, for instance to process large or slow files first.
190     * Since the creation of the {@link ExecutorService} is handled by the client, all of this is up to the client.
191     *
192     * @param zipArchiveEntry The entry to add.
193     * @param source          The source input stream supplier
194     * @return A callable that should subsequently passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The
195     * value of this callable is not used, but any exceptions happening inside the compression
196     * will be propagated through the callable.
197     */
198
199    public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry,
200        final InputStreamSupplier source) {
201        final int method = zipArchiveEntry.getMethod();
202        if (method == ZipMethod.UNKNOWN_CODE) {
203            throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry);
204        }
205        final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source);
206        return new Callable<ScatterZipOutputStream>() {
207            @Override
208            public ScatterZipOutputStream call() throws Exception {
209                ScatterZipOutputStream scatterStream = tlScatterStreams.get();
210                scatterStream.addArchiveEntry(zipArchiveEntryRequest);
211                return scatterStream;
212            }
213        };
214    }
215
216    /**
217     * Create a callable that will compress archive entry supplied by {@link ZipArchiveEntryRequestSupplier}.
218     *
219     * <p>This method is expected to be called from a single client thread.</p>
220     *
221     * The same as {@link #createCallable(ZipArchiveEntry, InputStreamSupplier)}, but the archive entry
222     * to be added is supplied by a {@link ZipArchiveEntryRequestSupplier}.
223     *
224     * @see #createCallable(ZipArchiveEntry, InputStreamSupplier)
225     *
226     * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
227     * @return A callable that should subsequently passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The
228     * value of this callable is not used, but any exceptions happening inside the compression
229     * will be propagated through the callable.
230     * @since 1.13
231     */
232    public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
233        return new Callable<ScatterZipOutputStream>() {
234            @Override
235            public ScatterZipOutputStream call() throws Exception {
236                ScatterZipOutputStream scatterStream = tlScatterStreams.get();
237                scatterStream.addArchiveEntry(zipArchiveEntryRequestSupplier.get());
238                return scatterStream;
239            }
240        };
241    }
242
243    /**
244     * Write the contents this to the target {@link ZipArchiveOutputStream}.
245     * <p>
246     * It may be beneficial to write things like directories and manifest files to the targetStream
247     * before calling this method.
248     * </p>
249     *
250     * <p>Calling this method will shut down the {@link ExecutorService} used by this class. If any of the {@link
251     * Callable}s {@link #submitStreamAwareCallable submit}ted to this instance throws an exception, the archive can not be created properly and
252     * this method will throw an exception.</p>
253     *
254     * @param targetStream The {@link ZipArchiveOutputStream} to receive the contents of the scatter streams
255     * @throws IOException          If writing fails
256     * @throws InterruptedException If we get interrupted
257     * @throws ExecutionException   If something happens in the parallel execution
258     */
259    public void writeTo(final ZipArchiveOutputStream targetStream)
260            throws IOException, InterruptedException, ExecutionException {
261
262        try {
263            // Make sure we catch any exceptions from parallel phase
264            try {
265                for (final Future<?> future : futures) {
266                    future.get();
267                }
268            } finally {
269                es.shutdown();
270            }
271
272            es.awaitTermination(1000 * 60L, TimeUnit.SECONDS);  // == Infinity. We really *must* wait for this to complete
273
274            // It is important that all threads terminate before we go on, ensure happens-before relationship
275            compressionDoneAt = System.currentTimeMillis();
276
277            for (final Future<? extends ScatterZipOutputStream> future : futures) {
278                ScatterZipOutputStream scatterStream = future.get();
279                scatterStream.zipEntryWriter().writeNextZipEntry(targetStream);
280            }
281
282            for (final ScatterZipOutputStream scatterStream : streams) {
283                scatterStream.close();
284            }
285
286            scatterDoneAt = System.currentTimeMillis();
287        } finally {
288            closeAll();
289        }
290    }
291
292    /**
293     * Returns a message describing the overall statistics of the compression run
294     *
295     * @return A string
296     */
297    public ScatterStatistics getStatisticsMessage() {
298        return new ScatterStatistics(compressionDoneAt - startedAt, scatterDoneAt - compressionDoneAt);
299    }
300
301    private void closeAll() {
302        for (final ScatterZipOutputStream scatterStream : streams) {
303            try {
304                scatterStream.close();
305            } catch (IOException ex) { //NOSONAR
306                // no way to properly log this
307            }
308        }
309    }
310}
311