/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.commons.compress.archivers.zip;

import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.Deflater;

import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore;
import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.BoundedInputStream;

/**
 * A ZIP output stream that is optimized for multi-threaded scatter/gather construction of ZIP files.
 * <p>
 * The internal data format of the entries used by this class is entirely private to this class and is not part of any public API whatsoever.
 * </p>
 * <p>
 * It is possible to extend this class to support different kinds of backing storage, the default implementation only supports file-based backing.
 * </p>
 * <p>
 * Thread safety: This class supports multiple threads. But the "writeTo" method must be called by the thread that originally created the
 * {@link ZipArchiveEntry}.
 * </p>
 *
 * @since 1.10
 */
public class ScatterZipOutputStream implements Closeable {

    /**
     * Bookkeeping record for one entry whose compressed bytes have already been written to the backing store: the originating request plus the CRC and sizes
     * reported by the stream compressor.
     */
    private static final class CompressedEntry {
        final ZipArchiveEntryRequest zipArchiveEntryRequest;
        final long crc;
        final long compressedSize;
        final long size;

        CompressedEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest, final long crc, final long compressedSize, final long size) {
            this.zipArchiveEntryRequest = zipArchiveEntryRequest;
            this.crc = crc;
            this.compressedSize = compressedSize;
            this.size = size;
        }

        /**
         * Updates the original {@link ZipArchiveEntry} with sizes/CRC. Do not use this method from threads that did not create the instance itself!
         *
         * @return the zipArchiveEntry that is the basis for this request.
         */
        public ZipArchiveEntry transferToArchiveEntry() {
            final ZipArchiveEntry entry = zipArchiveEntryRequest.getZipArchiveEntry();
            entry.setCompressedSize(compressedSize);
            entry.setSize(size);
            entry.setCrc(crc);
            entry.setMethod(zipArchiveEntryRequest.getMethod());
            return entry;
        }
    }

    /**
     * Writes ZIP entries to a ZIP archive.
     */
    public static class ZipEntryWriter implements Closeable {
        private final Iterator<CompressedEntry> itemsIterator;
        private final InputStream inputStream;

        /**
         * Constructs a new instance. The backing store of the given scatter stream is closed for writing before its input stream is obtained.
         *
         * @param out a ScatterZipOutputStream.
         * @throws IOException if an I/O error occurs.
         */
        public ZipEntryWriter(final ScatterZipOutputStream out) throws IOException {
            out.backingStore.closeForWriting();
            itemsIterator = out.items.iterator();
            inputStream = out.backingStore.getInputStream();
        }

        /**
         * Closes the input stream obtained from the backing store.
         *
         * @throws IOException if closing fails.
         */
        @Override
        public void close() throws IOException {
            IOUtils.close(inputStream);
        }

        /**
         * Writes the next ZIP entry to the given target.
         *
         * @param target Where to write.
         * @throws IOException                      if an I/O error occurs.
         * @throws java.util.NoSuchElementException if all entries have already been written.
         */
        public void writeNextZipEntry(final ZipArchiveOutputStream target) throws IOException {
            writeRawEntry(itemsIterator.next(), inputStream, target);
        }
    }

    /**
     * Creates a {@link ScatterZipOutputStream} with default compression level that is backed by a file
     *
     * @param file The file to offload compressed data into.
     * @return A ScatterZipOutputStream that is ready for use.
     * @throws FileNotFoundException if the file cannot be found
     */
    public static ScatterZipOutputStream fileBased(final File file) throws FileNotFoundException {
        return pathBased(file.toPath(), Deflater.DEFAULT_COMPRESSION);
    }

    /**
     * Creates a {@link ScatterZipOutputStream} that is backed by a file
     *
     * @param file             The file to offload compressed data into.
     * @param compressionLevel The compression level to use, see {@link Deflater}.
     * @return A ScatterZipOutputStream that is ready for use.
     * @throws FileNotFoundException if the file cannot be found
     */
    public static ScatterZipOutputStream fileBased(final File file, final int compressionLevel) throws FileNotFoundException {
        return pathBased(file.toPath(), compressionLevel);
    }

    /**
     * Creates a {@link ScatterZipOutputStream} with default compression level that is backed by a file
     *
     * @param path The path to offload compressed data into.
     * @return A ScatterZipOutputStream that is ready for use.
     * @throws FileNotFoundException if the path cannot be found
     * @since 1.22
     */
    public static ScatterZipOutputStream pathBased(final Path path) throws FileNotFoundException {
        return pathBased(path, Deflater.DEFAULT_COMPRESSION);
    }

    /**
     * Creates a {@link ScatterZipOutputStream} that is backed by a file
     *
     * @param path             The path to offload compressed data into.
     * @param compressionLevel The compression level to use, see {@link Deflater}.
     * @return A ScatterZipOutputStream that is ready for use.
     * @throws FileNotFoundException if the path cannot be found
     * @since 1.22
     */
    public static ScatterZipOutputStream pathBased(final Path path, final int compressionLevel) throws FileNotFoundException {
        final ScatterGatherBackingStore bs = new FileBasedScatterGatherBackingStore(path);
        // lifecycle is bound to the ScatterZipOutputStream returned
        final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); // NOSONAR
        return new ScatterZipOutputStream(bs, sc);
    }

    /**
     * Copies the raw (already compressed) bytes of a single entry from the shared backing-store stream into the target archive.
     * <p>
     * The shared stream is wrapped in a view limited to the entry's compressed size that does not propagate {@code close()}, so the shared stream remains
     * open for subsequent entries.
     * </p>
     *
     * @param compressedEntry the entry whose bytes are to be copied.
     * @param data            the shared stream over the backing store.
     * @param target          the archive receiving the entry.
     * @throws IOException if reading or writing fails.
     */
    private static void writeRawEntry(final CompressedEntry compressedEntry, final InputStream data, final ZipArchiveOutputStream target)
            throws IOException {
        // @formatter:off
        try (BoundedInputStream rawStream = BoundedInputStream.builder()
                .setInputStream(data)
                .setMaxCount(compressedEntry.compressedSize)
                .setPropagateClose(false)
                .get()) {
            target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
        }
        // @formatter:on
    }

    private final Queue<CompressedEntry> items = new ConcurrentLinkedQueue<>();

    private final ScatterGatherBackingStore backingStore;

    private final StreamCompressor streamCompressor;

    private final AtomicBoolean isClosed = new AtomicBoolean();

    private ZipEntryWriter zipEntryWriter;

    /**
     * Constructs a new instance.
     *
     * @param backingStore     the backing store.
     * @param streamCompressor Deflates ZIP entries.
     */
    public ScatterZipOutputStream(final ScatterGatherBackingStore backingStore, final StreamCompressor streamCompressor) {
        this.backingStore = backingStore;
        this.streamCompressor = streamCompressor;
    }

    /**
     * Adds an archive entry to this scatter stream.
     *
     * @param zipArchiveEntryRequest The entry to write.
     * @throws IOException If writing fails
     */
    public void addArchiveEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest) throws IOException {
        try (InputStream payloadStream = zipArchiveEntryRequest.getPayloadStream()) {
            streamCompressor.deflate(payloadStream, zipArchiveEntryRequest.getMethod());
        }
        // Record the compressor's statistics for the entry just deflated; they are applied to the ZipArchiveEntry when it is written out.
        items.add(new CompressedEntry(zipArchiveEntryRequest, streamCompressor.getCrc32(), streamCompressor.getBytesWrittenForLastEntry(),
                streamCompressor.getBytesRead()));
    }

    /**
     * Closes this stream, freeing all resources involved in the creation of this stream.
     * <p>
     * Only the first call has any effect. The entry writer (if one was created), the backing store and the stream compressor are each closed even when
     * closing one of the others fails.
     * </p>
     *
     * @throws IOException If closing fails
     */
    @Override
    public void close() throws IOException {
        if (!isClosed.compareAndSet(false, true)) {
            return;
        }
        try {
            try {
                // Null-safe: the writer only exists if zipEntryWriter() was called.
                IOUtils.close(zipEntryWriter);
            } finally {
                // Must run even if closing the writer failed, otherwise the backing store would leak.
                backingStore.close();
            }
        } finally {
            streamCompressor.close();
        }
    }

    /**
     * Writes the contents of this scatter stream to a target archive.
     *
     * @param target The archive to receive the contents of this {@link ScatterZipOutputStream}.
     * @throws IOException If writing fails
     * @see #zipEntryWriter()
     */
    public void writeTo(final ZipArchiveOutputStream target) throws IOException {
        backingStore.closeForWriting();
        try (InputStream data = backingStore.getInputStream()) {
            for (final CompressedEntry compressedEntry : items) {
                writeRawEntry(compressedEntry, data, target);
            }
        }
    }

    /**
     * Gets a ZIP entry writer for this scatter stream.
     *
     * @throws IOException If getting scatter stream input stream
     * @return the ZipEntryWriter created on first call of the method
     */
    public ZipEntryWriter zipEntryWriter() throws IOException {
        if (zipEntryWriter == null) {
            zipEntryWriter = new ZipEntryWriter(this);
        }
        return zipEntryWriter;
    }
}