001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   https://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.commons.compress.archivers.zip;
020
021import java.io.Closeable;
022import java.io.File;
023import java.io.FileNotFoundException;
024import java.io.IOException;
025import java.io.InputStream;
026import java.nio.file.Path;
027import java.util.Iterator;
028import java.util.Queue;
029import java.util.concurrent.ConcurrentLinkedQueue;
030import java.util.concurrent.atomic.AtomicBoolean;
031import java.util.zip.Deflater;
032
033import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore;
034import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
035import org.apache.commons.io.IOUtils;
036import org.apache.commons.io.input.BoundedInputStream;
037
038/**
039 * A ZIP output stream that is optimized for multi-threaded scatter/gather construction of ZIP files.
040 * <p>
 * The internal data format of the entries used by this class is entirely private to this class and is not part of any public API whatsoever.
042 * </p>
043 * <p>
044 * It is possible to extend this class to support different kinds of backing storage, the default implementation only supports file-based backing.
045 * </p>
046 * <p>
047 * Thread safety: This class supports multiple threads. But the "writeTo" method must be called by the thread that originally created the
048 * {@link ZipArchiveEntry}.
049 * </p>
050 *
051 * @since 1.10
052 */
053public class ScatterZipOutputStream implements Closeable {
054
055    private static final class CompressedEntry {
056        final ZipArchiveEntryRequest zipArchiveEntryRequest;
057        final long crc;
058        final long compressedSize;
059        final long size;
060
061        CompressedEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest, final long crc, final long compressedSize, final long size) {
062            this.zipArchiveEntryRequest = zipArchiveEntryRequest;
063            this.crc = crc;
064            this.compressedSize = compressedSize;
065            this.size = size;
066        }
067
068        /**
069         * Updates the original {@link ZipArchiveEntry} with sizes/CRC. Do not use this method from threads that did not create the instance itself!
070         *
071         * @return the zipArchiveEntry that is the basis for this request.
072         */
073        public ZipArchiveEntry transferToArchiveEntry() {
074            final ZipArchiveEntry entry = zipArchiveEntryRequest.getZipArchiveEntry();
075            entry.setCompressedSize(compressedSize);
076            entry.setSize(size);
077            entry.setCrc(crc);
078            entry.setMethod(zipArchiveEntryRequest.getMethod());
079            return entry;
080        }
081    }
082
083    /**
084     * Writes ZIP entries to a ZIP archive.
085     */
086    public static class ZipEntryWriter implements Closeable {
087        private final Iterator<CompressedEntry> itemsIterator;
088        private final InputStream inputStream;
089
090        /**
091         * Constructs a new instance.
092         *
093         * @param out a ScatterZipOutputStream.
094         * @throws IOException if an I/O error occurs.
095         */
096        public ZipEntryWriter(final ScatterZipOutputStream out) throws IOException {
097            out.backingStore.closeForWriting();
098            itemsIterator = out.items.iterator();
099            inputStream = out.backingStore.getInputStream();
100        }
101
102        @Override
103        public void close() throws IOException {
104            IOUtils.close(inputStream);
105        }
106
107        /**
108         * Writes the next ZIP entry to the given target.
109         *
110         * @param target Where to write.
111         * @throws IOException if an I/O error occurs.
112         */
113        public void writeNextZipEntry(final ZipArchiveOutputStream target) throws IOException {
114            final CompressedEntry compressedEntry = itemsIterator.next();
115            // @formatter:off
116            try (BoundedInputStream rawStream = BoundedInputStream.builder()
117                    .setInputStream(inputStream)
118                    .setMaxCount(compressedEntry.compressedSize)
119                    .setPropagateClose(false)
120                    .get()) {
121                target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
122            }
123            // @formatter:on
124        }
125    }
126
127    /**
128     * Creates a {@link ScatterZipOutputStream} with default compression level that is backed by a file
129     *
130     * @param file The file to offload compressed data into.
131     * @return A ScatterZipOutputStream that is ready for use.
132     * @throws FileNotFoundException if the file cannot be found
133     */
134    public static ScatterZipOutputStream fileBased(final File file) throws FileNotFoundException {
135        return pathBased(file.toPath(), Deflater.DEFAULT_COMPRESSION);
136    }
137
138    /**
139     * Creates a {@link ScatterZipOutputStream} that is backed by a file
140     *
141     * @param file             The file to offload compressed data into.
142     * @param compressionLevel The compression level to use, @see #Deflater
143     * @return A ScatterZipOutputStream that is ready for use.
144     * @throws FileNotFoundException if the file cannot be found
145     */
146    public static ScatterZipOutputStream fileBased(final File file, final int compressionLevel) throws FileNotFoundException {
147        return pathBased(file.toPath(), compressionLevel);
148    }
149
150    /**
151     * Creates a {@link ScatterZipOutputStream} with default compression level that is backed by a file
152     *
153     * @param path The path to offload compressed data into.
154     * @return A ScatterZipOutputStream that is ready for use.
155     * @throws FileNotFoundException if the path cannot be found
156     * @since 1.22
157     */
158    public static ScatterZipOutputStream pathBased(final Path path) throws FileNotFoundException {
159        return pathBased(path, Deflater.DEFAULT_COMPRESSION);
160    }
161
162    /**
163     * Creates a {@link ScatterZipOutputStream} that is backed by a file
164     *
165     * @param path             The path to offload compressed data into.
166     * @param compressionLevel The compression level to use, @see #Deflater
167     * @return A ScatterZipOutputStream that is ready for use.
168     * @throws FileNotFoundException if the path cannot be found
169     * @since 1.22
170     */
171    public static ScatterZipOutputStream pathBased(final Path path, final int compressionLevel) throws FileNotFoundException {
172        final ScatterGatherBackingStore bs = new FileBasedScatterGatherBackingStore(path);
173        // lifecycle is bound to the ScatterZipOutputStream returned
174        final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); // NOSONAR
175        return new ScatterZipOutputStream(bs, sc);
176    }
177
178    private final Queue<CompressedEntry> items = new ConcurrentLinkedQueue<>();
179
180    private final ScatterGatherBackingStore backingStore;
181
182    private final StreamCompressor streamCompressor;
183
184    private final AtomicBoolean isClosed = new AtomicBoolean();
185
186    private ZipEntryWriter zipEntryWriter;
187
188    /**
189     * Constructs a new instance.
190     *
191     * @param backingStore the backing store.
192     * @param streamCompressor Deflates ZIP entries.
193     */
194    public ScatterZipOutputStream(final ScatterGatherBackingStore backingStore, final StreamCompressor streamCompressor) {
195        this.backingStore = backingStore;
196        this.streamCompressor = streamCompressor;
197    }
198
199    /**
200     * Adds an archive entry to this scatter stream.
201     *
202     * @param zipArchiveEntryRequest The entry to write.
203     * @throws IOException If writing fails
204     */
205    public void addArchiveEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest) throws IOException {
206        try (InputStream payloadStream = zipArchiveEntryRequest.getPayloadStream()) {
207            streamCompressor.deflate(payloadStream, zipArchiveEntryRequest.getMethod());
208        }
209        items.add(new CompressedEntry(zipArchiveEntryRequest, streamCompressor.getCrc32(), streamCompressor.getBytesWrittenForLastEntry(),
210                streamCompressor.getBytesRead()));
211    }
212
213    /**
214     * Closes this stream, freeing all resources involved in the creation of this stream.
215     *
216     * @throws IOException If closing fails
217     */
218    @Override
219    public void close() throws IOException {
220        if (!isClosed.compareAndSet(false, true)) {
221            return;
222        }
223        try {
224            IOUtils.close(zipEntryWriter);
225            backingStore.close();
226        } finally {
227            streamCompressor.close();
228        }
229    }
230
231    /**
232     * Writes the contents of this scatter stream to a target archive.
233     *
234     * @param target The archive to receive the contents of this {@link ScatterZipOutputStream}.
235     * @throws IOException If writing fails
236     * @see #zipEntryWriter()
237     */
238    public void writeTo(final ZipArchiveOutputStream target) throws IOException {
239        backingStore.closeForWriting();
240        try (InputStream data = backingStore.getInputStream()) {
241            for (final CompressedEntry compressedEntry : items) {
242                // @formatter:off
243                try (BoundedInputStream rawStream = BoundedInputStream.builder()
244                        .setInputStream(data)
245                        .setMaxCount(compressedEntry.compressedSize)
246                        .setPropagateClose(false)
247                        .get()) {
248                    target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
249                }
250                // @formatter:on
251            }
252        }
253    }
254
255    /**
256     * Gets a ZIP entry writer for this scatter stream.
257     *
258     * @throws IOException If getting scatter stream input stream
259     * @return the ZipEntryWriter created on first call of the method
260     */
261    public ZipEntryWriter zipEntryWriter() throws IOException {
262        if (zipEntryWriter == null) {
263            zipEntryWriter = new ZipEntryWriter(this);
264        }
265        return zipEntryWriter;
266    }
267}