001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one or more
003 *  contributor license agreements.  See the NOTICE file distributed with
004 *  this work for additional information regarding copyright ownership.
005 *  The ASF licenses this file to You under the Apache License, Version 2.0
006 *  (the "License"); you may not use this file except in compliance with
007 *  the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 *  Unless required by applicable law or agreed to in writing, software
012 *  distributed under the License is distributed on an "AS IS" BASIS,
013 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 *  See the License for the specific language governing permissions and
015 *  limitations under the License.
016 */
017package org.apache.commons.compress.archivers.zip;
018
019import java.io.Closeable;
020import java.io.DataOutput;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.OutputStream;
024import java.nio.ByteBuffer;
025import java.nio.channels.SeekableByteChannel;
026import java.util.zip.CRC32;
027import java.util.zip.Deflater;
028import java.util.zip.ZipEntry;
029
030import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
031
032/**
033 * Encapsulates a {@link Deflater} and crc calculator, handling multiple types of output streams. Currently {@link java.util.zip.ZipEntry#DEFLATED} and
034 * {@link java.util.zip.ZipEntry#STORED} are the only supported compression methods.
035 *
036 * @since 1.10
037 */
038public abstract class StreamCompressor implements Closeable {
039
040    private static final class DataOutputCompressor extends StreamCompressor {
041        private final DataOutput raf;
042
043        DataOutputCompressor(final Deflater deflater, final DataOutput raf) {
044            super(deflater);
045            this.raf = raf;
046        }
047
048        @Override
049        protected void writeOut(final byte[] data, final int offset, final int length) throws IOException {
050            raf.write(data, offset, length);
051        }
052    }
053
054    private static final class OutputStreamCompressor extends StreamCompressor {
055        private final OutputStream os;
056
057        OutputStreamCompressor(final Deflater deflater, final OutputStream os) {
058            super(deflater);
059            this.os = os;
060        }
061
062        @Override
063        protected void writeOut(final byte[] data, final int offset, final int length) throws IOException {
064            os.write(data, offset, length);
065        }
066    }
067
068    private static final class ScatterGatherBackingStoreCompressor extends StreamCompressor {
069        private final ScatterGatherBackingStore bs;
070
071        ScatterGatherBackingStoreCompressor(final Deflater deflater, final ScatterGatherBackingStore bs) {
072            super(deflater);
073            this.bs = bs;
074        }
075
076        @Override
077        protected void writeOut(final byte[] data, final int offset, final int length) throws IOException {
078            bs.writeOut(data, offset, length);
079        }
080    }
081
082    private static final class SeekableByteChannelCompressor extends StreamCompressor {
083        private final SeekableByteChannel channel;
084
085        SeekableByteChannelCompressor(final Deflater deflater, final SeekableByteChannel channel) {
086            super(deflater);
087            this.channel = channel;
088        }
089
090        @Override
091        protected void writeOut(final byte[] data, final int offset, final int length) throws IOException {
092            channel.write(ByteBuffer.wrap(data, offset, length));
093        }
094    }
095
096    /*
097     * Apparently Deflater.setInput gets slowed down a lot on Sun JVMs when it gets handed a huge buffer. See
098     * https://issues.apache.org/bugzilla/show_bug.cgi?id=45396
099     *
100     * Using a buffer size of 8 kB proved to be a good compromise
101     */
102    private static final int DEFLATER_BLOCK_SIZE = 8192;
103    private static final int BUFFER_SIZE = 4096;
104
105    /**
106     * Creates a stream compressor with the given compression level.
107     *
108     * @param os       The DataOutput to receive output
109     * @param deflater The deflater to use for the compressor
110     * @return A stream compressor
111     */
112    static StreamCompressor create(final DataOutput os, final Deflater deflater) {
113        return new DataOutputCompressor(deflater, os);
114    }
115
116    /**
117     * Creates a stream compressor with the given compression level.
118     *
119     * @param compressionLevel The {@link Deflater} compression level
120     * @param bs               The ScatterGatherBackingStore to receive output
121     * @return A stream compressor
122     */
123    public static StreamCompressor create(final int compressionLevel, final ScatterGatherBackingStore bs) {
124        final Deflater deflater = new Deflater(compressionLevel, true);
125        return new ScatterGatherBackingStoreCompressor(deflater, bs);
126    }
127
128    /**
129     * Creates a stream compressor with the default compression level.
130     *
131     * @param os The stream to receive output
132     * @return A stream compressor
133     */
134    static StreamCompressor create(final OutputStream os) {
135        return create(os, new Deflater(Deflater.DEFAULT_COMPRESSION, true));
136    }
137
138    /**
139     * Creates a stream compressor with the given compression level.
140     *
141     * @param os       The stream to receive output
142     * @param deflater The deflater to use
143     * @return A stream compressor
144     */
145    static StreamCompressor create(final OutputStream os, final Deflater deflater) {
146        return new OutputStreamCompressor(deflater, os);
147    }
148
149    /**
150     * Creates a stream compressor with the default compression level.
151     *
152     * @param bs The ScatterGatherBackingStore to receive output
153     * @return A stream compressor
154     */
155    public static StreamCompressor create(final ScatterGatherBackingStore bs) {
156        return create(Deflater.DEFAULT_COMPRESSION, bs);
157    }
158
159    /**
160     * Creates a stream compressor with the given compression level.
161     *
162     * @param os       The SeekableByteChannel to receive output
163     * @param deflater The deflater to use for the compressor
164     * @return A stream compressor
165     * @since 1.13
166     */
167    static StreamCompressor create(final SeekableByteChannel os, final Deflater deflater) {
168        return new SeekableByteChannelCompressor(deflater, os);
169    }
170
171    private final Deflater def;
172
173    private final CRC32 crc = new CRC32();
174
175    private long writtenToOutputStreamForLastEntry;
176
177    private long sourcePayloadLength;
178
179    private long totalWrittenToOutputStream;
180
181    private final byte[] outputBuffer = new byte[BUFFER_SIZE];
182
183    private final byte[] readerBuf = new byte[BUFFER_SIZE];
184
185    StreamCompressor(final Deflater deflater) {
186        this.def = deflater;
187    }
188
189    @Override
190    public void close() throws IOException {
191        def.end();
192    }
193
194    void deflate() throws IOException {
195        final int len = def.deflate(outputBuffer, 0, outputBuffer.length);
196        if (len > 0) {
197            writeCounted(outputBuffer, 0, len);
198        }
199    }
200
201    /**
202     * Deflate the given source using the supplied compression method
203     *
204     * @param source The source to compress
205     * @param method The #ZipArchiveEntry compression method
206     * @throws IOException When failures happen
207     */
208
209    public void deflate(final InputStream source, final int method) throws IOException {
210        reset();
211        int length;
212
213        while ((length = source.read(readerBuf, 0, readerBuf.length)) >= 0) {
214            write(readerBuf, 0, length, method);
215        }
216        if (method == ZipEntry.DEFLATED) {
217            flushDeflater();
218        }
219    }
220
221    private void deflateUntilInputIsNeeded() throws IOException {
222        while (!def.needsInput()) {
223            deflate();
224        }
225    }
226
227    void flushDeflater() throws IOException {
228        def.finish();
229        while (!def.finished()) {
230            deflate();
231        }
232    }
233
234    /**
235     * Gets the number of bytes read from the source stream
236     *
237     * @return The number of bytes read, never negative
238     */
239    public long getBytesRead() {
240        return sourcePayloadLength;
241    }
242
243    /**
244     * The number of bytes written to the output for the last entry
245     *
246     * @return The number of bytes, never negative
247     */
248    public long getBytesWrittenForLastEntry() {
249        return writtenToOutputStreamForLastEntry;
250    }
251
252    /**
253     * The crc32 of the last deflated file
254     *
255     * @return the crc32
256     */
257
258    public long getCrc32() {
259        return crc.getValue();
260    }
261
262    /**
263     * The total number of bytes written to the output for all files
264     *
265     * @return The number of bytes, never negative
266     */
267    public long getTotalBytesWritten() {
268        return totalWrittenToOutputStream;
269    }
270
271    void reset() {
272        crc.reset();
273        def.reset();
274        sourcePayloadLength = 0;
275        writtenToOutputStreamForLastEntry = 0;
276    }
277
278    /**
279     * Writes bytes to ZIP entry.
280     *
281     * @param b      the byte array to write
282     * @param offset the start position to write from
283     * @param length the number of bytes to write
284     * @param method the comrpession method to use
285     * @return the number of bytes written to the stream this time
286     * @throws IOException on error
287     */
288    long write(final byte[] b, final int offset, final int length, final int method) throws IOException {
289        final long current = writtenToOutputStreamForLastEntry;
290        crc.update(b, offset, length);
291        if (method == ZipEntry.DEFLATED) {
292            writeDeflated(b, offset, length);
293        } else {
294            writeCounted(b, offset, length);
295        }
296        sourcePayloadLength += length;
297        return writtenToOutputStreamForLastEntry - current;
298    }
299
300    public void writeCounted(final byte[] data) throws IOException {
301        writeCounted(data, 0, data.length);
302    }
303
304    public void writeCounted(final byte[] data, final int offset, final int length) throws IOException {
305        writeOut(data, offset, length);
306        writtenToOutputStreamForLastEntry += length;
307        totalWrittenToOutputStream += length;
308    }
309
310    private void writeDeflated(final byte[] b, final int offset, final int length) throws IOException {
311        if (length > 0 && !def.finished()) {
312            if (length <= DEFLATER_BLOCK_SIZE) {
313                def.setInput(b, offset, length);
314                deflateUntilInputIsNeeded();
315            } else {
316                final int fullblocks = length / DEFLATER_BLOCK_SIZE;
317                for (int i = 0; i < fullblocks; i++) {
318                    def.setInput(b, offset + i * DEFLATER_BLOCK_SIZE, DEFLATER_BLOCK_SIZE);
319                    deflateUntilInputIsNeeded();
320                }
321                final int done = fullblocks * DEFLATER_BLOCK_SIZE;
322                if (done < length) {
323                    def.setInput(b, offset + done, length - done);
324                    deflateUntilInputIsNeeded();
325                }
326            }
327        }
328    }
329
330    protected abstract void writeOut(byte[] data, int offset, int length) throws IOException;
331}