001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.compress.archivers.zip; 018 019import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest; 020 021import java.io.IOException; 022import java.io.UncheckedIOException; 023import java.util.Deque; 024import java.util.concurrent.Callable; 025import java.util.concurrent.ConcurrentLinkedDeque; 026import java.util.concurrent.ExecutionException; 027import java.util.concurrent.ExecutorService; 028import java.util.concurrent.Executors; 029import java.util.concurrent.Future; 030import java.util.concurrent.TimeUnit; 031import java.util.zip.Deflater; 032 033import org.apache.commons.compress.parallel.InputStreamSupplier; 034import org.apache.commons.compress.parallel.ScatterGatherBackingStore; 035import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier; 036 037/** 038 * Creates a ZIP in parallel by using multiple threadlocal {@link ScatterZipOutputStream} instances. 039 * <p> 040 * Note that until 1.18, this class generally made no guarantees about the order of things written to the output file. Things that needed to come in a specific 041 * order (manifests, directories) had to be handled by the client of this class, usually by writing these things to the {@link ZipArchiveOutputStream} 042 * <em>before</em> calling {@link #writeTo writeTo} on this class. 043 * </p> 044 * <p> 045 * The client can supply an {@link java.util.concurrent.ExecutorService}, but for reasons of memory model consistency, this will be shut down by this class 046 * prior to completion. 047 * </p> 048 * 049 * @since 1.10 050 */ 051public class ParallelScatterZipCreator { 052 053 private final Deque<ScatterZipOutputStream> streams = new ConcurrentLinkedDeque<>(); 054 private final ExecutorService executorService; 055 private final ScatterGatherBackingStoreSupplier backingStoreSupplier; 056 057 private final Deque<Future<? extends ScatterZipOutputStream>> futures = new ConcurrentLinkedDeque<>(); 058 private final long startedAt = System.currentTimeMillis(); 059 private long compressionDoneAt; 060 private long scatterDoneAt; 061 062 private final int compressionLevel; 063 064 private final ThreadLocal<ScatterZipOutputStream> tlScatterStreams = new ThreadLocal<ScatterZipOutputStream>() { 065 @Override 066 protected ScatterZipOutputStream initialValue() { 067 try { 068 final ScatterZipOutputStream scatterStream = createDeferred(backingStoreSupplier); 069 streams.add(scatterStream); 070 return scatterStream; 071 } catch (final IOException e) { 072 throw new UncheckedIOException(e); // NOSONAR 073 } 074 } 075 }; 076 077 /** 078 * Constructs a ParallelScatterZipCreator with default threads, which is set to the number of available processors, as defined by 079 * {@link Runtime#availableProcessors} 080 */ 081 public ParallelScatterZipCreator() { 082 this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); 083 } 084 085 /** 086 * Constructs a ParallelScatterZipCreator 087 * 088 * @param executorService The executorService to use for parallel scheduling. For technical reasons, this will be shut down by this class. 089 */ 090 public ParallelScatterZipCreator(final ExecutorService executorService) { 091 this(executorService, new DefaultBackingStoreSupplier(null)); 092 } 093 094 /** 095 * Constructs a ParallelScatterZipCreator 096 * 097 * @param executorService The executorService to use. For technical reasons, this will be shut down by this class. 098 * @param backingStoreSupplier The supplier of backing store which shall be used 099 */ 100 public ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier) { 101 this(executorService, backingStoreSupplier, Deflater.DEFAULT_COMPRESSION); 102 } 103 104 /** 105 * Constructs a ParallelScatterZipCreator 106 * 107 * @param executorService The executorService to use. For technical reasons, this will be shut down by this class. 108 * @param backingStoreSupplier The supplier of backing store which shall be used 109 * @param compressionLevel The compression level used in compression, this value should be -1(default level) or between 0~9. 110 * @throws IllegalArgumentException if the compression level is illegal 111 * @since 1.21 112 */ 113 public ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier, 114 final int compressionLevel) throws IllegalArgumentException { 115 if ((compressionLevel < Deflater.NO_COMPRESSION || compressionLevel > Deflater.BEST_COMPRESSION) && compressionLevel != Deflater.DEFAULT_COMPRESSION) { 116 throw new IllegalArgumentException("Compression level is expected between -1~9"); 117 } 118 119 this.backingStoreSupplier = backingStoreSupplier; 120 this.executorService = executorService; 121 this.compressionLevel = compressionLevel; 122 } 123 124 /** 125 * Adds an archive entry to this archive. 126 * <p> 127 * This method is expected to be called from a single client thread 128 * </p> 129 * 130 * @param zipArchiveEntry The entry to add. 131 * @param source The source input stream supplier 132 */ 133 134 public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) { 135 submitStreamAwareCallable(createCallable(zipArchiveEntry, source)); 136 } 137 138 /** 139 * Adds an archive entry to this archive. 140 * <p> 141 * This method is expected to be called from a single client thread 142 * </p> 143 * 144 * @param zipArchiveEntryRequestSupplier Should supply the entry to be added. 145 * @since 1.13 146 */ 147 public void addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) { 148 submitStreamAwareCallable(createCallable(zipArchiveEntryRequestSupplier)); 149 } 150 151 private void closeAll() { 152 for (final ScatterZipOutputStream scatterStream : streams) { 153 try { 154 scatterStream.close(); 155 } catch (final IOException ignored) { 156 // no way to properly log this 157 } 158 } 159 } 160 161 /** 162 * Creates a callable that will compress the given archive entry. 163 * 164 * <p> 165 * This method is expected to be called from a single client thread. 166 * </p> 167 * 168 * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and {@link #submitStreamAwareCallable submitStreamAwareCallable}. The 169 * most common use case for using {@link #createCallable createCallable} and {@link #submitStreamAwareCallable submitStreamAwareCallable} from a client is 170 * if you want to wrap the callable in something that can be prioritized by the supplied {@link ExecutorService}, for instance to process large or slow 171 * files first. Since the creation of the {@link ExecutorService} is handled by the client, all of this is up to the client. 172 * 173 * @param zipArchiveEntry The entry to add. 174 * @param source The source input stream supplier 175 * @return A callable that should subsequently be passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The value of this callable is 176 * not used, but any exceptions happening inside the compression will be propagated through the callable. 177 */ 178 179 public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) { 180 final int method = zipArchiveEntry.getMethod(); 181 if (method == ZipMethod.UNKNOWN_CODE) { 182 throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry); 183 } 184 final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source); 185 return () -> { 186 final ScatterZipOutputStream scatterStream = tlScatterStreams.get(); 187 scatterStream.addArchiveEntry(zipArchiveEntryRequest); 188 return scatterStream; 189 }; 190 } 191 192 /** 193 * Creates a callable that will compress archive entry supplied by {@link ZipArchiveEntryRequestSupplier}. 194 * 195 * <p> 196 * This method is expected to be called from a single client thread. 197 * </p> 198 * 199 * The same as {@link #createCallable(ZipArchiveEntry, InputStreamSupplier)}, but the archive entry to be added is supplied by a 200 * {@link ZipArchiveEntryRequestSupplier}. 201 * 202 * @see #createCallable(ZipArchiveEntry, InputStreamSupplier) 203 * 204 * @param zipArchiveEntryRequestSupplier Should supply the entry to be added. 205 * @return A callable that should subsequently be passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The value of this callable is 206 * not used, but any exceptions happening inside the compression will be propagated through the callable. 207 * @since 1.13 208 */ 209 public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) { 210 return () -> { 211 final ScatterZipOutputStream scatterStream = tlScatterStreams.get(); 212 scatterStream.addArchiveEntry(zipArchiveEntryRequestSupplier.get()); 213 return scatterStream; 214 }; 215 } 216 217 @SuppressWarnings("resource") // Caller closes 218 private ScatterZipOutputStream createDeferred(final ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier) throws IOException { 219 final ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get(); 220 // lifecycle is bound to the ScatterZipOutputStream returned 221 final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); // NOSONAR 222 return new ScatterZipOutputStream(bs, sc); 223 } 224 225 /** 226 * Gets a message describing the overall statistics of the compression run 227 * 228 * @return A string 229 */ 230 public ScatterStatistics getStatisticsMessage() { 231 return new ScatterStatistics(compressionDoneAt - startedAt, scatterDoneAt - compressionDoneAt); 232 } 233 234 /** 235 * Submits a callable for compression. 236 * 237 * @see ParallelScatterZipCreator#createCallable for details of if/when to use this. 238 * 239 * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller. 240 */ 241 public final void submit(final Callable<? extends Object> callable) { 242 submitStreamAwareCallable(() -> { 243 callable.call(); 244 return tlScatterStreams.get(); 245 }); 246 } 247 248 /** 249 * Submits a callable for compression. 250 * 251 * @see ParallelScatterZipCreator#createCallable for details of if/when to use this. 252 * 253 * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller. 254 * @since 1.19 255 */ 256 public final void submitStreamAwareCallable(final Callable<? extends ScatterZipOutputStream> callable) { 257 futures.add(executorService.submit(callable)); 258 } 259 260 /** 261 * Writes the contents this to the target {@link ZipArchiveOutputStream}. 262 * <p> 263 * It may be beneficial to write things like directories and manifest files to the targetStream before calling this method. 264 * </p> 265 * <p> 266 * Calling this method will shut down the {@link ExecutorService} used by this class. If any of the {@link Callable}s {@link #submitStreamAwareCallable 267 * submit}ted to this instance throws an exception, the archive can not be created properly and this method will throw an exception. 268 * </p> 269 * 270 * @param targetStream The {@link ZipArchiveOutputStream} to receive the contents of the scatter streams 271 * @throws IOException If writing fails 272 * @throws InterruptedException If we get interrupted 273 * @throws ExecutionException If something happens in the parallel execution 274 */ 275 public void writeTo(final ZipArchiveOutputStream targetStream) throws IOException, InterruptedException, ExecutionException { 276 277 try { 278 // Make sure we catch any exceptions from parallel phase 279 try { 280 for (final Future<?> future : futures) { 281 future.get(); 282 } 283 } finally { 284 executorService.shutdown(); 285 } 286 287 executorService.awaitTermination(1000 * 60L, TimeUnit.SECONDS); // == Infinity. We really *must* wait for this to complete 288 289 // It is important that all threads terminate before we go on, ensure happens-before relationship 290 compressionDoneAt = System.currentTimeMillis(); 291 292 for (final Future<? extends ScatterZipOutputStream> future : futures) { 293 final ScatterZipOutputStream scatterStream = future.get(); 294 scatterStream.zipEntryWriter().writeNextZipEntry(targetStream); 295 } 296 297 for (final ScatterZipOutputStream scatterStream : streams) { 298 scatterStream.close(); 299 } 300 301 scatterDoneAt = System.currentTimeMillis(); 302 } finally { 303 closeAll(); 304 } 305 } 306}