/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.commons.compress.archivers.zip;

import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Deque;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.zip.Deflater;

import org.apache.commons.compress.parallel.InputStreamSupplier;
import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
import org.apache.commons.io.IOUtils;

/**
 * Creates a ZIP in parallel by using multiple thread-local {@link ScatterZipOutputStream} instances.
 * <p>
 * Note that until 1.18, this class generally made no guarantees about the order of things written to the output file. Things that needed to come in a specific
 * order (manifests, directories) had to be handled by the client of this class, usually by writing these things to the {@link ZipArchiveOutputStream}
 * <em>before</em> calling {@link #writeTo writeTo} on this class.
 * </p>
 * <p>
 * The client can supply an {@link java.util.concurrent.ExecutorService}, but for reasons of memory model consistency, this will be shut down by this class
 * prior to completion.
 * </p>
 *
 * @since 1.10
 */
public class ParallelScatterZipCreator {

    /** Every scatter stream created by {@link #tlScatterStreams}; these are the streams closed at the end of {@link #writeTo writeTo}. */
    private final Deque<ScatterZipOutputStream> streams = new ConcurrentLinkedDeque<>();

    /** Runs the submitted callables; shut down by {@link #writeTo writeTo}. */
    private final ExecutorService executorService;

    /** Supplies the backing store for each scatter stream that gets created. */
    private final ScatterGatherBackingStoreSupplier backingStoreSupplier;

    /** One future per submitted callable, appended in submission order. */
    private final Deque<Future<? extends ScatterZipOutputStream>> futures = new ConcurrentLinkedDeque<>();

    /** Wall-clock time in milliseconds at which this instance was constructed. */
    private final long startedAt = System.currentTimeMillis();

    /** Wall-clock time in milliseconds at which the executor had been awaited in {@link #writeTo writeTo}; zero until then. */
    private long compressionDoneAt;

    /** Wall-clock time in milliseconds at which {@link #writeTo writeTo} finished writing and closing the scatter streams; zero until then. */
    private long scatterDoneAt;

    /** Compression level handed to the {@link StreamCompressor} of every scatter stream. */
    private final int compressionLevel;

    /**
     * Lazily creates one scatter stream per thread that executes a callable and registers it in {@link #streams}.
     */
    private final ThreadLocal<ScatterZipOutputStream> tlScatterStreams = new ThreadLocal<ScatterZipOutputStream>() {
        @Override
        protected ScatterZipOutputStream initialValue() {
            try {
                final ScatterZipOutputStream scatterStream = createDeferred(backingStoreSupplier);
                streams.add(scatterStream);
                return scatterStream;
            } catch (final IOException e) {
                // initialValue() cannot throw checked exceptions, so the cause is preserved in an unchecked wrapper
                throw new UncheckedIOException(e); // NOSONAR
            }
        }
    };

    /**
     * Constructs a ParallelScatterZipCreator with default threads, which is set to the number of available processors, as defined by
     * {@link Runtime#availableProcessors}
     */
    public ParallelScatterZipCreator() {
        this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
    }

    /**
     * Constructs a ParallelScatterZipCreator
     *
     * @param executorService The executorService to use for parallel scheduling. For technical reasons, this will be shut down by this class.
     */
    public ParallelScatterZipCreator(final ExecutorService executorService) {
        this(executorService, new DefaultBackingStoreSupplier(null));
    }

    /**
     * Constructs a ParallelScatterZipCreator
     *
     * @param executorService      The executorService to use. For technical reasons, this will be shut down by this class.
     * @param backingStoreSupplier The supplier of backing store which shall be used
     */
    public ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier) {
        this(executorService, backingStoreSupplier, Deflater.DEFAULT_COMPRESSION);
    }

    /**
     * Constructs a ParallelScatterZipCreator
     *
     * @param executorService      The executorService to use. For technical reasons, this will be shut down by this class.
     * @param backingStoreSupplier The supplier of backing store which shall be used
     * @param compressionLevel     The compression level used in compression, this value should be -1(default level) or between 0~9.
     * @throws IllegalArgumentException if the compression level is illegal
     * @since 1.21
     */
    public ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier,
            final int compressionLevel) throws IllegalArgumentException {
        // DEFAULT_COMPRESSION lies outside the NO_COMPRESSION..BEST_COMPRESSION range and is therefore accepted explicitly
        if ((compressionLevel < Deflater.NO_COMPRESSION || compressionLevel > Deflater.BEST_COMPRESSION) && compressionLevel != Deflater.DEFAULT_COMPRESSION) {
            throw new IllegalArgumentException("Compression level is expected between -1~9");
        }

        this.backingStoreSupplier = backingStoreSupplier;
        this.executorService = executorService;
        this.compressionLevel = compressionLevel;
    }

    /**
     * Adds an archive entry to this archive.
     * <p>
     * This method is expected to be called from a single client thread
     * </p>
     *
     * @param zipArchiveEntry The entry to add.
     * @param source          The source input stream supplier
     * @throws IllegalArgumentException if no compression method has been set on {@code zipArchiveEntry}
     */
    public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
        submitStreamAwareCallable(createCallable(zipArchiveEntry, source));
    }

    /**
     * Adds an archive entry to this archive.
     * <p>
     * This method is expected to be called from a single client thread
     * </p>
     *
     * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
     * @since 1.13
     */
    public void addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
        submitStreamAwareCallable(createCallable(zipArchiveEntryRequestSupplier));
    }

    /**
     * Closes every scatter stream created so far, ignoring any failure while closing.
     */
    private void closeAll() {
        for (final ScatterZipOutputStream scatterStream : streams) {
            IOUtils.closeQuietly(scatterStream);
        }
    }

    /**
     * Creates a callable that will compress the given archive entry.
     *
     * <p>
     * This method is expected to be called from a single client thread.
     * </p>
     *
     * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and {@link #submitStreamAwareCallable submitStreamAwareCallable}. The
     * most common use case for using {@link #createCallable createCallable} and {@link #submitStreamAwareCallable submitStreamAwareCallable} from a client is
     * if you want to wrap the callable in something that can be prioritized by the supplied {@link ExecutorService}, for instance to process large or slow
     * files first. Since the creation of the {@link ExecutorService} is handled by the client, all of this is up to the client.
     *
     * @param zipArchiveEntry The entry to add.
     * @param source          The source input stream supplier
     * @return A callable that should subsequently be passed to {@link #submitStreamAwareCallable submitStreamAwareCallable}, possibly in a wrapped/adapted
     *         form. The callable returns the scatter stream of the thread that executed it, and any exceptions happening inside the compression will be
     *         propagated through the callable.
     * @throws IllegalArgumentException if no compression method has been set on {@code zipArchiveEntry}
     */
    public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
        final int method = zipArchiveEntry.getMethod();
        if (method == ZipMethod.UNKNOWN_CODE) {
            throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry);
        }
        // The request is built eagerly on the calling thread; only the compression itself is deferred to the executor
        final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source);
        return () -> {
            final ScatterZipOutputStream scatterStream = tlScatterStreams.get();
            scatterStream.addArchiveEntry(zipArchiveEntryRequest);
            return scatterStream;
        };
    }

    /**
     * Creates a callable that will compress archive entry supplied by {@link ZipArchiveEntryRequestSupplier}.
     *
     * <p>
     * This method is expected to be called from a single client thread.
     * </p>
     *
     * The same as {@link #createCallable(ZipArchiveEntry, InputStreamSupplier)}, but the archive entry to be added is supplied by a
     * {@link ZipArchiveEntryRequestSupplier}. The supplier is only queried when the callable runs.
     *
     * @see #createCallable(ZipArchiveEntry, InputStreamSupplier)
     * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
     * @return A callable that should subsequently be passed to {@link #submitStreamAwareCallable submitStreamAwareCallable}, possibly in a wrapped/adapted
     *         form. The callable returns the scatter stream of the thread that executed it, and any exceptions happening inside the compression will be
     *         propagated through the callable.
     * @since 1.13
     */
    public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
        return () -> {
            final ScatterZipOutputStream scatterStream = tlScatterStreams.get();
            scatterStream.addArchiveEntry(zipArchiveEntryRequestSupplier.get());
            return scatterStream;
        };
    }

    /**
     * Creates a scatter stream on top of a freshly supplied backing store, compressing with the configured compression level.
     *
     * @param scatterGatherBackingStoreSupplier supplies the backing store for the new stream
     * @return a new scatter stream; the caller is responsible for closing it
     * @throws IOException if the backing store cannot be supplied
     */
    @SuppressWarnings("resource") // Caller closes
    private ScatterZipOutputStream createDeferred(final ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier) throws IOException {
        final ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
        // lifecycle is bound to the ScatterZipOutputStream returned
        final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); // NOSONAR
        return new ScatterZipOutputStream(bs, sc);
    }

    /**
     * Gets the overall statistics of the compression run.
     * <p>
     * The timestamps involved are only recorded by {@link #writeTo writeTo}, so the result is only meaningful after that method has completed.
     * </p>
     *
     * @return statistics built from the milliseconds elapsed between construction of this instance and the end of the compression phase, and the milliseconds
     *         elapsed between the end of the compression phase and the end of writing the scatter streams
     */
    public ScatterStatistics getStatisticsMessage() {
        return new ScatterStatistics(compressionDoneAt - startedAt, scatterDoneAt - compressionDoneAt);
    }

    /**
     * Submits a callable for compression.
     * <p>
     * The value produced by {@code callable} is discarded; the scatter stream of the thread that executed it is recorded instead.
     * </p>
     *
     * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
     * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
     */
    public final void submit(final Callable<?> callable) {
        submitStreamAwareCallable(() -> {
            callable.call();
            return tlScatterStreams.get();
        });
    }

    /**
     * Submits a callable for compression.
     *
     * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
     * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
     * @since 1.19
     */
    public final void submitStreamAwareCallable(final Callable<? extends ScatterZipOutputStream> callable) {
        futures.add(executorService.submit(callable));
    }

    /**
     * Writes the contents of this instance to the target {@link ZipArchiveOutputStream}.
     * <p>
     * It may be beneficial to write things like directories and manifest files to the targetStream before calling this method.
     * </p>
     * <p>
     * Calling this method will shut down the {@link ExecutorService} used by this class. If any of the {@link Callable}s {@link #submitStreamAwareCallable
     * submit}ted to this instance throws an exception, the archive cannot be created properly and this method will throw an exception.
     * </p>
     * <p>
     * All scatter streams are closed when this method returns, whether it completes normally or exceptionally.
     * </p>
     *
     * @param targetStream The {@link ZipArchiveOutputStream} to receive the contents of the scatter streams
     * @throws IOException          If writing fails
     * @throws InterruptedException If we get interrupted
     * @throws ExecutionException   If something happens in the parallel execution
     */
    public void writeTo(final ZipArchiveOutputStream targetStream) throws IOException, InterruptedException, ExecutionException {
        try {
            // Make sure we catch any exceptions from parallel phase
            try {
                for (final Future<?> future : futures) {
                    future.get();
                }
            } finally {
                executorService.shutdown();
            }

            // 60,000 seconds == Infinity. We really *must* wait for this to complete.
            // The boolean result is not inspected: every future has already been joined by the loop above.
            executorService.awaitTermination(1000 * 60L, TimeUnit.SECONDS);

            // It is important that all threads terminate before we go on, ensure happens-before relationship
            compressionDoneAt = System.currentTimeMillis();

            // Walk the futures in submission order and let the stream that handled each task emit its next entry
            for (final Future<? extends ScatterZipOutputStream> future : futures) {
                final ScatterZipOutputStream scatterStream = future.get();
                scatterStream.zipEntryWriter().writeNextZipEntry(targetStream);
            }

            // Close explicitly so that a failure while closing surfaces as an IOException
            for (final ScatterZipOutputStream scatterStream : streams) {
                scatterStream.close();
            }

            scatterDoneAt = System.currentTimeMillis();
        } finally {
            // Best-effort cleanup for the failure paths; quietly closes streams again on the success path
            closeAll();
        }
    }
}