/*
 *  Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  See the NOTICE file distributed with
 *  this work for additional information regarding copyright ownership.
 *  The ASF licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
17  package org.apache.commons.compress.archivers.zip;
18  
19  import java.io.Closeable;
20  import java.io.DataOutput;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.nio.ByteBuffer;
25  import java.nio.channels.SeekableByteChannel;
26  import java.util.zip.CRC32;
27  import java.util.zip.Deflater;
28  import java.util.zip.ZipEntry;
29  
30  import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
31  
32  /**
33   * Encapsulates a {@link Deflater} and crc calculator, handling multiple types of output streams. Currently {@link java.util.zip.ZipEntry#DEFLATED} and
34   * {@link java.util.zip.ZipEntry#STORED} are the only supported compression methods.
35   *
36   * @since 1.10
37   */
38  public abstract class StreamCompressor implements Closeable {
39  
40      private static final class DataOutputCompressor extends StreamCompressor {
41          private final DataOutput raf;
42  
43          DataOutputCompressor(final Deflater deflater, final DataOutput raf) {
44              super(deflater);
45              this.raf = raf;
46          }
47  
48          @Override
49          protected void writeOut(final byte[] data, final int offset, final int length) throws IOException {
50              raf.write(data, offset, length);
51          }
52      }
53  
54      private static final class OutputStreamCompressor extends StreamCompressor {
55          private final OutputStream os;
56  
57          OutputStreamCompressor(final Deflater deflater, final OutputStream os) {
58              super(deflater);
59              this.os = os;
60          }
61  
62          @Override
63          protected void writeOut(final byte[] data, final int offset, final int length) throws IOException {
64              os.write(data, offset, length);
65          }
66      }
67  
68      private static final class ScatterGatherBackingStoreCompressor extends StreamCompressor {
69          private final ScatterGatherBackingStore bs;
70  
71          ScatterGatherBackingStoreCompressor(final Deflater deflater, final ScatterGatherBackingStore bs) {
72              super(deflater);
73              this.bs = bs;
74          }
75  
76          @Override
77          protected void writeOut(final byte[] data, final int offset, final int length) throws IOException {
78              bs.writeOut(data, offset, length);
79          }
80      }
81  
82      private static final class SeekableByteChannelCompressor extends StreamCompressor {
83          private final SeekableByteChannel channel;
84  
85          SeekableByteChannelCompressor(final Deflater deflater, final SeekableByteChannel channel) {
86              super(deflater);
87              this.channel = channel;
88          }
89  
90          @Override
91          protected void writeOut(final byte[] data, final int offset, final int length) throws IOException {
92              channel.write(ByteBuffer.wrap(data, offset, length));
93          }
94      }
95  
96      /*
97       * Apparently Deflater.setInput gets slowed down a lot on Sun JVMs when it gets handed a huge buffer. See
98       * https://issues.apache.org/bugzilla/show_bug.cgi?id=45396
99       *
100      * Using a buffer size of 8 kB proved to be a good compromise
101      */
102     private static final int DEFLATER_BLOCK_SIZE = 8192;
103     private static final int BUFFER_SIZE = 4096;
104 
105     /**
106      * Creates a stream compressor with the given compression level.
107      *
108      * @param os       The DataOutput to receive output
109      * @param deflater The deflater to use for the compressor
110      * @return A stream compressor
111      */
112     static StreamCompressor create(final DataOutput os, final Deflater deflater) {
113         return new DataOutputCompressor(deflater, os);
114     }
115 
116     /**
117      * Creates a stream compressor with the given compression level.
118      *
119      * @param compressionLevel The {@link Deflater} compression level
120      * @param bs               The ScatterGatherBackingStore to receive output
121      * @return A stream compressor
122      */
123     public static StreamCompressor create(final int compressionLevel, final ScatterGatherBackingStore bs) {
124         final Deflater deflater = new Deflater(compressionLevel, true);
125         return new ScatterGatherBackingStoreCompressor(deflater, bs);
126     }
127 
128     /**
129      * Creates a stream compressor with the default compression level.
130      *
131      * @param os The stream to receive output
132      * @return A stream compressor
133      */
134     static StreamCompressor create(final OutputStream os) {
135         return create(os, new Deflater(Deflater.DEFAULT_COMPRESSION, true));
136     }
137 
138     /**
139      * Creates a stream compressor with the given compression level.
140      *
141      * @param os       The stream to receive output
142      * @param deflater The deflater to use
143      * @return A stream compressor
144      */
145     static StreamCompressor create(final OutputStream os, final Deflater deflater) {
146         return new OutputStreamCompressor(deflater, os);
147     }
148 
149     /**
150      * Creates a stream compressor with the default compression level.
151      *
152      * @param bs The ScatterGatherBackingStore to receive output
153      * @return A stream compressor
154      */
155     public static StreamCompressor create(final ScatterGatherBackingStore bs) {
156         return create(Deflater.DEFAULT_COMPRESSION, bs);
157     }
158 
159     /**
160      * Creates a stream compressor with the given compression level.
161      *
162      * @param os       The SeekableByteChannel to receive output
163      * @param deflater The deflater to use for the compressor
164      * @return A stream compressor
165      * @since 1.13
166      */
167     static StreamCompressor create(final SeekableByteChannel os, final Deflater deflater) {
168         return new SeekableByteChannelCompressor(deflater, os);
169     }
170 
171     private final Deflater def;
172 
173     private final CRC32 crc = new CRC32();
174 
175     private long writtenToOutputStreamForLastEntry;
176 
177     private long sourcePayloadLength;
178 
179     private long totalWrittenToOutputStream;
180 
181     private final byte[] outputBuffer = new byte[BUFFER_SIZE];
182 
183     private final byte[] readerBuf = new byte[BUFFER_SIZE];
184 
185     StreamCompressor(final Deflater deflater) {
186         this.def = deflater;
187     }
188 
189     @Override
190     public void close() throws IOException {
191         def.end();
192     }
193 
194     void deflate() throws IOException {
195         final int len = def.deflate(outputBuffer, 0, outputBuffer.length);
196         if (len > 0) {
197             writeCounted(outputBuffer, 0, len);
198         }
199     }
200 
201     /**
202      * Deflate the given source using the supplied compression method
203      *
204      * @param source The source to compress
205      * @param method The #ZipArchiveEntry compression method
206      * @throws IOException When failures happen
207      */
208 
209     public void deflate(final InputStream source, final int method) throws IOException {
210         reset();
211         int length;
212 
213         while ((length = source.read(readerBuf, 0, readerBuf.length)) >= 0) {
214             write(readerBuf, 0, length, method);
215         }
216         if (method == ZipEntry.DEFLATED) {
217             flushDeflater();
218         }
219     }
220 
221     private void deflateUntilInputIsNeeded() throws IOException {
222         while (!def.needsInput()) {
223             deflate();
224         }
225     }
226 
227     void flushDeflater() throws IOException {
228         def.finish();
229         while (!def.finished()) {
230             deflate();
231         }
232     }
233 
234     /**
235      * Gets the number of bytes read from the source stream
236      *
237      * @return The number of bytes read, never negative
238      */
239     public long getBytesRead() {
240         return sourcePayloadLength;
241     }
242 
243     /**
244      * The number of bytes written to the output for the last entry
245      *
246      * @return The number of bytes, never negative
247      */
248     public long getBytesWrittenForLastEntry() {
249         return writtenToOutputStreamForLastEntry;
250     }
251 
252     /**
253      * The crc32 of the last deflated file
254      *
255      * @return the crc32
256      */
257 
258     public long getCrc32() {
259         return crc.getValue();
260     }
261 
262     /**
263      * The total number of bytes written to the output for all files
264      *
265      * @return The number of bytes, never negative
266      */
267     public long getTotalBytesWritten() {
268         return totalWrittenToOutputStream;
269     }
270 
271     void reset() {
272         crc.reset();
273         def.reset();
274         sourcePayloadLength = 0;
275         writtenToOutputStreamForLastEntry = 0;
276     }
277 
278     /**
279      * Writes bytes to ZIP entry.
280      *
281      * @param b      the byte array to write
282      * @param offset the start position to write from
283      * @param length the number of bytes to write
284      * @param method the comrpession method to use
285      * @return the number of bytes written to the stream this time
286      * @throws IOException on error
287      */
288     long write(final byte[] b, final int offset, final int length, final int method) throws IOException {
289         final long current = writtenToOutputStreamForLastEntry;
290         crc.update(b, offset, length);
291         if (method == ZipEntry.DEFLATED) {
292             writeDeflated(b, offset, length);
293         } else {
294             writeCounted(b, offset, length);
295         }
296         sourcePayloadLength += length;
297         return writtenToOutputStreamForLastEntry - current;
298     }
299 
300     public void writeCounted(final byte[] data) throws IOException {
301         writeCounted(data, 0, data.length);
302     }
303 
304     public void writeCounted(final byte[] data, final int offset, final int length) throws IOException {
305         writeOut(data, offset, length);
306         writtenToOutputStreamForLastEntry += length;
307         totalWrittenToOutputStream += length;
308     }
309 
310     private void writeDeflated(final byte[] b, final int offset, final int length) throws IOException {
311         if (length > 0 && !def.finished()) {
312             if (length <= DEFLATER_BLOCK_SIZE) {
313                 def.setInput(b, offset, length);
314                 deflateUntilInputIsNeeded();
315             } else {
316                 final int fullblocks = length / DEFLATER_BLOCK_SIZE;
317                 for (int i = 0; i < fullblocks; i++) {
318                     def.setInput(b, offset + i * DEFLATER_BLOCK_SIZE, DEFLATER_BLOCK_SIZE);
319                     deflateUntilInputIsNeeded();
320                 }
321                 final int done = fullblocks * DEFLATER_BLOCK_SIZE;
322                 if (done < length) {
323                     def.setInput(b, offset + done, length - done);
324                     deflateUntilInputIsNeeded();
325                 }
326             }
327         }
328     }
329 
330     protected abstract void writeOut(byte[] data, int offset, int length) throws IOException;
331 }