View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one or more
3    *  contributor license agreements.  See the NOTICE file distributed with
4    *  this work for additional information regarding copyright ownership.
5    *  The ASF licenses this file to You under the Apache License, Version 2.0
6    *  (the "License"); you may not use this file except in compliance with
7    *  the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software
12   *  distributed under the License is distributed on an "AS IS" BASIS,
13   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   *  See the License for the specific language governing permissions and
15   *  limitations under the License.
16   */
17  package org.apache.commons.compress.archivers.zip;
18  
19  import java.io.Closeable;
20  import java.io.File;
21  import java.io.FileNotFoundException;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.nio.file.Path;
25  import java.util.Iterator;
26  import java.util.Queue;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  import java.util.zip.Deflater;
30  
31  import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore;
32  import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
33  import org.apache.commons.io.input.BoundedInputStream;
34  
35  /**
36   * A ZIP output stream that is optimized for multi-threaded scatter/gather construction of ZIP files.
37   * <p>
38   * The internal data format of the entries used by this class are entirely private to this class and are not part of any public api whatsoever.
39   * </p>
40   * <p>
41   * It is possible to extend this class to support different kinds of backing storage, the default implementation only supports file-based backing.
42   * </p>
43   * <p>
44   * Thread safety: This class supports multiple threads. But the "writeTo" method must be called by the thread that originally created the
45   * {@link ZipArchiveEntry}.
46   * </p>
47   *
48   * @since 1.10
49   */
50  public class ScatterZipOutputStream implements Closeable {
51  
52      private static final class CompressedEntry {
53          final ZipArchiveEntryRequest zipArchiveEntryRequest;
54          final long crc;
55          final long compressedSize;
56          final long size;
57  
58          CompressedEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest, final long crc, final long compressedSize, final long size) {
59              this.zipArchiveEntryRequest = zipArchiveEntryRequest;
60              this.crc = crc;
61              this.compressedSize = compressedSize;
62              this.size = size;
63          }
64  
65          /**
66           * Updates the original {@link ZipArchiveEntry} with sizes/crc. Do not use this method from threads that did not create the instance itself!
67           *
68           * @return the zipArchiveEntry that is the basis for this request.
69           */
70  
71          public ZipArchiveEntry transferToArchiveEntry() {
72              final ZipArchiveEntry entry = zipArchiveEntryRequest.getZipArchiveEntry();
73              entry.setCompressedSize(compressedSize);
74              entry.setSize(size);
75              entry.setCrc(crc);
76              entry.setMethod(zipArchiveEntryRequest.getMethod());
77              return entry;
78          }
79      }
80  
81      public static class ZipEntryWriter implements Closeable {
82          private final Iterator<CompressedEntry> itemsIterator;
83          private final InputStream itemsIteratorData;
84  
85          public ZipEntryWriter(final ScatterZipOutputStream scatter) throws IOException {
86              scatter.backingStore.closeForWriting();
87              itemsIterator = scatter.items.iterator();
88              itemsIteratorData = scatter.backingStore.getInputStream();
89          }
90  
91          @Override
92          public void close() throws IOException {
93              if (itemsIteratorData != null) {
94                  itemsIteratorData.close();
95              }
96          }
97  
98          public void writeNextZipEntry(final ZipArchiveOutputStream target) throws IOException {
99              final CompressedEntry compressedEntry = itemsIterator.next();
100             // @formatter:off
101             try (BoundedInputStream rawStream = BoundedInputStream.builder()
102                     .setInputStream(itemsIteratorData)
103                     .setMaxCount(compressedEntry.compressedSize)
104                     .setPropagateClose(false)
105                     .get()) {
106                 target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
107             }
108             // @formatter:on
109         }
110     }
111 
112     /**
113      * Creates a {@link ScatterZipOutputStream} with default compression level that is backed by a file
114      *
115      * @param file The file to offload compressed data into.
116      * @return A ScatterZipOutputStream that is ready for use.
117      * @throws FileNotFoundException if the file cannot be found
118      */
119     public static ScatterZipOutputStream fileBased(final File file) throws FileNotFoundException {
120         return pathBased(file.toPath(), Deflater.DEFAULT_COMPRESSION);
121     }
122 
123     /**
124      * Creates a {@link ScatterZipOutputStream} that is backed by a file
125      *
126      * @param file             The file to offload compressed data into.
127      * @param compressionLevel The compression level to use, @see #Deflater
128      * @return A ScatterZipOutputStream that is ready for use.
129      * @throws FileNotFoundException if the file cannot be found
130      */
131     public static ScatterZipOutputStream fileBased(final File file, final int compressionLevel) throws FileNotFoundException {
132         return pathBased(file.toPath(), compressionLevel);
133     }
134 
135     /**
136      * Creates a {@link ScatterZipOutputStream} with default compression level that is backed by a file
137      *
138      * @param path The path to offload compressed data into.
139      * @return A ScatterZipOutputStream that is ready for use.
140      * @throws FileNotFoundException if the path cannot be found
141      * @since 1.22
142      */
143     public static ScatterZipOutputStream pathBased(final Path path) throws FileNotFoundException {
144         return pathBased(path, Deflater.DEFAULT_COMPRESSION);
145     }
146 
147     /**
148      * Creates a {@link ScatterZipOutputStream} that is backed by a file
149      *
150      * @param path             The path to offload compressed data into.
151      * @param compressionLevel The compression level to use, @see #Deflater
152      * @return A ScatterZipOutputStream that is ready for use.
153      * @throws FileNotFoundException if the path cannot be found
154      * @since 1.22
155      */
156     public static ScatterZipOutputStream pathBased(final Path path, final int compressionLevel) throws FileNotFoundException {
157         final ScatterGatherBackingStore bs = new FileBasedScatterGatherBackingStore(path);
158         // lifecycle is bound to the ScatterZipOutputStream returned
159         final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); // NOSONAR
160         return new ScatterZipOutputStream(bs, sc);
161     }
162 
163     private final Queue<CompressedEntry> items = new ConcurrentLinkedQueue<>();
164 
165     private final ScatterGatherBackingStore backingStore;
166 
167     private final StreamCompressor streamCompressor;
168 
169     private final AtomicBoolean isClosed = new AtomicBoolean();
170 
171     private ZipEntryWriter zipEntryWriter;
172 
173     public ScatterZipOutputStream(final ScatterGatherBackingStore backingStore, final StreamCompressor streamCompressor) {
174         this.backingStore = backingStore;
175         this.streamCompressor = streamCompressor;
176     }
177 
178     /**
179      * Adds an archive entry to this scatter stream.
180      *
181      * @param zipArchiveEntryRequest The entry to write.
182      * @throws IOException If writing fails
183      */
184     public void addArchiveEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest) throws IOException {
185         try (InputStream payloadStream = zipArchiveEntryRequest.getPayloadStream()) {
186             streamCompressor.deflate(payloadStream, zipArchiveEntryRequest.getMethod());
187         }
188         items.add(new CompressedEntry(zipArchiveEntryRequest, streamCompressor.getCrc32(), streamCompressor.getBytesWrittenForLastEntry(),
189                 streamCompressor.getBytesRead()));
190     }
191 
192     /**
193      * Closes this stream, freeing all resources involved in the creation of this stream.
194      *
195      * @throws IOException If closing fails
196      */
197     @Override
198     public void close() throws IOException {
199         if (!isClosed.compareAndSet(false, true)) {
200             return;
201         }
202         try {
203             if (zipEntryWriter != null) {
204                 zipEntryWriter.close();
205             }
206             backingStore.close();
207         } finally {
208             streamCompressor.close();
209         }
210     }
211 
212     /**
213      * Writes the contents of this scatter stream to a target archive.
214      *
215      * @param target The archive to receive the contents of this {@link ScatterZipOutputStream}.
216      * @throws IOException If writing fails
217      * @see #zipEntryWriter()
218      */
219     public void writeTo(final ZipArchiveOutputStream target) throws IOException {
220         backingStore.closeForWriting();
221         try (InputStream data = backingStore.getInputStream()) {
222             for (final CompressedEntry compressedEntry : items) {
223                 // @formatter:off
224                 try (BoundedInputStream rawStream = BoundedInputStream.builder()
225                         .setInputStream(data)
226                         .setMaxCount(compressedEntry.compressedSize)
227                         .setPropagateClose(false)
228                         .get()) {
229                     target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
230                 }
231                 // @formatter:on
232             }
233         }
234     }
235 
236     /**
237      * Gets a ZIP entry writer for this scatter stream.
238      *
239      * @throws IOException If getting scatter stream input stream
240      * @return the ZipEntryWriter created on first call of the method
241      */
242     public ZipEntryWriter zipEntryWriter() throws IOException {
243         if (zipEntryWriter == null) {
244             zipEntryWriter = new ZipEntryWriter(this);
245         }
246         return zipEntryWriter;
247     }
248 }