001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.compress.archivers.zip; 018 019import java.io.Closeable; 020import java.io.File; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.io.InputStream; 024import java.nio.file.Path; 025import java.util.Iterator; 026import java.util.Queue; 027import java.util.concurrent.ConcurrentLinkedQueue; 028import java.util.concurrent.atomic.AtomicBoolean; 029import java.util.zip.Deflater; 030 031import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore; 032import org.apache.commons.compress.parallel.ScatterGatherBackingStore; 033import org.apache.commons.io.input.BoundedInputStream; 034 035/** 036 * A ZIP output stream that is optimized for multi-threaded scatter/gather construction of ZIP files. 037 * <p> 038 * The internal data format of the entries used by this class are entirely private to this class and are not part of any public api whatsoever. 039 * </p> 040 * <p> 041 * It is possible to extend this class to support different kinds of backing storage, the default implementation only supports file-based backing. 042 * </p> 043 * <p> 044 * Thread safety: This class supports multiple threads. But the "writeTo" method must be called by the thread that originally created the 045 * {@link ZipArchiveEntry}. 046 * </p> 047 * 048 * @since 1.10 049 */ 050public class ScatterZipOutputStream implements Closeable { 051 052 private static final class CompressedEntry { 053 final ZipArchiveEntryRequest zipArchiveEntryRequest; 054 final long crc; 055 final long compressedSize; 056 final long size; 057 058 CompressedEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest, final long crc, final long compressedSize, final long size) { 059 this.zipArchiveEntryRequest = zipArchiveEntryRequest; 060 this.crc = crc; 061 this.compressedSize = compressedSize; 062 this.size = size; 063 } 064 065 /** 066 * Updates the original {@link ZipArchiveEntry} with sizes/crc. Do not use this method from threads that did not create the instance itself! 067 * 068 * @return the zipArchiveEntry that is the basis for this request. 069 */ 070 071 public ZipArchiveEntry transferToArchiveEntry() { 072 final ZipArchiveEntry entry = zipArchiveEntryRequest.getZipArchiveEntry(); 073 entry.setCompressedSize(compressedSize); 074 entry.setSize(size); 075 entry.setCrc(crc); 076 entry.setMethod(zipArchiveEntryRequest.getMethod()); 077 return entry; 078 } 079 } 080 081 public static class ZipEntryWriter implements Closeable { 082 private final Iterator<CompressedEntry> itemsIterator; 083 private final InputStream itemsIteratorData; 084 085 public ZipEntryWriter(final ScatterZipOutputStream scatter) throws IOException { 086 scatter.backingStore.closeForWriting(); 087 itemsIterator = scatter.items.iterator(); 088 itemsIteratorData = scatter.backingStore.getInputStream(); 089 } 090 091 @Override 092 public void close() throws IOException { 093 if (itemsIteratorData != null) { 094 itemsIteratorData.close(); 095 } 096 } 097 098 public void writeNextZipEntry(final ZipArchiveOutputStream target) throws IOException { 099 final CompressedEntry compressedEntry = itemsIterator.next(); 100 // @formatter:off 101 try (BoundedInputStream rawStream = BoundedInputStream.builder() 102 .setInputStream(itemsIteratorData) 103 .setMaxCount(compressedEntry.compressedSize) 104 .setPropagateClose(false) 105 .get()) { 106 target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream); 107 } 108 // @formatter:on 109 } 110 } 111 112 /** 113 * Creates a {@link ScatterZipOutputStream} with default compression level that is backed by a file 114 * 115 * @param file The file to offload compressed data into. 116 * @return A ScatterZipOutputStream that is ready for use. 117 * @throws FileNotFoundException if the file cannot be found 118 */ 119 public static ScatterZipOutputStream fileBased(final File file) throws FileNotFoundException { 120 return pathBased(file.toPath(), Deflater.DEFAULT_COMPRESSION); 121 } 122 123 /** 124 * Creates a {@link ScatterZipOutputStream} that is backed by a file 125 * 126 * @param file The file to offload compressed data into. 127 * @param compressionLevel The compression level to use, @see #Deflater 128 * @return A ScatterZipOutputStream that is ready for use. 129 * @throws FileNotFoundException if the file cannot be found 130 */ 131 public static ScatterZipOutputStream fileBased(final File file, final int compressionLevel) throws FileNotFoundException { 132 return pathBased(file.toPath(), compressionLevel); 133 } 134 135 /** 136 * Creates a {@link ScatterZipOutputStream} with default compression level that is backed by a file 137 * 138 * @param path The path to offload compressed data into. 139 * @return A ScatterZipOutputStream that is ready for use. 140 * @throws FileNotFoundException if the path cannot be found 141 * @since 1.22 142 */ 143 public static ScatterZipOutputStream pathBased(final Path path) throws FileNotFoundException { 144 return pathBased(path, Deflater.DEFAULT_COMPRESSION); 145 } 146 147 /** 148 * Creates a {@link ScatterZipOutputStream} that is backed by a file 149 * 150 * @param path The path to offload compressed data into. 151 * @param compressionLevel The compression level to use, @see #Deflater 152 * @return A ScatterZipOutputStream that is ready for use. 153 * @throws FileNotFoundException if the path cannot be found 154 * @since 1.22 155 */ 156 public static ScatterZipOutputStream pathBased(final Path path, final int compressionLevel) throws FileNotFoundException { 157 final ScatterGatherBackingStore bs = new FileBasedScatterGatherBackingStore(path); 158 // lifecycle is bound to the ScatterZipOutputStream returned 159 final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); // NOSONAR 160 return new ScatterZipOutputStream(bs, sc); 161 } 162 163 private final Queue<CompressedEntry> items = new ConcurrentLinkedQueue<>(); 164 165 private final ScatterGatherBackingStore backingStore; 166 167 private final StreamCompressor streamCompressor; 168 169 private final AtomicBoolean isClosed = new AtomicBoolean(); 170 171 private ZipEntryWriter zipEntryWriter; 172 173 public ScatterZipOutputStream(final ScatterGatherBackingStore backingStore, final StreamCompressor streamCompressor) { 174 this.backingStore = backingStore; 175 this.streamCompressor = streamCompressor; 176 } 177 178 /** 179 * Adds an archive entry to this scatter stream. 180 * 181 * @param zipArchiveEntryRequest The entry to write. 182 * @throws IOException If writing fails 183 */ 184 public void addArchiveEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest) throws IOException { 185 try (InputStream payloadStream = zipArchiveEntryRequest.getPayloadStream()) { 186 streamCompressor.deflate(payloadStream, zipArchiveEntryRequest.getMethod()); 187 } 188 items.add(new CompressedEntry(zipArchiveEntryRequest, streamCompressor.getCrc32(), streamCompressor.getBytesWrittenForLastEntry(), 189 streamCompressor.getBytesRead())); 190 } 191 192 /** 193 * Closes this stream, freeing all resources involved in the creation of this stream. 194 * 195 * @throws IOException If closing fails 196 */ 197 @Override 198 public void close() throws IOException { 199 if (!isClosed.compareAndSet(false, true)) { 200 return; 201 } 202 try { 203 if (zipEntryWriter != null) { 204 zipEntryWriter.close(); 205 } 206 backingStore.close(); 207 } finally { 208 streamCompressor.close(); 209 } 210 } 211 212 /** 213 * Writes the contents of this scatter stream to a target archive. 214 * 215 * @param target The archive to receive the contents of this {@link ScatterZipOutputStream}. 216 * @throws IOException If writing fails 217 * @see #zipEntryWriter() 218 */ 219 public void writeTo(final ZipArchiveOutputStream target) throws IOException { 220 backingStore.closeForWriting(); 221 try (InputStream data = backingStore.getInputStream()) { 222 for (final CompressedEntry compressedEntry : items) { 223 // @formatter:off 224 try (BoundedInputStream rawStream = BoundedInputStream.builder() 225 .setInputStream(data) 226 .setMaxCount(compressedEntry.compressedSize) 227 .setPropagateClose(false) 228 .get()) { 229 target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream); 230 } 231 // @formatter:on 232 } 233 } 234 } 235 236 /** 237 * Gets a ZIP entry writer for this scatter stream. 238 * 239 * @throws IOException If getting scatter stream input stream 240 * @return the ZipEntryWriter created on first call of the method 241 */ 242 public ZipEntryWriter zipEntryWriter() throws IOException { 243 if (zipEntryWriter == null) { 244 zipEntryWriter = new ZipEntryWriter(this); 245 } 246 return zipEntryWriter; 247 } 248}