001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.compress.archivers.zip; 018 019import java.io.Closeable; 020import java.io.File; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.io.InputStream; 024import java.nio.file.Path; 025import java.util.Iterator; 026import java.util.Queue; 027import java.util.concurrent.ConcurrentLinkedQueue; 028import java.util.concurrent.atomic.AtomicBoolean; 029import java.util.zip.Deflater; 030 031import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore; 032import org.apache.commons.compress.parallel.ScatterGatherBackingStore; 033import org.apache.commons.compress.utils.BoundedInputStream; 034 035/** 036 * A ZIP output stream that is optimized for multi-threaded scatter/gather construction of ZIP files. 037 * <p> 038 * The internal data format of the entries used by this class are entirely private to this class and are not part of any public api whatsoever. 039 * </p> 040 * <p> 041 * It is possible to extend this class to support different kinds of backing storage, the default implementation only supports file-based backing. 042 * </p> 043 * <p> 044 * Thread safety: This class supports multiple threads. But the "writeTo" method must be called by the thread that originally created the 045 * {@link ZipArchiveEntry}. 046 * </p> 047 * 048 * @since 1.10 049 */ 050public class ScatterZipOutputStream implements Closeable { 051 052 private static final class CompressedEntry { 053 final ZipArchiveEntryRequest zipArchiveEntryRequest; 054 final long crc; 055 final long compressedSize; 056 final long size; 057 058 CompressedEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest, final long crc, final long compressedSize, final long size) { 059 this.zipArchiveEntryRequest = zipArchiveEntryRequest; 060 this.crc = crc; 061 this.compressedSize = compressedSize; 062 this.size = size; 063 } 064 065 /** 066 * Updates the original {@link ZipArchiveEntry} with sizes/crc. Do not use this method from threads that did not create the instance itself! 067 * 068 * @return the zipArchiveEntry that is the basis for this request. 069 */ 070 071 public ZipArchiveEntry transferToArchiveEntry() { 072 final ZipArchiveEntry entry = zipArchiveEntryRequest.getZipArchiveEntry(); 073 entry.setCompressedSize(compressedSize); 074 entry.setSize(size); 075 entry.setCrc(crc); 076 entry.setMethod(zipArchiveEntryRequest.getMethod()); 077 return entry; 078 } 079 } 080 081 public static class ZipEntryWriter implements Closeable { 082 private final Iterator<CompressedEntry> itemsIterator; 083 private final InputStream itemsIteratorData; 084 085 public ZipEntryWriter(final ScatterZipOutputStream scatter) throws IOException { 086 scatter.backingStore.closeForWriting(); 087 itemsIterator = scatter.items.iterator(); 088 itemsIteratorData = scatter.backingStore.getInputStream(); 089 } 090 091 @Override 092 public void close() throws IOException { 093 if (itemsIteratorData != null) { 094 itemsIteratorData.close(); 095 } 096 } 097 098 public void writeNextZipEntry(final ZipArchiveOutputStream target) throws IOException { 099 final CompressedEntry compressedEntry = itemsIterator.next(); 100 try (BoundedInputStream rawStream = new BoundedInputStream(itemsIteratorData, compressedEntry.compressedSize)) { 101 target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream); 102 } 103 } 104 } 105 106 /** 107 * Creates a {@link ScatterZipOutputStream} with default compression level that is backed by a file 108 * 109 * @param file The file to offload compressed data into. 110 * @return A ScatterZipOutputStream that is ready for use. 111 * @throws FileNotFoundException if the file cannot be found 112 */ 113 public static ScatterZipOutputStream fileBased(final File file) throws FileNotFoundException { 114 return pathBased(file.toPath(), Deflater.DEFAULT_COMPRESSION); 115 } 116 117 /** 118 * Creates a {@link ScatterZipOutputStream} that is backed by a file 119 * 120 * @param file The file to offload compressed data into. 121 * @param compressionLevel The compression level to use, @see #Deflater 122 * @return A ScatterZipOutputStream that is ready for use. 123 * @throws FileNotFoundException if the file cannot be found 124 */ 125 public static ScatterZipOutputStream fileBased(final File file, final int compressionLevel) throws FileNotFoundException { 126 return pathBased(file.toPath(), compressionLevel); 127 } 128 129 /** 130 * Creates a {@link ScatterZipOutputStream} with default compression level that is backed by a file 131 * 132 * @param path The path to offload compressed data into. 133 * @return A ScatterZipOutputStream that is ready for use. 134 * @throws FileNotFoundException if the path cannot be found 135 * @since 1.22 136 */ 137 public static ScatterZipOutputStream pathBased(final Path path) throws FileNotFoundException { 138 return pathBased(path, Deflater.DEFAULT_COMPRESSION); 139 } 140 141 /** 142 * Creates a {@link ScatterZipOutputStream} that is backed by a file 143 * 144 * @param path The path to offload compressed data into. 145 * @param compressionLevel The compression level to use, @see #Deflater 146 * @return A ScatterZipOutputStream that is ready for use. 147 * @throws FileNotFoundException if the path cannot be found 148 * @since 1.22 149 */ 150 public static ScatterZipOutputStream pathBased(final Path path, final int compressionLevel) throws FileNotFoundException { 151 final ScatterGatherBackingStore bs = new FileBasedScatterGatherBackingStore(path); 152 // lifecycle is bound to the ScatterZipOutputStream returned 153 final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); // NOSONAR 154 return new ScatterZipOutputStream(bs, sc); 155 } 156 157 private final Queue<CompressedEntry> items = new ConcurrentLinkedQueue<>(); 158 159 private final ScatterGatherBackingStore backingStore; 160 161 private final StreamCompressor streamCompressor; 162 163 private final AtomicBoolean isClosed = new AtomicBoolean(); 164 165 private ZipEntryWriter zipEntryWriter; 166 167 public ScatterZipOutputStream(final ScatterGatherBackingStore backingStore, final StreamCompressor streamCompressor) { 168 this.backingStore = backingStore; 169 this.streamCompressor = streamCompressor; 170 } 171 172 /** 173 * Adds an archive entry to this scatter stream. 174 * 175 * @param zipArchiveEntryRequest The entry to write. 176 * @throws IOException If writing fails 177 */ 178 public void addArchiveEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest) throws IOException { 179 try (InputStream payloadStream = zipArchiveEntryRequest.getPayloadStream()) { 180 streamCompressor.deflate(payloadStream, zipArchiveEntryRequest.getMethod()); 181 } 182 items.add(new CompressedEntry(zipArchiveEntryRequest, streamCompressor.getCrc32(), streamCompressor.getBytesWrittenForLastEntry(), 183 streamCompressor.getBytesRead())); 184 } 185 186 /** 187 * Closes this stream, freeing all resources involved in the creation of this stream. 188 * 189 * @throws IOException If closing fails 190 */ 191 @Override 192 public void close() throws IOException { 193 if (!isClosed.compareAndSet(false, true)) { 194 return; 195 } 196 try { 197 if (zipEntryWriter != null) { 198 zipEntryWriter.close(); 199 } 200 backingStore.close(); 201 } finally { 202 streamCompressor.close(); 203 } 204 } 205 206 /** 207 * Writes the contents of this scatter stream to a target archive. 208 * 209 * @param target The archive to receive the contents of this {@link ScatterZipOutputStream}. 210 * @throws IOException If writing fails 211 * @see #zipEntryWriter() 212 */ 213 public void writeTo(final ZipArchiveOutputStream target) throws IOException { 214 backingStore.closeForWriting(); 215 try (InputStream data = backingStore.getInputStream()) { 216 for (final CompressedEntry compressedEntry : items) { 217 try (BoundedInputStream rawStream = new BoundedInputStream(data, compressedEntry.compressedSize)) { 218 target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream); 219 } 220 } 221 } 222 } 223 224 /** 225 * Gets a ZIP entry writer for this scatter stream. 226 * 227 * @throws IOException If getting scatter stream input stream 228 * @return the ZipEntryWriter created on first call of the method 229 */ 230 public ZipEntryWriter zipEntryWriter() throws IOException { 231 if (zipEntryWriter == null) { 232 zipEntryWriter = new ZipEntryWriter(this); 233 } 234 return zipEntryWriter; 235 } 236}