View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   https://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.commons.compress.archivers.zip;
20  
21  import java.io.Closeable;
22  import java.io.File;
23  import java.io.FileNotFoundException;
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.nio.file.Path;
27  import java.util.Iterator;
28  import java.util.Queue;
29  import java.util.concurrent.ConcurrentLinkedQueue;
30  import java.util.concurrent.atomic.AtomicBoolean;
31  import java.util.zip.Deflater;
32  
33  import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore;
34  import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
35  import org.apache.commons.io.IOUtils;
36  import org.apache.commons.io.input.BoundedInputStream;
37  
38  /**
39   * A ZIP output stream that is optimized for multi-threaded scatter/gather construction of ZIP files.
40   * <p>
41   * The internal data format of the entries used by this class are entirely private to this class and are not part of any public api whatsoever.
42   * </p>
43   * <p>
44   * It is possible to extend this class to support different kinds of backing storage, the default implementation only supports file-based backing.
45   * </p>
46   * <p>
47   * Thread safety: This class supports multiple threads. But the "writeTo" method must be called by the thread that originally created the
48   * {@link ZipArchiveEntry}.
49   * </p>
50   *
51   * @since 1.10
52   */
53  public class ScatterZipOutputStream implements Closeable {
54  
55      private static final class CompressedEntry {
56          final ZipArchiveEntryRequest zipArchiveEntryRequest;
57          final long crc;
58          final long compressedSize;
59          final long size;
60  
61          CompressedEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest, final long crc, final long compressedSize, final long size) {
62              this.zipArchiveEntryRequest = zipArchiveEntryRequest;
63              this.crc = crc;
64              this.compressedSize = compressedSize;
65              this.size = size;
66          }
67  
68          /**
69           * Updates the original {@link ZipArchiveEntry} with sizes/CRC. Do not use this method from threads that did not create the instance itself!
70           *
71           * @return the zipArchiveEntry that is the basis for this request.
72           */
73          public ZipArchiveEntry transferToArchiveEntry() {
74              final ZipArchiveEntry entry = zipArchiveEntryRequest.getZipArchiveEntry();
75              entry.setCompressedSize(compressedSize);
76              entry.setSize(size);
77              entry.setCrc(crc);
78              entry.setMethod(zipArchiveEntryRequest.getMethod());
79              return entry;
80          }
81      }
82  
83      /**
84       * Writes ZIP entries to a ZIP archive.
85       */
86      public static class ZipEntryWriter implements Closeable {
87          private final Iterator<CompressedEntry> itemsIterator;
88          private final InputStream inputStream;
89  
90          /**
91           * Constructs a new instance.
92           *
93           * @param out a ScatterZipOutputStream.
94           * @throws IOException if an I/O error occurs.
95           */
96          public ZipEntryWriter(final ScatterZipOutputStream out) throws IOException {
97              out.backingStore.closeForWriting();
98              itemsIterator = out.items.iterator();
99              inputStream = out.backingStore.getInputStream();
100         }
101 
102         @Override
103         public void close() throws IOException {
104             IOUtils.close(inputStream);
105         }
106 
107         /**
108          * Writes the next ZIP entry to the given target.
109          *
110          * @param target Where to write.
111          * @throws IOException if an I/O error occurs.
112          */
113         public void writeNextZipEntry(final ZipArchiveOutputStream target) throws IOException {
114             final CompressedEntry compressedEntry = itemsIterator.next();
115             // @formatter:off
116             try (BoundedInputStream rawStream = BoundedInputStream.builder()
117                     .setInputStream(inputStream)
118                     .setMaxCount(compressedEntry.compressedSize)
119                     .setPropagateClose(false)
120                     .get()) {
121                 target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
122             }
123             // @formatter:on
124         }
125     }
126 
127     /**
128      * Creates a {@link ScatterZipOutputStream} with default compression level that is backed by a file
129      *
130      * @param file The file to offload compressed data into.
131      * @return A ScatterZipOutputStream that is ready for use.
132      * @throws FileNotFoundException if the file cannot be found
133      */
134     public static ScatterZipOutputStream fileBased(final File file) throws FileNotFoundException {
135         return pathBased(file.toPath(), Deflater.DEFAULT_COMPRESSION);
136     }
137 
138     /**
139      * Creates a {@link ScatterZipOutputStream} that is backed by a file
140      *
141      * @param file             The file to offload compressed data into.
142      * @param compressionLevel The compression level to use, @see #Deflater
143      * @return A ScatterZipOutputStream that is ready for use.
144      * @throws FileNotFoundException if the file cannot be found
145      */
146     public static ScatterZipOutputStream fileBased(final File file, final int compressionLevel) throws FileNotFoundException {
147         return pathBased(file.toPath(), compressionLevel);
148     }
149 
150     /**
151      * Creates a {@link ScatterZipOutputStream} with default compression level that is backed by a file
152      *
153      * @param path The path to offload compressed data into.
154      * @return A ScatterZipOutputStream that is ready for use.
155      * @throws FileNotFoundException if the path cannot be found
156      * @since 1.22
157      */
158     public static ScatterZipOutputStream pathBased(final Path path) throws FileNotFoundException {
159         return pathBased(path, Deflater.DEFAULT_COMPRESSION);
160     }
161 
162     /**
163      * Creates a {@link ScatterZipOutputStream} that is backed by a file
164      *
165      * @param path             The path to offload compressed data into.
166      * @param compressionLevel The compression level to use, @see #Deflater
167      * @return A ScatterZipOutputStream that is ready for use.
168      * @throws FileNotFoundException if the path cannot be found
169      * @since 1.22
170      */
171     public static ScatterZipOutputStream pathBased(final Path path, final int compressionLevel) throws FileNotFoundException {
172         final ScatterGatherBackingStore bs = new FileBasedScatterGatherBackingStore(path);
173         // lifecycle is bound to the ScatterZipOutputStream returned
174         final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); // NOSONAR
175         return new ScatterZipOutputStream(bs, sc);
176     }
177 
178     private final Queue<CompressedEntry> items = new ConcurrentLinkedQueue<>();
179 
180     private final ScatterGatherBackingStore backingStore;
181 
182     private final StreamCompressor streamCompressor;
183 
184     private final AtomicBoolean isClosed = new AtomicBoolean();
185 
186     private ZipEntryWriter zipEntryWriter;
187 
188     /**
189      * Constructs a new instance.
190      *
191      * @param backingStore the backing store.
192      * @param streamCompressor Deflates ZIP entries.
193      */
194     public ScatterZipOutputStream(final ScatterGatherBackingStore backingStore, final StreamCompressor streamCompressor) {
195         this.backingStore = backingStore;
196         this.streamCompressor = streamCompressor;
197     }
198 
199     /**
200      * Adds an archive entry to this scatter stream.
201      *
202      * @param zipArchiveEntryRequest The entry to write.
203      * @throws IOException If writing fails
204      */
205     public void addArchiveEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest) throws IOException {
206         try (InputStream payloadStream = zipArchiveEntryRequest.getPayloadStream()) {
207             streamCompressor.deflate(payloadStream, zipArchiveEntryRequest.getMethod());
208         }
209         items.add(new CompressedEntry(zipArchiveEntryRequest, streamCompressor.getCrc32(), streamCompressor.getBytesWrittenForLastEntry(),
210                 streamCompressor.getBytesRead()));
211     }
212 
213     /**
214      * Closes this stream, freeing all resources involved in the creation of this stream.
215      *
216      * @throws IOException If closing fails
217      */
218     @Override
219     public void close() throws IOException {
220         if (!isClosed.compareAndSet(false, true)) {
221             return;
222         }
223         try {
224             IOUtils.close(zipEntryWriter);
225             backingStore.close();
226         } finally {
227             streamCompressor.close();
228         }
229     }
230 
231     /**
232      * Writes the contents of this scatter stream to a target archive.
233      *
234      * @param target The archive to receive the contents of this {@link ScatterZipOutputStream}.
235      * @throws IOException If writing fails
236      * @see #zipEntryWriter()
237      */
238     public void writeTo(final ZipArchiveOutputStream target) throws IOException {
239         backingStore.closeForWriting();
240         try (InputStream data = backingStore.getInputStream()) {
241             for (final CompressedEntry compressedEntry : items) {
242                 // @formatter:off
243                 try (BoundedInputStream rawStream = BoundedInputStream.builder()
244                         .setInputStream(data)
245                         .setMaxCount(compressedEntry.compressedSize)
246                         .setPropagateClose(false)
247                         .get()) {
248                     target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
249                 }
250                 // @formatter:on
251             }
252         }
253     }
254 
255     /**
256      * Gets a ZIP entry writer for this scatter stream.
257      *
258      * @throws IOException If getting scatter stream input stream
259      * @return the ZipEntryWriter created on first call of the method
260      */
261     public ZipEntryWriter zipEntryWriter() throws IOException {
262         if (zipEntryWriter == null) {
263             zipEntryWriter = new ZipEntryWriter(this);
264         }
265         return zipEntryWriter;
266     }
267 }