View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.commons.compress.compressors.snappy;
20  
21  import java.io.ByteArrayOutputStream;
22  import java.io.IOException;
23  import java.io.OutputStream;
24  
25  import org.apache.commons.codec.digest.PureJavaCrc32C;
26  import org.apache.commons.compress.compressors.CompressorOutputStream;
27  import org.apache.commons.compress.compressors.lz77support.Parameters;
28  import org.apache.commons.compress.utils.ByteUtils;
29  
30  /**
31   * CompressorOutputStream for the framing Snappy format.
32   *
33   * <p>
34   * Based on the "spec" in the version "Last revised: 2013-10-25"
35   * </p>
36   *
37   * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a>
38   * @since 1.14
39   * @NotThreadSafe
40   */
41  public class FramedSnappyCompressorOutputStream extends CompressorOutputStream {
42      // see spec:
43      // > However, we place an additional restriction that the uncompressed data
44      // > in a chunk must be no longer than 65,536 bytes. This allows consumers to
45      // > easily use small fixed-size buffers.
46      private static final int MAX_COMPRESSED_BUFFER_SIZE = 1 << 16;
47  
48      static long mask(long x) {
49          // ugly, maybe we should just have used ints and deal with the
50          // overflow
51          x = x >> 15 | x << 17;
52          x += FramedSnappyCompressorInputStream.MASK_OFFSET;
53          x &= 0xffffFFFFL;
54          return x;
55      }
56  
57      private final OutputStream out;
58      private final Parameters params;
59      private final PureJavaCrc32C checksum = new PureJavaCrc32C();
60      // used in one-arg write method
61      private final byte[] oneByte = new byte[1];
62      private final byte[] buffer = new byte[MAX_COMPRESSED_BUFFER_SIZE];
63  
64      private int currentIndex;
65  
66      private final ByteUtils.ByteConsumer consumer;
67  
68      /**
69       * Constructs a new output stream that compresses snappy-framed-compressed data to the specified output stream.
70       *
71       * @param out the OutputStream to which to write the compressed data
72       * @throws IOException if writing the signature fails
73       */
74      public FramedSnappyCompressorOutputStream(final OutputStream out) throws IOException {
75          this(out, SnappyCompressorOutputStream.createParameterBuilder(SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE).build());
76      }
77  
78      /**
79       * Constructs a new output stream that compresses snappy-framed-compressed data to the specified output stream.
80       *
81       * @param out    the OutputStream to which to write the compressed data
82       * @param params parameters used to fine-tune compression, in particular to balance compression ratio vs compression speed.
83       * @throws IOException if writing the signature fails
84       */
85      public FramedSnappyCompressorOutputStream(final OutputStream out, final Parameters params) throws IOException {
86          this.out = out;
87          this.params = params;
88          consumer = new ByteUtils.OutputStreamByteConsumer(out);
89          out.write(FramedSnappyCompressorInputStream.SZ_SIGNATURE);
90      }
91  
92      @Override
93      public void close() throws IOException {
94          try {
95              finish();
96          } finally {
97              out.close();
98          }
99      }
100 
101     /**
102      * Compresses all remaining data and writes it to the stream, doesn't close the underlying stream.
103      *
104      * @throws IOException if an error occurs
105      */
106     public void finish() throws IOException {
107         flushBuffer();
108     }
109 
110     private void flushBuffer() throws IOException {
111         if (currentIndex == 0) {
112             return;
113         }
114         out.write(FramedSnappyCompressorInputStream.COMPRESSED_CHUNK_TYPE);
115         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
116         try (OutputStream o = new SnappyCompressorOutputStream(baos, currentIndex, params)) {
117             o.write(buffer, 0, currentIndex);
118         }
119         final byte[] b = baos.toByteArray();
120         writeLittleEndian(3, b.length + 4L /* CRC */);
121         writeCrc();
122         out.write(b);
123         currentIndex = 0;
124     }
125 
126     @Override
127     public void write(final byte[] data, int off, int len) throws IOException {
128         int blockDataRemaining = buffer.length - currentIndex;
129         while (len > 0) {
130             final int copyLen = Math.min(len, blockDataRemaining);
131             System.arraycopy(data, off, buffer, currentIndex, copyLen);
132             off += copyLen;
133             blockDataRemaining -= copyLen;
134             len -= copyLen;
135             currentIndex += copyLen;
136             if (blockDataRemaining == 0) {
137                 flushBuffer();
138                 blockDataRemaining = buffer.length;
139             }
140         }
141     }
142 
143     @Override
144     public void write(final int b) throws IOException {
145         oneByte[0] = (byte) (b & 0xff);
146         write(oneByte);
147     }
148 
149     private void writeCrc() throws IOException {
150         checksum.update(buffer, 0, currentIndex);
151         writeLittleEndian(4, mask(checksum.getValue()));
152         checksum.reset();
153     }
154 
155     private void writeLittleEndian(final int numBytes, final long num) throws IOException {
156         ByteUtils.toLittleEndian(consumer, num, numBytes);
157     }
158 }