1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.commons.compress.archivers.zip;
20
21 import java.io.Closeable;
22 import java.io.File;
23 import java.io.FileNotFoundException;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.nio.file.Path;
27 import java.util.Iterator;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.zip.Deflater;
32
33 import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore;
34 import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
35 import org.apache.commons.io.IOUtils;
36 import org.apache.commons.io.input.BoundedInputStream;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 public class ScatterZipOutputStream implements Closeable {
54
55 private static final class CompressedEntry {
56 final ZipArchiveEntryRequest zipArchiveEntryRequest;
57 final long crc;
58 final long compressedSize;
59 final long size;
60
61 CompressedEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest, final long crc, final long compressedSize, final long size) {
62 this.zipArchiveEntryRequest = zipArchiveEntryRequest;
63 this.crc = crc;
64 this.compressedSize = compressedSize;
65 this.size = size;
66 }
67
68
69
70
71
72
73 public ZipArchiveEntry transferToArchiveEntry() {
74 final ZipArchiveEntry entry = zipArchiveEntryRequest.getZipArchiveEntry();
75 entry.setCompressedSize(compressedSize);
76 entry.setSize(size);
77 entry.setCrc(crc);
78 entry.setMethod(zipArchiveEntryRequest.getMethod());
79 return entry;
80 }
81 }
82
83
84
85
86 public static class ZipEntryWriter implements Closeable {
87 private final Iterator<CompressedEntry> itemsIterator;
88 private final InputStream inputStream;
89
90
91
92
93
94
95
96 public ZipEntryWriter(final ScatterZipOutputStream out) throws IOException {
97 out.backingStore.closeForWriting();
98 itemsIterator = out.items.iterator();
99 inputStream = out.backingStore.getInputStream();
100 }
101
102 @Override
103 public void close() throws IOException {
104 IOUtils.close(inputStream);
105 }
106
107
108
109
110
111
112
113 public void writeNextZipEntry(final ZipArchiveOutputStream target) throws IOException {
114 final CompressedEntry compressedEntry = itemsIterator.next();
115
116 try (BoundedInputStream rawStream = BoundedInputStream.builder()
117 .setInputStream(inputStream)
118 .setMaxCount(compressedEntry.compressedSize)
119 .setPropagateClose(false)
120 .get()) {
121 target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
122 }
123
124 }
125 }
126
127
128
129
130
131
132
133
134 public static ScatterZipOutputStream fileBased(final File file) throws FileNotFoundException {
135 return pathBased(file.toPath(), Deflater.DEFAULT_COMPRESSION);
136 }
137
138
139
140
141
142
143
144
145
146 public static ScatterZipOutputStream fileBased(final File file, final int compressionLevel) throws FileNotFoundException {
147 return pathBased(file.toPath(), compressionLevel);
148 }
149
150
151
152
153
154
155
156
157
158 public static ScatterZipOutputStream pathBased(final Path path) throws FileNotFoundException {
159 return pathBased(path, Deflater.DEFAULT_COMPRESSION);
160 }
161
162
163
164
165
166
167
168
169
170
171 public static ScatterZipOutputStream pathBased(final Path path, final int compressionLevel) throws FileNotFoundException {
172 final ScatterGatherBackingStore bs = new FileBasedScatterGatherBackingStore(path);
173
174 final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs);
175 return new ScatterZipOutputStream(bs, sc);
176 }
177
178 private final Queue<CompressedEntry> items = new ConcurrentLinkedQueue<>();
179
180 private final ScatterGatherBackingStore backingStore;
181
182 private final StreamCompressor streamCompressor;
183
184 private final AtomicBoolean isClosed = new AtomicBoolean();
185
186 private ZipEntryWriter zipEntryWriter;
187
188
189
190
191
192
193
194 public ScatterZipOutputStream(final ScatterGatherBackingStore backingStore, final StreamCompressor streamCompressor) {
195 this.backingStore = backingStore;
196 this.streamCompressor = streamCompressor;
197 }
198
199
200
201
202
203
204
205 public void addArchiveEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest) throws IOException {
206 try (InputStream payloadStream = zipArchiveEntryRequest.getPayloadStream()) {
207 streamCompressor.deflate(payloadStream, zipArchiveEntryRequest.getMethod());
208 }
209 items.add(new CompressedEntry(zipArchiveEntryRequest, streamCompressor.getCrc32(), streamCompressor.getBytesWrittenForLastEntry(),
210 streamCompressor.getBytesRead()));
211 }
212
213
214
215
216
217
218 @Override
219 public void close() throws IOException {
220 if (!isClosed.compareAndSet(false, true)) {
221 return;
222 }
223 try {
224 IOUtils.close(zipEntryWriter);
225 backingStore.close();
226 } finally {
227 streamCompressor.close();
228 }
229 }
230
231
232
233
234
235
236
237
238 public void writeTo(final ZipArchiveOutputStream target) throws IOException {
239 backingStore.closeForWriting();
240 try (InputStream data = backingStore.getInputStream()) {
241 for (final CompressedEntry compressedEntry : items) {
242
243 try (BoundedInputStream rawStream = BoundedInputStream.builder()
244 .setInputStream(data)
245 .setMaxCount(compressedEntry.compressedSize)
246 .setPropagateClose(false)
247 .get()) {
248 target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
249 }
250
251 }
252 }
253 }
254
255
256
257
258
259
260
261 public ZipEntryWriter zipEntryWriter() throws IOException {
262 if (zipEntryWriter == null) {
263 zipEntryWriter = new ZipEntryWriter(this);
264 }
265 return zipEntryWriter;
266 }
267 }