ParallelScatterZipCreator.java

  1. /*
  2.  *  Licensed to the Apache Software Foundation (ASF) under one or more
  3.  *  contributor license agreements.  See the NOTICE file distributed with
  4.  *  this work for additional information regarding copyright ownership.
  5.  *  The ASF licenses this file to You under the Apache License, Version 2.0
  6.  *  (the "License"); you may not use this file except in compliance with
  7.  *  the License.  You may obtain a copy of the License at
  8.  *
  9.  *      http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  *  Unless required by applicable law or agreed to in writing, software
  12.  *  distributed under the License is distributed on an "AS IS" BASIS,
  13.  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  *  See the License for the specific language governing permissions and
  15.  *  limitations under the License.
  16.  */
  17. package org.apache.commons.compress.archivers.zip;

  18. import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;

  19. import java.io.IOException;
  20. import java.io.UncheckedIOException;
  21. import java.util.Deque;
  22. import java.util.concurrent.Callable;
  23. import java.util.concurrent.ConcurrentLinkedDeque;
  24. import java.util.concurrent.ExecutionException;
  25. import java.util.concurrent.ExecutorService;
  26. import java.util.concurrent.Executors;
  27. import java.util.concurrent.Future;
  28. import java.util.concurrent.TimeUnit;
  29. import java.util.zip.Deflater;

  30. import org.apache.commons.compress.parallel.InputStreamSupplier;
  31. import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
  32. import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;

  33. /**
  34.  * Creates a ZIP in parallel by using multiple thread-local {@link ScatterZipOutputStream} instances.
  35.  * <p>
  36.  * Note that until 1.18, this class generally made no guarantees about the order of things written to the output file. Things that needed to come in a specific
  37.  * order (manifests, directories) had to be handled by the client of this class, usually by writing these things to the {@link ZipArchiveOutputStream}
  38.  * <em>before</em> calling {@link #writeTo writeTo} on this class.
  39.  * </p>
  40.  * <p>
  41.  * The client can supply an {@link java.util.concurrent.ExecutorService}, but for reasons of memory model consistency, this will be shut down by this class
  42.  * prior to completion.
  43.  * </p>
  44.  *
  45.  * @since 1.10
  46.  */
  47. public class ParallelScatterZipCreator {

  48.     private final Deque<ScatterZipOutputStream> streams = new ConcurrentLinkedDeque<>();
  49.     private final ExecutorService executorService;
  50.     private final ScatterGatherBackingStoreSupplier backingStoreSupplier;

  51.     private final Deque<Future<? extends ScatterZipOutputStream>> futures = new ConcurrentLinkedDeque<>();
  52.     private final long startedAt = System.currentTimeMillis();
  53.     private long compressionDoneAt;
  54.     private long scatterDoneAt;

  55.     private final int compressionLevel;

  56.     private final ThreadLocal<ScatterZipOutputStream> tlScatterStreams = new ThreadLocal<ScatterZipOutputStream>() {
  57.         @Override
  58.         protected ScatterZipOutputStream initialValue() {
  59.             try {
  60.                 final ScatterZipOutputStream scatterStream = createDeferred(backingStoreSupplier);
  61.                 streams.add(scatterStream);
  62.                 return scatterStream;
  63.             } catch (final IOException e) {
  64.                 throw new UncheckedIOException(e); // NOSONAR
  65.             }
  66.         }
  67.     };

  68.     /**
  69.      * Constructs a ParallelScatterZipCreator with default threads, which is set to the number of available processors, as defined by
  70.      * {@link Runtime#availableProcessors}
  71.      */
  72.     public ParallelScatterZipCreator() {
  73.         this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
  74.     }

  75.     /**
  76.      * Constructs a ParallelScatterZipCreator
  77.      *
  78.      * @param executorService The executorService to use for parallel scheduling. For technical reasons, this will be shut down by this class.
  79.      */
  80.     public ParallelScatterZipCreator(final ExecutorService executorService) {
  81.         this(executorService, new DefaultBackingStoreSupplier(null));
  82.     }

  83.     /**
  84.      * Constructs a ParallelScatterZipCreator
  85.      *
  86.      * @param executorService      The executorService to use. For technical reasons, this will be shut down by this class.
  87.      * @param backingStoreSupplier The supplier of backing store which shall be used
  88.      */
  89.     public ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier) {
  90.         this(executorService, backingStoreSupplier, Deflater.DEFAULT_COMPRESSION);
  91.     }

  92.     /**
  93.      * Constructs a ParallelScatterZipCreator
  94.      *
  95.      * @param executorService      The executorService to use. For technical reasons, this will be shut down by this class.
  96.      * @param backingStoreSupplier The supplier of backing store which shall be used
  97.      * @param compressionLevel     The compression level used in compression, this value should be -1(default level) or between 0~9.
  98.      * @throws IllegalArgumentException if the compression level is illegal
  99.      * @since 1.21
  100.      */
  101.     public ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier,
  102.             final int compressionLevel) throws IllegalArgumentException {
  103.         if ((compressionLevel < Deflater.NO_COMPRESSION || compressionLevel > Deflater.BEST_COMPRESSION) && compressionLevel != Deflater.DEFAULT_COMPRESSION) {
  104.             throw new IllegalArgumentException("Compression level is expected between -1~9");
  105.         }

  106.         this.backingStoreSupplier = backingStoreSupplier;
  107.         this.executorService = executorService;
  108.         this.compressionLevel = compressionLevel;
  109.     }

  110.     /**
  111.      * Adds an archive entry to this archive.
  112.      * <p>
  113.      * This method is expected to be called from a single client thread
  114.      * </p>
  115.      *
  116.      * @param zipArchiveEntry The entry to add.
  117.      * @param source          The source input stream supplier
  118.      */

  119.     public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
  120.         submitStreamAwareCallable(createCallable(zipArchiveEntry, source));
  121.     }

  122.     /**
  123.      * Adds an archive entry to this archive.
  124.      * <p>
  125.      * This method is expected to be called from a single client thread
  126.      * </p>
  127.      *
  128.      * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
  129.      * @since 1.13
  130.      */
  131.     public void addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
  132.         submitStreamAwareCallable(createCallable(zipArchiveEntryRequestSupplier));
  133.     }

  134.     private void closeAll() {
  135.         for (final ScatterZipOutputStream scatterStream : streams) {
  136.             try {
  137.                 scatterStream.close();
  138.             } catch (final IOException ignored) {
  139.                 // no way to properly log this
  140.             }
  141.         }
  142.     }

  143.     /**
  144.      * Creates a callable that will compress the given archive entry.
  145.      *
  146.      * <p>
  147.      * This method is expected to be called from a single client thread.
  148.      * </p>
  149.      *
  150.      * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and {@link #submitStreamAwareCallable submitStreamAwareCallable}. The
  151.      * most common use case for using {@link #createCallable createCallable} and {@link #submitStreamAwareCallable submitStreamAwareCallable} from a client is
  152.      * if you want to wrap the callable in something that can be prioritized by the supplied {@link ExecutorService}, for instance to process large or slow
  153.      * files first. Since the creation of the {@link ExecutorService} is handled by the client, all of this is up to the client.
  154.      *
  155.      * @param zipArchiveEntry The entry to add.
  156.      * @param source          The source input stream supplier
  157.      * @return A callable that should subsequently be passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The value of this callable is
  158.      *         not used, but any exceptions happening inside the compression will be propagated through the callable.
  159.      */

  160.     public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
  161.         final int method = zipArchiveEntry.getMethod();
  162.         if (method == ZipMethod.UNKNOWN_CODE) {
  163.             throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry);
  164.         }
  165.         final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source);
  166.         return () -> {
  167.             final ScatterZipOutputStream scatterStream = tlScatterStreams.get();
  168.             scatterStream.addArchiveEntry(zipArchiveEntryRequest);
  169.             return scatterStream;
  170.         };
  171.     }

  172.     /**
  173.      * Creates a callable that will compress archive entry supplied by {@link ZipArchiveEntryRequestSupplier}.
  174.      *
  175.      * <p>
  176.      * This method is expected to be called from a single client thread.
  177.      * </p>
  178.      *
  179.      * The same as {@link #createCallable(ZipArchiveEntry, InputStreamSupplier)}, but the archive entry to be added is supplied by a
  180.      * {@link ZipArchiveEntryRequestSupplier}.
  181.      *
  182.      * @see #createCallable(ZipArchiveEntry, InputStreamSupplier)
  183.      *
  184.      * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
  185.      * @return A callable that should subsequently be passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The value of this callable is
  186.      *         not used, but any exceptions happening inside the compression will be propagated through the callable.
  187.      * @since 1.13
  188.      */
  189.     public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
  190.         return () -> {
  191.             final ScatterZipOutputStream scatterStream = tlScatterStreams.get();
  192.             scatterStream.addArchiveEntry(zipArchiveEntryRequestSupplier.get());
  193.             return scatterStream;
  194.         };
  195.     }

  196.     @SuppressWarnings("resource") // Caller closes
  197.     private ScatterZipOutputStream createDeferred(final ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier) throws IOException {
  198.         final ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
  199.         // lifecycle is bound to the ScatterZipOutputStream returned
  200.         final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); // NOSONAR
  201.         return new ScatterZipOutputStream(bs, sc);
  202.     }

  203.     /**
  204.      * Gets a message describing the overall statistics of the compression run
  205.      *
  206.      * @return A string
  207.      */
  208.     public ScatterStatistics getStatisticsMessage() {
  209.         return new ScatterStatistics(compressionDoneAt - startedAt, scatterDoneAt - compressionDoneAt);
  210.     }

  211.     /**
  212.      * Submits a callable for compression.
  213.      *
  214.      * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
  215.      *
  216.      * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
  217.      */
  218.     public final void submit(final Callable<? extends Object> callable) {
  219.         submitStreamAwareCallable(() -> {
  220.             callable.call();
  221.             return tlScatterStreams.get();
  222.         });
  223.     }

  224.     /**
  225.      * Submits a callable for compression.
  226.      *
  227.      * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
  228.      *
  229.      * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
  230.      * @since 1.19
  231.      */
  232.     public final void submitStreamAwareCallable(final Callable<? extends ScatterZipOutputStream> callable) {
  233.         futures.add(executorService.submit(callable));
  234.     }

  235.     /**
  236.      * Writes the contents this to the target {@link ZipArchiveOutputStream}.
  237.      * <p>
  238.      * It may be beneficial to write things like directories and manifest files to the targetStream before calling this method.
  239.      * </p>
  240.      * <p>
  241.      * Calling this method will shut down the {@link ExecutorService} used by this class. If any of the {@link Callable}s {@link #submitStreamAwareCallable
  242.      * submit}ted to this instance throws an exception, the archive can not be created properly and this method will throw an exception.
  243.      * </p>
  244.      *
  245.      * @param targetStream The {@link ZipArchiveOutputStream} to receive the contents of the scatter streams
  246.      * @throws IOException          If writing fails
  247.      * @throws InterruptedException If we get interrupted
  248.      * @throws ExecutionException   If something happens in the parallel execution
  249.      */
  250.     public void writeTo(final ZipArchiveOutputStream targetStream) throws IOException, InterruptedException, ExecutionException {

  251.         try {
  252.             // Make sure we catch any exceptions from parallel phase
  253.             try {
  254.                 for (final Future<?> future : futures) {
  255.                     future.get();
  256.                 }
  257.             } finally {
  258.                 executorService.shutdown();
  259.             }

  260.             executorService.awaitTermination(1000 * 60L, TimeUnit.SECONDS); // == Infinity. We really *must* wait for this to complete

  261.             // It is important that all threads terminate before we go on, ensure happens-before relationship
  262.             compressionDoneAt = System.currentTimeMillis();

  263.             for (final Future<? extends ScatterZipOutputStream> future : futures) {
  264.                 final ScatterZipOutputStream scatterStream = future.get();
  265.                 scatterStream.zipEntryWriter().writeNextZipEntry(targetStream);
  266.             }

  267.             for (final ScatterZipOutputStream scatterStream : streams) {
  268.                 scatterStream.close();
  269.             }

  270.             scatterDoneAt = System.currentTimeMillis();
  271.         } finally {
  272.             closeAll();
  273.         }
  274.     }
  275. }