ScatterZipOutputStream.java

  1. /*
  2.  *  Licensed to the Apache Software Foundation (ASF) under one or more
  3.  *  contributor license agreements.  See the NOTICE file distributed with
  4.  *  this work for additional information regarding copyright ownership.
  5.  *  The ASF licenses this file to You under the Apache License, Version 2.0
  6.  *  (the "License"); you may not use this file except in compliance with
  7.  *  the License.  You may obtain a copy of the License at
  8.  *
  9.  *      http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  *  Unless required by applicable law or agreed to in writing, software
  12.  *  distributed under the License is distributed on an "AS IS" BASIS,
  13.  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  *  See the License for the specific language governing permissions and
  15.  *  limitations under the License.
  16.  */
  17. package org.apache.commons.compress.archivers.zip;

  18. import java.io.Closeable;
  19. import java.io.File;
  20. import java.io.FileNotFoundException;
  21. import java.io.IOException;
  22. import java.io.InputStream;
  23. import java.nio.file.Path;
  24. import java.util.Iterator;
  25. import java.util.Queue;
  26. import java.util.concurrent.ConcurrentLinkedQueue;
  27. import java.util.concurrent.atomic.AtomicBoolean;
  28. import java.util.zip.Deflater;

  29. import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore;
  30. import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
  31. import org.apache.commons.io.input.BoundedInputStream;

  32. /**
  33.  * A ZIP output stream that is optimized for multi-threaded scatter/gather construction of ZIP files.
  34.  * <p>
  35.  * The internal data format of the entries used by this class are entirely private to this class and are not part of any public api whatsoever.
  36.  * </p>
  37.  * <p>
  38.  * It is possible to extend this class to support different kinds of backing storage, the default implementation only supports file-based backing.
  39.  * </p>
  40.  * <p>
  41.  * Thread safety: This class supports multiple threads. But the "writeTo" method must be called by the thread that originally created the
  42.  * {@link ZipArchiveEntry}.
  43.  * </p>
  44.  *
  45.  * @since 1.10
  46.  */
  47. public class ScatterZipOutputStream implements Closeable {

  48.     private static final class CompressedEntry {
  49.         final ZipArchiveEntryRequest zipArchiveEntryRequest;
  50.         final long crc;
  51.         final long compressedSize;
  52.         final long size;

  53.         CompressedEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest, final long crc, final long compressedSize, final long size) {
  54.             this.zipArchiveEntryRequest = zipArchiveEntryRequest;
  55.             this.crc = crc;
  56.             this.compressedSize = compressedSize;
  57.             this.size = size;
  58.         }

  59.         /**
  60.          * Updates the original {@link ZipArchiveEntry} with sizes/crc. Do not use this method from threads that did not create the instance itself!
  61.          *
  62.          * @return the zipArchiveEntry that is the basis for this request.
  63.          */

  64.         public ZipArchiveEntry transferToArchiveEntry() {
  65.             final ZipArchiveEntry entry = zipArchiveEntryRequest.getZipArchiveEntry();
  66.             entry.setCompressedSize(compressedSize);
  67.             entry.setSize(size);
  68.             entry.setCrc(crc);
  69.             entry.setMethod(zipArchiveEntryRequest.getMethod());
  70.             return entry;
  71.         }
  72.     }

  73.     public static class ZipEntryWriter implements Closeable {
  74.         private final Iterator<CompressedEntry> itemsIterator;
  75.         private final InputStream itemsIteratorData;

  76.         public ZipEntryWriter(final ScatterZipOutputStream scatter) throws IOException {
  77.             scatter.backingStore.closeForWriting();
  78.             itemsIterator = scatter.items.iterator();
  79.             itemsIteratorData = scatter.backingStore.getInputStream();
  80.         }

  81.         @Override
  82.         public void close() throws IOException {
  83.             if (itemsIteratorData != null) {
  84.                 itemsIteratorData.close();
  85.             }
  86.         }

  87.         public void writeNextZipEntry(final ZipArchiveOutputStream target) throws IOException {
  88.             final CompressedEntry compressedEntry = itemsIterator.next();
  89.             // @formatter:off
  90.             try (BoundedInputStream rawStream = BoundedInputStream.builder()
  91.                     .setInputStream(itemsIteratorData)
  92.                     .setMaxCount(compressedEntry.compressedSize)
  93.                     .setPropagateClose(false)
  94.                     .get()) {
  95.                 target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
  96.             }
  97.             // @formatter:on
  98.         }
  99.     }

  100.     /**
  101.      * Creates a {@link ScatterZipOutputStream} with default compression level that is backed by a file
  102.      *
  103.      * @param file The file to offload compressed data into.
  104.      * @return A ScatterZipOutputStream that is ready for use.
  105.      * @throws FileNotFoundException if the file cannot be found
  106.      */
  107.     public static ScatterZipOutputStream fileBased(final File file) throws FileNotFoundException {
  108.         return pathBased(file.toPath(), Deflater.DEFAULT_COMPRESSION);
  109.     }

  110.     /**
  111.      * Creates a {@link ScatterZipOutputStream} that is backed by a file
  112.      *
  113.      * @param file             The file to offload compressed data into.
  114.      * @param compressionLevel The compression level to use, @see #Deflater
  115.      * @return A ScatterZipOutputStream that is ready for use.
  116.      * @throws FileNotFoundException if the file cannot be found
  117.      */
  118.     public static ScatterZipOutputStream fileBased(final File file, final int compressionLevel) throws FileNotFoundException {
  119.         return pathBased(file.toPath(), compressionLevel);
  120.     }

  121.     /**
  122.      * Creates a {@link ScatterZipOutputStream} with default compression level that is backed by a file
  123.      *
  124.      * @param path The path to offload compressed data into.
  125.      * @return A ScatterZipOutputStream that is ready for use.
  126.      * @throws FileNotFoundException if the path cannot be found
  127.      * @since 1.22
  128.      */
  129.     public static ScatterZipOutputStream pathBased(final Path path) throws FileNotFoundException {
  130.         return pathBased(path, Deflater.DEFAULT_COMPRESSION);
  131.     }

  132.     /**
  133.      * Creates a {@link ScatterZipOutputStream} that is backed by a file
  134.      *
  135.      * @param path             The path to offload compressed data into.
  136.      * @param compressionLevel The compression level to use, @see #Deflater
  137.      * @return A ScatterZipOutputStream that is ready for use.
  138.      * @throws FileNotFoundException if the path cannot be found
  139.      * @since 1.22
  140.      */
  141.     public static ScatterZipOutputStream pathBased(final Path path, final int compressionLevel) throws FileNotFoundException {
  142.         final ScatterGatherBackingStore bs = new FileBasedScatterGatherBackingStore(path);
  143.         // lifecycle is bound to the ScatterZipOutputStream returned
  144.         final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); // NOSONAR
  145.         return new ScatterZipOutputStream(bs, sc);
  146.     }

  147.     private final Queue<CompressedEntry> items = new ConcurrentLinkedQueue<>();

  148.     private final ScatterGatherBackingStore backingStore;

  149.     private final StreamCompressor streamCompressor;

  150.     private final AtomicBoolean isClosed = new AtomicBoolean();

  151.     private ZipEntryWriter zipEntryWriter;

  152.     public ScatterZipOutputStream(final ScatterGatherBackingStore backingStore, final StreamCompressor streamCompressor) {
  153.         this.backingStore = backingStore;
  154.         this.streamCompressor = streamCompressor;
  155.     }

  156.     /**
  157.      * Adds an archive entry to this scatter stream.
  158.      *
  159.      * @param zipArchiveEntryRequest The entry to write.
  160.      * @throws IOException If writing fails
  161.      */
  162.     public void addArchiveEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest) throws IOException {
  163.         try (InputStream payloadStream = zipArchiveEntryRequest.getPayloadStream()) {
  164.             streamCompressor.deflate(payloadStream, zipArchiveEntryRequest.getMethod());
  165.         }
  166.         items.add(new CompressedEntry(zipArchiveEntryRequest, streamCompressor.getCrc32(), streamCompressor.getBytesWrittenForLastEntry(),
  167.                 streamCompressor.getBytesRead()));
  168.     }

  169.     /**
  170.      * Closes this stream, freeing all resources involved in the creation of this stream.
  171.      *
  172.      * @throws IOException If closing fails
  173.      */
  174.     @Override
  175.     public void close() throws IOException {
  176.         if (!isClosed.compareAndSet(false, true)) {
  177.             return;
  178.         }
  179.         try {
  180.             if (zipEntryWriter != null) {
  181.                 zipEntryWriter.close();
  182.             }
  183.             backingStore.close();
  184.         } finally {
  185.             streamCompressor.close();
  186.         }
  187.     }

  188.     /**
  189.      * Writes the contents of this scatter stream to a target archive.
  190.      *
  191.      * @param target The archive to receive the contents of this {@link ScatterZipOutputStream}.
  192.      * @throws IOException If writing fails
  193.      * @see #zipEntryWriter()
  194.      */
  195.     public void writeTo(final ZipArchiveOutputStream target) throws IOException {
  196.         backingStore.closeForWriting();
  197.         try (InputStream data = backingStore.getInputStream()) {
  198.             for (final CompressedEntry compressedEntry : items) {
  199.                 // @formatter:off
  200.                 try (BoundedInputStream rawStream = BoundedInputStream.builder()
  201.                         .setInputStream(data)
  202.                         .setMaxCount(compressedEntry.compressedSize)
  203.                         .setPropagateClose(false)
  204.                         .get()) {
  205.                     target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
  206.                 }
  207.                 // @formatter:on
  208.             }
  209.         }
  210.     }

  211.     /**
  212.      * Gets a ZIP entry writer for this scatter stream.
  213.      *
  214.      * @throws IOException If getting scatter stream input stream
  215.      * @return the ZipEntryWriter created on first call of the method
  216.      */
  217.     public ZipEntryWriter zipEntryWriter() throws IOException {
  218.         if (zipEntryWriter == null) {
  219.             zipEntryWriter = new ZipEntryWriter(this);
  220.         }
  221.         return zipEntryWriter;
  222.     }
  223. }