1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.commons.compress.archivers.zip;
20
import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.zip.Deflater;

import org.apache.commons.compress.parallel.InputStreamSupplier;
import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
import org.apache.commons.io.IOUtils;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 public class ParallelScatterZipCreator {
55
56 private final Deque<ScatterZipOutputStream> streams = new ConcurrentLinkedDeque<>();
57 private final ExecutorService executorService;
58 private final ScatterGatherBackingStoreSupplier backingStoreSupplier;
59
60 private final Deque<Future<? extends ScatterZipOutputStream>> futures = new ConcurrentLinkedDeque<>();
61 private final long startedAt = System.currentTimeMillis();
62 private long compressionDoneAt;
63 private long scatterDoneAt;
64
65 private final int compressionLevel;
66
67 private final ThreadLocal<ScatterZipOutputStream> tlScatterStreams = new ThreadLocal<ScatterZipOutputStream>() {
68 @Override
69 protected ScatterZipOutputStream initialValue() {
70 try {
71 final ScatterZipOutputStream scatterStream = createDeferred(backingStoreSupplier);
72 streams.add(scatterStream);
73 return scatterStream;
74 } catch (final IOException e) {
75 throw new UncheckedIOException(e);
76 }
77 }
78 };
79
80
81
82
83
84 public ParallelScatterZipCreator() {
85 this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
86 }
87
88
89
90
91
92
93 public ParallelScatterZipCreator(final ExecutorService executorService) {
94 this(executorService, new DefaultBackingStoreSupplier(null));
95 }
96
97
98
99
100
101
102
103 public ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier) {
104 this(executorService, backingStoreSupplier, Deflater.DEFAULT_COMPRESSION);
105 }
106
107
108
109
110
111
112
113
114
115
116 public ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier,
117 final int compressionLevel) throws IllegalArgumentException {
118 if ((compressionLevel < Deflater.NO_COMPRESSION || compressionLevel > Deflater.BEST_COMPRESSION) && compressionLevel != Deflater.DEFAULT_COMPRESSION) {
119 throw new IllegalArgumentException("Compression level is expected between -1~9");
120 }
121
122 this.backingStoreSupplier = backingStoreSupplier;
123 this.executorService = executorService;
124 this.compressionLevel = compressionLevel;
125 }
126
127
128
129
130
131
132
133
134
135
136
137 public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
138 submitStreamAwareCallable(createCallable(zipArchiveEntry, source));
139 }
140
141
142
143
144
145
146
147
148
149
150 public void addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
151 submitStreamAwareCallable(createCallable(zipArchiveEntryRequestSupplier));
152 }
153
154 private void closeAll() {
155 for (final ScatterZipOutputStream scatterStream : streams) {
156 IOUtils.closeQuietly(scatterStream);
157 }
158 }
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178 public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
179 final int method = zipArchiveEntry.getMethod();
180 if (method == ZipMethod.UNKNOWN_CODE) {
181 throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry);
182 }
183 final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source);
184 return () -> {
185 final ScatterZipOutputStream scatterStream = tlScatterStreams.get();
186 scatterStream.addArchiveEntry(zipArchiveEntryRequest);
187 return scatterStream;
188 };
189 }
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207 public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
208 return () -> {
209 final ScatterZipOutputStream scatterStream = tlScatterStreams.get();
210 scatterStream.addArchiveEntry(zipArchiveEntryRequestSupplier.get());
211 return scatterStream;
212 };
213 }
214
215 @SuppressWarnings("resource")
216 private ScatterZipOutputStream createDeferred(final ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier) throws IOException {
217 final ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
218
219 final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs);
220 return new ScatterZipOutputStream(bs, sc);
221 }
222
223
224
225
226
227
228 public ScatterStatistics getStatisticsMessage() {
229 return new ScatterStatistics(compressionDoneAt - startedAt, scatterDoneAt - compressionDoneAt);
230 }
231
232
233
234
235
236
237
238 public final void submit(final Callable<? extends Object> callable) {
239 submitStreamAwareCallable(() -> {
240 callable.call();
241 return tlScatterStreams.get();
242 });
243 }
244
245
246
247
248
249
250
251
252 public final void submitStreamAwareCallable(final Callable<? extends ScatterZipOutputStream> callable) {
253 futures.add(executorService.submit(callable));
254 }
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271 public void writeTo(final ZipArchiveOutputStream targetStream) throws IOException, InterruptedException, ExecutionException {
272
273 try {
274
275 try {
276 for (final Future<?> future : futures) {
277 future.get();
278 }
279 } finally {
280 executorService.shutdown();
281 }
282
283 executorService.awaitTermination(1000 * 60L, TimeUnit.SECONDS);
284
285
286 compressionDoneAt = System.currentTimeMillis();
287
288 for (final Future<? extends ScatterZipOutputStream> future : futures) {
289 final ScatterZipOutputStream scatterStream = future.get();
290 scatterStream.zipEntryWriter().writeNextZipEntry(targetStream);
291 }
292
293 for (final ScatterZipOutputStream scatterStream : streams) {
294 scatterStream.close();
295 }
296
297 scatterDoneAt = System.currentTimeMillis();
298 } finally {
299 closeAll();
300 }
301 }
302 }