1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.commons.compress.archivers.zip;
18
19 import java.io.Closeable;
20 import java.io.File;
21 import java.io.FileNotFoundException;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.nio.file.Path;
25 import java.util.Iterator;
26 import java.util.Queue;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import java.util.zip.Deflater;
30
31 import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore;
32 import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
33 import org.apache.commons.io.input.BoundedInputStream;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50 public class ScatterZipOutputStream implements Closeable {
51
52 private static final class CompressedEntry {
53 final ZipArchiveEntryRequest zipArchiveEntryRequest;
54 final long crc;
55 final long compressedSize;
56 final long size;
57
58 CompressedEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest, final long crc, final long compressedSize, final long size) {
59 this.zipArchiveEntryRequest = zipArchiveEntryRequest;
60 this.crc = crc;
61 this.compressedSize = compressedSize;
62 this.size = size;
63 }
64
65
66
67
68
69
70
71 public ZipArchiveEntry transferToArchiveEntry() {
72 final ZipArchiveEntry entry = zipArchiveEntryRequest.getZipArchiveEntry();
73 entry.setCompressedSize(compressedSize);
74 entry.setSize(size);
75 entry.setCrc(crc);
76 entry.setMethod(zipArchiveEntryRequest.getMethod());
77 return entry;
78 }
79 }
80
81 public static class ZipEntryWriter implements Closeable {
82 private final Iterator<CompressedEntry> itemsIterator;
83 private final InputStream itemsIteratorData;
84
85 public ZipEntryWriter(final ScatterZipOutputStream scatter) throws IOException {
86 scatter.backingStore.closeForWriting();
87 itemsIterator = scatter.items.iterator();
88 itemsIteratorData = scatter.backingStore.getInputStream();
89 }
90
91 @Override
92 public void close() throws IOException {
93 if (itemsIteratorData != null) {
94 itemsIteratorData.close();
95 }
96 }
97
98 public void writeNextZipEntry(final ZipArchiveOutputStream target) throws IOException {
99 final CompressedEntry compressedEntry = itemsIterator.next();
100
101 try (BoundedInputStream rawStream = BoundedInputStream.builder()
102 .setInputStream(itemsIteratorData)
103 .setMaxCount(compressedEntry.compressedSize)
104 .setPropagateClose(false)
105 .get()) {
106 target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
107 }
108
109 }
110 }
111
112
113
114
115
116
117
118
119 public static ScatterZipOutputStream fileBased(final File file) throws FileNotFoundException {
120 return pathBased(file.toPath(), Deflater.DEFAULT_COMPRESSION);
121 }
122
123
124
125
126
127
128
129
130
131 public static ScatterZipOutputStream fileBased(final File file, final int compressionLevel) throws FileNotFoundException {
132 return pathBased(file.toPath(), compressionLevel);
133 }
134
135
136
137
138
139
140
141
142
143 public static ScatterZipOutputStream pathBased(final Path path) throws FileNotFoundException {
144 return pathBased(path, Deflater.DEFAULT_COMPRESSION);
145 }
146
147
148
149
150
151
152
153
154
155
156 public static ScatterZipOutputStream pathBased(final Path path, final int compressionLevel) throws FileNotFoundException {
157 final ScatterGatherBackingStore bs = new FileBasedScatterGatherBackingStore(path);
158
159 final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs);
160 return new ScatterZipOutputStream(bs, sc);
161 }
162
163 private final Queue<CompressedEntry> items = new ConcurrentLinkedQueue<>();
164
165 private final ScatterGatherBackingStore backingStore;
166
167 private final StreamCompressor streamCompressor;
168
169 private final AtomicBoolean isClosed = new AtomicBoolean();
170
171 private ZipEntryWriter zipEntryWriter;
172
173 public ScatterZipOutputStream(final ScatterGatherBackingStore backingStore, final StreamCompressor streamCompressor) {
174 this.backingStore = backingStore;
175 this.streamCompressor = streamCompressor;
176 }
177
178
179
180
181
182
183
184 public void addArchiveEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest) throws IOException {
185 try (InputStream payloadStream = zipArchiveEntryRequest.getPayloadStream()) {
186 streamCompressor.deflate(payloadStream, zipArchiveEntryRequest.getMethod());
187 }
188 items.add(new CompressedEntry(zipArchiveEntryRequest, streamCompressor.getCrc32(), streamCompressor.getBytesWrittenForLastEntry(),
189 streamCompressor.getBytesRead()));
190 }
191
192
193
194
195
196
197 @Override
198 public void close() throws IOException {
199 if (!isClosed.compareAndSet(false, true)) {
200 return;
201 }
202 try {
203 if (zipEntryWriter != null) {
204 zipEntryWriter.close();
205 }
206 backingStore.close();
207 } finally {
208 streamCompressor.close();
209 }
210 }
211
212
213
214
215
216
217
218
219 public void writeTo(final ZipArchiveOutputStream target) throws IOException {
220 backingStore.closeForWriting();
221 try (InputStream data = backingStore.getInputStream()) {
222 for (final CompressedEntry compressedEntry : items) {
223
224 try (BoundedInputStream rawStream = BoundedInputStream.builder()
225 .setInputStream(data)
226 .setMaxCount(compressedEntry.compressedSize)
227 .setPropagateClose(false)
228 .get()) {
229 target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
230 }
231
232 }
233 }
234 }
235
236
237
238
239
240
241
242 public ZipEntryWriter zipEntryWriter() throws IOException {
243 if (zipEntryWriter == null) {
244 zipEntryWriter = new ZipEntryWriter(this);
245 }
246 return zipEntryWriter;
247 }
248 }