1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * https://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.commons.compress.archivers.zip;
20
21 import java.io.Closeable;
22 import java.io.File;
23 import java.io.FileNotFoundException;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.nio.file.Path;
27 import java.util.Iterator;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.zip.Deflater;
32
33 import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore;
34 import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
35 import org.apache.commons.io.IOUtils;
36 import org.apache.commons.io.input.BoundedInputStream;
37
/**
 * A ZIP output stream that is optimized for multi-threaded scatter/gather construction of ZIP files.
 * <p>
 * The internal data format of the entries used by this class is entirely private to this class and is not part of any public API whatsoever.
 * </p>
 * <p>
 * It is possible to extend this class to support different kinds of backing storage; the default implementation only supports file-based backing.
 * </p>
 * <p>
 * Thread safety: This class supports multiple threads. But the "writeTo" method must be called by the thread that originally created the
 * {@link ZipArchiveEntry}.
 * </p>
 *
 * @since 1.10
 */
53 public class ScatterZipOutputStream implements Closeable {
54
55 private static final class CompressedEntry {
56 final ZipArchiveEntryRequest zipArchiveEntryRequest;
57 final long crc;
58 final long compressedSize;
59 final long size;
60
61 CompressedEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest, final long crc, final long compressedSize, final long size) {
62 this.zipArchiveEntryRequest = zipArchiveEntryRequest;
63 this.crc = crc;
64 this.compressedSize = compressedSize;
65 this.size = size;
66 }
67
68 /**
69 * Updates the original {@link ZipArchiveEntry} with sizes/CRC. Do not use this method from threads that did not create the instance itself!
70 *
71 * @return the zipArchiveEntry that is the basis for this request.
72 */
73 public ZipArchiveEntry transferToArchiveEntry() {
74 final ZipArchiveEntry entry = zipArchiveEntryRequest.getZipArchiveEntry();
75 entry.setCompressedSize(compressedSize);
76 entry.setSize(size);
77 entry.setCrc(crc);
78 entry.setMethod(zipArchiveEntryRequest.getMethod());
79 return entry;
80 }
81 }
82
83 /**
84 * Writes ZIP entries to a ZIP archive.
85 */
86 public static class ZipEntryWriter implements Closeable {
87 private final Iterator<CompressedEntry> itemsIterator;
88 private final InputStream inputStream;
89
90 /**
91 * Constructs a new instance.
92 *
93 * @param out a ScatterZipOutputStream.
94 * @throws IOException if an I/O error occurs.
95 */
96 public ZipEntryWriter(final ScatterZipOutputStream out) throws IOException {
97 out.backingStore.closeForWriting();
98 itemsIterator = out.items.iterator();
99 inputStream = out.backingStore.getInputStream();
100 }
101
102 @Override
103 public void close() throws IOException {
104 IOUtils.close(inputStream);
105 }
106
107 /**
108 * Writes the next ZIP entry to the given target.
109 *
110 * @param target Where to write.
111 * @throws IOException if an I/O error occurs.
112 */
113 public void writeNextZipEntry(final ZipArchiveOutputStream target) throws IOException {
114 final CompressedEntry compressedEntry = itemsIterator.next();
115 // @formatter:off
116 try (BoundedInputStream rawStream = BoundedInputStream.builder()
117 .setInputStream(inputStream)
118 .setMaxCount(compressedEntry.compressedSize)
119 .setPropagateClose(false)
120 .get()) {
121 target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
122 }
123 // @formatter:on
124 }
125 }
126
127 /**
128 * Creates a {@link ScatterZipOutputStream} with default compression level that is backed by a file
129 *
130 * @param file The file to offload compressed data into.
131 * @return A ScatterZipOutputStream that is ready for use.
132 * @throws FileNotFoundException if the file cannot be found
133 */
134 public static ScatterZipOutputStream fileBased(final File file) throws FileNotFoundException {
135 return pathBased(file.toPath(), Deflater.DEFAULT_COMPRESSION);
136 }
137
138 /**
139 * Creates a {@link ScatterZipOutputStream} that is backed by a file
140 *
141 * @param file The file to offload compressed data into.
142 * @param compressionLevel The compression level to use, @see #Deflater
143 * @return A ScatterZipOutputStream that is ready for use.
144 * @throws FileNotFoundException if the file cannot be found
145 */
146 public static ScatterZipOutputStream fileBased(final File file, final int compressionLevel) throws FileNotFoundException {
147 return pathBased(file.toPath(), compressionLevel);
148 }
149
150 /**
151 * Creates a {@link ScatterZipOutputStream} with default compression level that is backed by a file
152 *
153 * @param path The path to offload compressed data into.
154 * @return A ScatterZipOutputStream that is ready for use.
155 * @throws FileNotFoundException if the path cannot be found
156 * @since 1.22
157 */
158 public static ScatterZipOutputStream pathBased(final Path path) throws FileNotFoundException {
159 return pathBased(path, Deflater.DEFAULT_COMPRESSION);
160 }
161
162 /**
163 * Creates a {@link ScatterZipOutputStream} that is backed by a file
164 *
165 * @param path The path to offload compressed data into.
166 * @param compressionLevel The compression level to use, @see #Deflater
167 * @return A ScatterZipOutputStream that is ready for use.
168 * @throws FileNotFoundException if the path cannot be found
169 * @since 1.22
170 */
171 public static ScatterZipOutputStream pathBased(final Path path, final int compressionLevel) throws FileNotFoundException {
172 final ScatterGatherBackingStore bs = new FileBasedScatterGatherBackingStore(path);
173 // lifecycle is bound to the ScatterZipOutputStream returned
174 final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); // NOSONAR
175 return new ScatterZipOutputStream(bs, sc);
176 }
177
178 private final Queue<CompressedEntry> items = new ConcurrentLinkedQueue<>();
179
180 private final ScatterGatherBackingStore backingStore;
181
182 private final StreamCompressor streamCompressor;
183
184 private final AtomicBoolean isClosed = new AtomicBoolean();
185
186 private ZipEntryWriter zipEntryWriter;
187
188 /**
189 * Constructs a new instance.
190 *
191 * @param backingStore the backing store.
192 * @param streamCompressor Deflates ZIP entries.
193 */
194 public ScatterZipOutputStream(final ScatterGatherBackingStore backingStore, final StreamCompressor streamCompressor) {
195 this.backingStore = backingStore;
196 this.streamCompressor = streamCompressor;
197 }
198
199 /**
200 * Adds an archive entry to this scatter stream.
201 *
202 * @param zipArchiveEntryRequest The entry to write.
203 * @throws IOException If writing fails
204 */
205 public void addArchiveEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest) throws IOException {
206 try (InputStream payloadStream = zipArchiveEntryRequest.getPayloadStream()) {
207 streamCompressor.deflate(payloadStream, zipArchiveEntryRequest.getMethod());
208 }
209 items.add(new CompressedEntry(zipArchiveEntryRequest, streamCompressor.getCrc32(), streamCompressor.getBytesWrittenForLastEntry(),
210 streamCompressor.getBytesRead()));
211 }
212
213 /**
214 * Closes this stream, freeing all resources involved in the creation of this stream.
215 *
216 * @throws IOException If closing fails
217 */
218 @Override
219 public void close() throws IOException {
220 if (!isClosed.compareAndSet(false, true)) {
221 return;
222 }
223 try {
224 IOUtils.close(zipEntryWriter);
225 backingStore.close();
226 } finally {
227 streamCompressor.close();
228 }
229 }
230
231 /**
232 * Writes the contents of this scatter stream to a target archive.
233 *
234 * @param target The archive to receive the contents of this {@link ScatterZipOutputStream}.
235 * @throws IOException If writing fails
236 * @see #zipEntryWriter()
237 */
238 public void writeTo(final ZipArchiveOutputStream target) throws IOException {
239 backingStore.closeForWriting();
240 try (InputStream data = backingStore.getInputStream()) {
241 for (final CompressedEntry compressedEntry : items) {
242 // @formatter:off
243 try (BoundedInputStream rawStream = BoundedInputStream.builder()
244 .setInputStream(data)
245 .setMaxCount(compressedEntry.compressedSize)
246 .setPropagateClose(false)
247 .get()) {
248 target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
249 }
250 // @formatter:on
251 }
252 }
253 }
254
255 /**
256 * Gets a ZIP entry writer for this scatter stream.
257 *
258 * @throws IOException If getting scatter stream input stream
259 * @return the ZipEntryWriter created on first call of the method
260 */
261 public ZipEntryWriter zipEntryWriter() throws IOException {
262 if (zipEntryWriter == null) {
263 zipEntryWriter = new ZipEntryWriter(this);
264 }
265 return zipEntryWriter;
266 }
267 }