View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one or more
3    *  contributor license agreements.  See the NOTICE file distributed with
4    *  this work for additional information regarding copyright ownership.
5    *  The ASF licenses this file to You under the Apache License, Version 2.0
6    *  (the "License"); you may not use this file except in compliance with
7    *  the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software
12   *  distributed under the License is distributed on an "AS IS" BASIS,
13   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   *  See the License for the specific language governing permissions and
15   *  limitations under the License.
16   */
17  package org.apache.commons.compress.archivers.zip;
18  
19  import java.io.Closeable;
20  import java.io.File;
21  import java.io.FileNotFoundException;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.nio.file.Path;
25  import java.util.Iterator;
26  import java.util.Queue;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  import java.util.zip.Deflater;
30  
31  import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore;
32  import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
33  import org.apache.commons.compress.utils.BoundedInputStream;
34  
35  /**
36   * A ZIP output stream that is optimized for multi-threaded scatter/gather construction of ZIP files.
37   * <p>
38   * The internal data format of the entries used by this class are entirely private to this class and are not part of any public api whatsoever.
39   * </p>
40   * <p>
41   * It is possible to extend this class to support different kinds of backing storage, the default implementation only supports file-based backing.
42   * </p>
43   * <p>
44   * Thread safety: This class supports multiple threads. But the "writeTo" method must be called by the thread that originally created the
45   * {@link ZipArchiveEntry}.
46   * </p>
47   *
48   * @since 1.10
49   */
50  public class ScatterZipOutputStream implements Closeable {
51  
52      private static final class CompressedEntry {
53          final ZipArchiveEntryRequest zipArchiveEntryRequest;
54          final long crc;
55          final long compressedSize;
56          final long size;
57  
58          CompressedEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest, final long crc, final long compressedSize, final long size) {
59              this.zipArchiveEntryRequest = zipArchiveEntryRequest;
60              this.crc = crc;
61              this.compressedSize = compressedSize;
62              this.size = size;
63          }
64  
65          /**
66           * Updates the original {@link ZipArchiveEntry} with sizes/crc. Do not use this method from threads that did not create the instance itself!
67           *
68           * @return the zipArchiveEntry that is the basis for this request.
69           */
70  
71          public ZipArchiveEntry transferToArchiveEntry() {
72              final ZipArchiveEntry entry = zipArchiveEntryRequest.getZipArchiveEntry();
73              entry.setCompressedSize(compressedSize);
74              entry.setSize(size);
75              entry.setCrc(crc);
76              entry.setMethod(zipArchiveEntryRequest.getMethod());
77              return entry;
78          }
79      }
80  
81      public static class ZipEntryWriter implements Closeable {
82          private final Iterator<CompressedEntry> itemsIterator;
83          private final InputStream itemsIteratorData;
84  
85          public ZipEntryWriter(final ScatterZipOutputStream scatter) throws IOException {
86              scatter.backingStore.closeForWriting();
87              itemsIterator = scatter.items.iterator();
88              itemsIteratorData = scatter.backingStore.getInputStream();
89          }
90  
91          @Override
92          public void close() throws IOException {
93              if (itemsIteratorData != null) {
94                  itemsIteratorData.close();
95              }
96          }
97  
98          public void writeNextZipEntry(final ZipArchiveOutputStream target) throws IOException {
99              final CompressedEntry compressedEntry = itemsIterator.next();
100             try (BoundedInputStream rawStream = new BoundedInputStream(itemsIteratorData, compressedEntry.compressedSize)) {
101                 target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
102             }
103         }
104     }
105 
106     /**
107      * Creates a {@link ScatterZipOutputStream} with default compression level that is backed by a file
108      *
109      * @param file The file to offload compressed data into.
110      * @return A ScatterZipOutputStream that is ready for use.
111      * @throws FileNotFoundException if the file cannot be found
112      */
113     public static ScatterZipOutputStream fileBased(final File file) throws FileNotFoundException {
114         return pathBased(file.toPath(), Deflater.DEFAULT_COMPRESSION);
115     }
116 
117     /**
118      * Creates a {@link ScatterZipOutputStream} that is backed by a file
119      *
120      * @param file             The file to offload compressed data into.
121      * @param compressionLevel The compression level to use, @see #Deflater
122      * @return A ScatterZipOutputStream that is ready for use.
123      * @throws FileNotFoundException if the file cannot be found
124      */
125     public static ScatterZipOutputStream fileBased(final File file, final int compressionLevel) throws FileNotFoundException {
126         return pathBased(file.toPath(), compressionLevel);
127     }
128 
129     /**
130      * Creates a {@link ScatterZipOutputStream} with default compression level that is backed by a file
131      *
132      * @param path The path to offload compressed data into.
133      * @return A ScatterZipOutputStream that is ready for use.
134      * @throws FileNotFoundException if the path cannot be found
135      * @since 1.22
136      */
137     public static ScatterZipOutputStream pathBased(final Path path) throws FileNotFoundException {
138         return pathBased(path, Deflater.DEFAULT_COMPRESSION);
139     }
140 
141     /**
142      * Creates a {@link ScatterZipOutputStream} that is backed by a file
143      *
144      * @param path             The path to offload compressed data into.
145      * @param compressionLevel The compression level to use, @see #Deflater
146      * @return A ScatterZipOutputStream that is ready for use.
147      * @throws FileNotFoundException if the path cannot be found
148      * @since 1.22
149      */
150     public static ScatterZipOutputStream pathBased(final Path path, final int compressionLevel) throws FileNotFoundException {
151         final ScatterGatherBackingStore bs = new FileBasedScatterGatherBackingStore(path);
152         // lifecycle is bound to the ScatterZipOutputStream returned
153         final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); // NOSONAR
154         return new ScatterZipOutputStream(bs, sc);
155     }
156 
157     private final Queue<CompressedEntry> items = new ConcurrentLinkedQueue<>();
158 
159     private final ScatterGatherBackingStore backingStore;
160 
161     private final StreamCompressor streamCompressor;
162 
163     private final AtomicBoolean isClosed = new AtomicBoolean();
164 
165     private ZipEntryWriter zipEntryWriter;
166 
167     public ScatterZipOutputStream(final ScatterGatherBackingStore backingStore, final StreamCompressor streamCompressor) {
168         this.backingStore = backingStore;
169         this.streamCompressor = streamCompressor;
170     }
171 
172     /**
173      * Adds an archive entry to this scatter stream.
174      *
175      * @param zipArchiveEntryRequest The entry to write.
176      * @throws IOException If writing fails
177      */
178     public void addArchiveEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest) throws IOException {
179         try (InputStream payloadStream = zipArchiveEntryRequest.getPayloadStream()) {
180             streamCompressor.deflate(payloadStream, zipArchiveEntryRequest.getMethod());
181         }
182         items.add(new CompressedEntry(zipArchiveEntryRequest, streamCompressor.getCrc32(), streamCompressor.getBytesWrittenForLastEntry(),
183                 streamCompressor.getBytesRead()));
184     }
185 
186     /**
187      * Closes this stream, freeing all resources involved in the creation of this stream.
188      *
189      * @throws IOException If closing fails
190      */
191     @Override
192     public void close() throws IOException {
193         if (!isClosed.compareAndSet(false, true)) {
194             return;
195         }
196         try {
197             if (zipEntryWriter != null) {
198                 zipEntryWriter.close();
199             }
200             backingStore.close();
201         } finally {
202             streamCompressor.close();
203         }
204     }
205 
206     /**
207      * Writes the contents of this scatter stream to a target archive.
208      *
209      * @param target The archive to receive the contents of this {@link ScatterZipOutputStream}.
210      * @throws IOException If writing fails
211      * @see #zipEntryWriter()
212      */
213     public void writeTo(final ZipArchiveOutputStream target) throws IOException {
214         backingStore.closeForWriting();
215         try (InputStream data = backingStore.getInputStream()) {
216             for (final CompressedEntry compressedEntry : items) {
217                 try (BoundedInputStream rawStream = new BoundedInputStream(data, compressedEntry.compressedSize)) {
218                     target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
219                 }
220             }
221         }
222     }
223 
224     /**
225      * Gets a ZIP entry writer for this scatter stream.
226      *
227      * @throws IOException If getting scatter stream input stream
228      * @return the ZipEntryWriter created on first call of the method
229      */
230     public ZipEntryWriter zipEntryWriter() throws IOException {
231         if (zipEntryWriter == null) {
232             zipEntryWriter = new ZipEntryWriter(this);
233         }
234         return zipEntryWriter;
235     }
236 }