001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   https://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019
020package org.apache.commons.compress.archivers.zip;
021
022import java.io.Closeable;
023import java.io.DataOutput;
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.OutputStream;
027import java.nio.ByteBuffer;
028import java.nio.channels.SeekableByteChannel;
029import java.util.zip.CRC32;
030import java.util.zip.Deflater;
031import java.util.zip.ZipEntry;
032
033import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
034
035/**
036 * Encapsulates a {@link Deflater} and CRC calculator, handling multiple types of output streams. Currently {@link java.util.zip.ZipEntry#DEFLATED} and
037 * {@link java.util.zip.ZipEntry#STORED} are the only supported compression methods.
038 *
039 * @since 1.10
040 */
041public abstract class StreamCompressor implements Closeable {
042
043    private static final class DataOutputCompressor extends StreamCompressor {
044
045        private final DataOutput raf;
046
047        DataOutputCompressor(final Deflater deflater, final DataOutput raf) {
048            super(deflater);
049            this.raf = raf;
050        }
051
052        @Override
053        protected void writeOut(final byte[] data, final int offset, final int length) throws IOException {
054            raf.write(data, offset, length);
055        }
056    }
057
058    private static final class OutputStreamCompressor extends StreamCompressor {
059
060        private final OutputStream os;
061
062        OutputStreamCompressor(final Deflater deflater, final OutputStream os) {
063            super(deflater);
064            this.os = os;
065        }
066
067        @Override
068        protected void writeOut(final byte[] data, final int offset, final int length) throws IOException {
069            os.write(data, offset, length);
070        }
071    }
072
073    private static final class ScatterGatherBackingStoreCompressor extends StreamCompressor {
074
075        private final ScatterGatherBackingStore bs;
076
077        ScatterGatherBackingStoreCompressor(final Deflater deflater, final ScatterGatherBackingStore bs) {
078            super(deflater);
079            this.bs = bs;
080        }
081
082        @Override
083        protected void writeOut(final byte[] data, final int offset, final int length) throws IOException {
084            bs.writeOut(data, offset, length);
085        }
086    }
087
088    private static final class SeekableByteChannelCompressor extends StreamCompressor {
089
090        private final SeekableByteChannel channel;
091
092        SeekableByteChannelCompressor(final Deflater deflater, final SeekableByteChannel channel) {
093            super(deflater);
094            this.channel = channel;
095        }
096
097        @Override
098        protected void writeOut(final byte[] data, final int offset, final int length) throws IOException {
099            channel.write(ByteBuffer.wrap(data, offset, length));
100        }
101    }
102
103    /**
104     * Apparently Deflater.setInput gets slowed down a lot on Sun JVMs when it gets handed a huge buffer. See
105     * https://issues.apache.org/bugzilla/show_bug.cgi?id=45396
106     *
107     * Using a buffer size of {@value} bytes proved to be a good compromise
108     */
109    private static final int DEFLATER_BLOCK_SIZE = 8192;
110    private static final int BUFFER_SIZE = 4096;
111
112    /**
113     * Creates a stream compressor with the given compression level.
114     *
115     * @param os       The DataOutput to receive output
116     * @param deflater The deflater to use for the compressor
117     * @return A stream compressor
118     */
119    static StreamCompressor create(final DataOutput os, final Deflater deflater) {
120        return new DataOutputCompressor(deflater, os);
121    }
122
123    /**
124     * Creates a stream compressor with the given compression level.
125     *
126     * @param compressionLevel The {@link Deflater} compression level
127     * @param bs               The ScatterGatherBackingStore to receive output
128     * @return A stream compressor
129     */
130    public static StreamCompressor create(final int compressionLevel, final ScatterGatherBackingStore bs) {
131        final Deflater deflater = new Deflater(compressionLevel, true);
132        return new ScatterGatherBackingStoreCompressor(deflater, bs);
133    }
134
135    /**
136     * Creates a stream compressor with the default compression level.
137     *
138     * @param os The stream to receive output
139     * @return A stream compressor
140     */
141    static StreamCompressor create(final OutputStream os) {
142        return create(os, new Deflater(Deflater.DEFAULT_COMPRESSION, true));
143    }
144
145    /**
146     * Creates a stream compressor with the given compression level.
147     *
148     * @param os       The stream to receive output
149     * @param deflater The deflater to use
150     * @return A stream compressor
151     */
152    static StreamCompressor create(final OutputStream os, final Deflater deflater) {
153        return new OutputStreamCompressor(deflater, os);
154    }
155
156    /**
157     * Creates a stream compressor with the default compression level.
158     *
159     * @param bs The ScatterGatherBackingStore to receive output
160     * @return A stream compressor
161     */
162    public static StreamCompressor create(final ScatterGatherBackingStore bs) {
163        return create(Deflater.DEFAULT_COMPRESSION, bs);
164    }
165
166    /**
167     * Creates a stream compressor with the given compression level.
168     *
169     * @param os       The SeekableByteChannel to receive output
170     * @param deflater The deflater to use for the compressor
171     * @return A stream compressor
172     * @since 1.13
173     */
174    static StreamCompressor create(final SeekableByteChannel os, final Deflater deflater) {
175        return new SeekableByteChannelCompressor(deflater, os);
176    }
177
178    private final Deflater deflater;
179    private final CRC32 crc = new CRC32();
180    private long writtenToOutputStreamForLastEntry;
181    private long sourcePayloadLength;
182    private long totalWrittenToOutputStream;
183    private final byte[] outputBuffer = new byte[BUFFER_SIZE];
184    private final byte[] readerBuf = new byte[BUFFER_SIZE];
185
186    StreamCompressor(final Deflater deflater) {
187        this.deflater = deflater;
188    }
189
190    @Override
191    public void close() throws IOException {
192        deflater.end();
193    }
194
195    void deflate() throws IOException {
196        final int len = deflater.deflate(outputBuffer, 0, outputBuffer.length);
197        if (len > 0) {
198            writeCounted(outputBuffer, 0, len);
199        }
200    }
201
202    /**
203     * Deflates the given source using the supplied compression method
204     *
205     * @param source The source to compress
206     * @param method The #ZipArchiveEntry compression method
207     * @throws IOException When failures happen
208     */
209    public void deflate(final InputStream source, final int method) throws IOException {
210        reset();
211        int length;
212        while ((length = source.read(readerBuf, 0, readerBuf.length)) >= 0) {
213            write(readerBuf, 0, length, method);
214        }
215        if (method == ZipEntry.DEFLATED) {
216            flushDeflater();
217        }
218    }
219
220    private void deflateUntilInputIsNeeded() throws IOException {
221        while (!deflater.needsInput()) {
222            deflate();
223        }
224    }
225
226    void flushDeflater() throws IOException {
227        deflater.finish();
228        while (!deflater.finished()) {
229            deflate();
230        }
231    }
232
233    /**
234     * Gets the number of bytes read from the source stream
235     *
236     * @return The number of bytes read, never negative
237     */
238    public long getBytesRead() {
239        return sourcePayloadLength;
240    }
241
242    /**
243     * Gets the number of bytes written to the output for the last entry
244     *
245     * @return The number of bytes, never negative
246     */
247    public long getBytesWrittenForLastEntry() {
248        return writtenToOutputStreamForLastEntry;
249    }
250
251    /**
252     * Gets the CRC-32 of the last deflated file
253     *
254     * @return the CRC-32
255     */
256    public long getCrc32() {
257        return crc.getValue();
258    }
259
260    /**
261     * Gets the total number of bytes written to the output for all files
262     *
263     * @return The number of bytes, never negative
264     */
265    public long getTotalBytesWritten() {
266        return totalWrittenToOutputStream;
267    }
268
269    void reset() {
270        crc.reset();
271        deflater.reset();
272        sourcePayloadLength = 0;
273        writtenToOutputStreamForLastEntry = 0;
274    }
275
276    /**
277     * Writes bytes to ZIP entry.
278     *
279     * @param b      the byte array to write
280     * @param offset the start position to write from
281     * @param length the number of bytes to write
282     * @param method the comrpession method to use
283     * @return the number of bytes written to the stream this time
284     * @throws IOException on error
285     */
286    long write(final byte[] b, final int offset, final int length, final int method) throws IOException {
287        final long current = writtenToOutputStreamForLastEntry;
288        crc.update(b, offset, length);
289        if (method == ZipEntry.DEFLATED) {
290            writeDeflated(b, offset, length);
291        } else {
292            writeCounted(b, offset, length);
293        }
294        sourcePayloadLength += length;
295        return writtenToOutputStreamForLastEntry - current;
296    }
297
298    /**
299     * Writes the specified byte array to the output stream.
300     *
301     * @param data   the data.
302     * @exception IOException if an I/O error occurs.
303     */
304    public void writeCounted(final byte[] data) throws IOException {
305        writeCounted(data, 0, data.length);
306    }
307
308    /**
309     * Writes {@code len} bytes from the specified byte array starting at offset {@code off} to the output stream.
310     *
311     * @param data   the data.
312     * @param offset the start offset in the data.
313     * @param length the number of bytes to write.
314     * @exception IOException if an I/O error occurs.
315     */
316    public void writeCounted(final byte[] data, final int offset, final int length) throws IOException {
317        writeOut(data, offset, length);
318        writtenToOutputStreamForLastEntry += length;
319        totalWrittenToOutputStream += length;
320    }
321
322    private void writeDeflated(final byte[] b, final int offset, final int length) throws IOException {
323        if (length > 0 && !deflater.finished()) {
324            if (length <= DEFLATER_BLOCK_SIZE) {
325                deflater.setInput(b, offset, length);
326                deflateUntilInputIsNeeded();
327            } else {
328                final int fullblocks = length / DEFLATER_BLOCK_SIZE;
329                for (int i = 0; i < fullblocks; i++) {
330                    deflater.setInput(b, offset + i * DEFLATER_BLOCK_SIZE, DEFLATER_BLOCK_SIZE);
331                    deflateUntilInputIsNeeded();
332                }
333                final int done = fullblocks * DEFLATER_BLOCK_SIZE;
334                if (done < length) {
335                    deflater.setInput(b, offset + done, length - done);
336                    deflateUntilInputIsNeeded();
337                }
338            }
339        }
340    }
341
342    /**
343     * Writes {@code len} bytes from the specified byte array starting at offset {@code off} to the output stream.
344     *
345     * @param data   the data.
346     * @param offset the start offset in the data.
347     * @param length the number of bytes to write.
348     * @exception IOException if an I/O error occurs.
349     */
350    protected abstract void writeOut(byte[] data, int offset, int length) throws IOException;
351}