View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   https://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.commons.compress.compressors.snappy;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.PushbackInputStream;
24  import java.util.Arrays;
25  
26  import org.apache.commons.codec.digest.PureJavaCrc32C;
27  import org.apache.commons.compress.compressors.CompressorInputStream;
28  import org.apache.commons.compress.utils.ByteUtils;
29  import org.apache.commons.compress.utils.IOUtils;
30  import org.apache.commons.compress.utils.InputStreamStatistics;
31  import org.apache.commons.io.input.BoundedInputStream;
32  
33  /**
34   * CompressorInputStream for the framing Snappy format.
35   *
36   * <p>
37   * Based on the "spec" in the version "Last revised: 2013-10-25"
38   * </p>
39   *
40   * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a>
41   * @since 1.7
42   */
43  public class FramedSnappyCompressorInputStream extends CompressorInputStream implements InputStreamStatistics {
44  
45      /**
46       * package private for tests only.
47       */
48      static final long MASK_OFFSET = 0xa282ead8L;
49  
50      private static final int STREAM_IDENTIFIER_TYPE = 0xff;
51      static final int COMPRESSED_CHUNK_TYPE = 0;
52      private static final int UNCOMPRESSED_CHUNK_TYPE = 1;
53      private static final int PADDING_CHUNK_TYPE = 0xfe;
54      private static final int MIN_UNSKIPPABLE_TYPE = 2;
55      private static final int MAX_UNSKIPPABLE_TYPE = 0x7f;
56      private static final int MAX_SKIPPABLE_TYPE = 0xfd;
57  
58      // used by FramedSnappyCompressorOutputStream as well
59      static final byte[] SZ_SIGNATURE = { // NOSONAR
60              (byte) STREAM_IDENTIFIER_TYPE, // tag
61              6, 0, 0, // length
62              's', 'N', 'a', 'P', 'p', 'Y' };
63  
64      /**
65       * Checks if the signature matches what is expected for a .sz file.
66       *
67       * <p>
68       * .sz files start with a chunk with tag 0xff and content sNaPpY.
69       * </p>
70       *
71       * @param signature the bytes to check
72       * @param length    the number of bytes to check
73       * @return true if this is a .sz stream, false otherwise
74       */
75      public static boolean matches(final byte[] signature, final int length) {
76  
77          if (length < SZ_SIGNATURE.length) {
78              return false;
79          }
80  
81          byte[] shortenedSig = signature;
82          if (signature.length > SZ_SIGNATURE.length) {
83              shortenedSig = Arrays.copyOf(signature, SZ_SIGNATURE.length);
84          }
85  
86          return Arrays.equals(shortenedSig, SZ_SIGNATURE);
87      }
88  
89      static long unmask(long x) {
90          // ugly, maybe we should just have used ints and deal with the
91          // overflow
92          x -= MASK_OFFSET;
93          x &= 0xffffFFFFL;
94          return (x >> 17 | x << 15) & 0xffffFFFFL;
95      }
96  
97      private long unreadBytes;
98  
99      private final BoundedInputStream countingStream;
100 
101     /** The underlying stream to read compressed data from */
102     private final PushbackInputStream inputStream;
103 
104     /** The dialect to expect */
105     private final FramedSnappyDialect dialect;
106 
107     private SnappyCompressorInputStream currentCompressedChunk;
108 
109     // used in no-arg read method
110     private final byte[] oneByte = new byte[1];
111     private boolean endReached;
112     private boolean inUncompressedChunk;
113     private int uncompressedBytesRemaining;
114     private long expectedChecksum = -1;
115 
116     private final int blockSize;
117 
118     private final PureJavaCrc32C checksum = new PureJavaCrc32C();
119 
120     private final ByteUtils.ByteSupplier supplier = this::readOneByte;
121 
122     /**
123      * Constructs a new input stream that decompresses snappy-framed-compressed data from the specified input stream using the
124      * {@link FramedSnappyDialect#STANDARD} dialect.
125      *
126      * @param in the InputStream from which to read the compressed data
127      * @throws IOException if reading fails
128      */
129     public FramedSnappyCompressorInputStream(final InputStream in) throws IOException {
130         this(in, FramedSnappyDialect.STANDARD);
131     }
132 
133     /**
134      * Constructs a new input stream that decompresses snappy-framed-compressed data from the specified input stream.
135      *
136      * @param in      the InputStream from which to read the compressed data
137      * @param dialect the dialect used by the compressed stream
138      * @throws IOException if reading fails
139      */
140     public FramedSnappyCompressorInputStream(final InputStream in, final FramedSnappyDialect dialect) throws IOException {
141         this(in, SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE, dialect);
142     }
143 
144     /**
145      * Constructs a new input stream that decompresses snappy-framed-compressed data from the specified input stream.
146      *
147      * @param in        the InputStream from which to read the compressed data
148      * @param blockSize the block size to use for the compressed stream
149      * @param dialect   the dialect used by the compressed stream
150      * @throws IOException              if reading fails
151      * @throws IllegalArgumentException if blockSize is not bigger than 0
152      * @since 1.14
153      */
154     public FramedSnappyCompressorInputStream(final InputStream in, final int blockSize, final FramedSnappyDialect dialect) throws IOException {
155         if (blockSize <= 0) {
156             throw new IllegalArgumentException("blockSize must be bigger than 0");
157         }
158         countingStream = BoundedInputStream.builder().setInputStream(in).get();
159         this.inputStream = new PushbackInputStream(countingStream, 1);
160         this.blockSize = blockSize;
161         this.dialect = dialect;
162         if (dialect.hasStreamIdentifier()) {
163             readStreamIdentifier();
164         }
165     }
166 
167     /** {@inheritDoc} */
168     @Override
169     public int available() throws IOException {
170         if (inUncompressedChunk) {
171             return Math.min(uncompressedBytesRemaining, inputStream.available());
172         }
173         if (currentCompressedChunk != null) {
174             return currentCompressedChunk.available();
175         }
176         return 0;
177     }
178 
179     /** {@inheritDoc} */
180     @Override
181     public void close() throws IOException {
182         try {
183             org.apache.commons.io.IOUtils.close(currentCompressedChunk);
184             currentCompressedChunk = null;
185         } finally {
186             inputStream.close();
187         }
188     }
189 
190     /**
191      * @since 1.17
192      */
193     @Override
194     public long getCompressedCount() {
195         return countingStream.getCount() - unreadBytes;
196     }
197 
198     /** {@inheritDoc} */
199     @Override
200     public int read() throws IOException {
201         return read(oneByte, 0, 1) == -1 ? -1 : oneByte[0] & 0xFF;
202     }
203 
204     /** {@inheritDoc} */
205     @Override
206     public int read(final byte[] b, final int off, final int len) throws IOException {
207         if (len == 0) {
208             return 0;
209         }
210         int read = readOnce(b, off, len);
211         if (read == -1) {
212             readNextBlock();
213             if (endReached) {
214                 return -1;
215             }
216             read = readOnce(b, off, len);
217         }
218         return read;
219     }
220 
221     private long readCrc() throws IOException {
222         final byte[] b = new byte[4];
223         final int read = IOUtils.readFully(inputStream, b);
224         count(read);
225         if (read != 4) {
226             throw new IOException("Premature end of stream");
227         }
228         return ByteUtils.fromLittleEndian(b);
229     }
230 
231     private void readNextBlock() throws IOException {
232         verifyLastChecksumAndReset();
233         inUncompressedChunk = false;
234         final int type = readOneByte();
235         if (type == -1) {
236             endReached = true;
237         } else if (type == STREAM_IDENTIFIER_TYPE) {
238             inputStream.unread(type);
239             unreadBytes++;
240             pushedBackBytes(1);
241             readStreamIdentifier();
242             readNextBlock();
243         } else if (type == PADDING_CHUNK_TYPE || type > MAX_UNSKIPPABLE_TYPE && type <= MAX_SKIPPABLE_TYPE) {
244             skipBlock();
245             readNextBlock();
246         } else if (type >= MIN_UNSKIPPABLE_TYPE && type <= MAX_UNSKIPPABLE_TYPE) {
247             throw new IOException("Unskippable chunk with type " + type + " (hex " + Integer.toHexString(type) + ") detected.");
248         } else if (type == UNCOMPRESSED_CHUNK_TYPE) {
249             inUncompressedChunk = true;
250             uncompressedBytesRemaining = readSize() - 4 /* CRC */;
251             if (uncompressedBytesRemaining < 0) {
252                 throw new IOException("Found illegal chunk with negative size");
253             }
254             expectedChecksum = unmask(readCrc());
255         } else if (type == COMPRESSED_CHUNK_TYPE) {
256             final boolean expectChecksum = dialect.usesChecksumWithCompressedChunks();
257             final long size = readSize() - (expectChecksum ? 4L : 0L);
258             if (size < 0) {
259                 throw new IOException("Found illegal chunk with negative size");
260             }
261             if (expectChecksum) {
262                 expectedChecksum = unmask(readCrc());
263             } else {
264                 expectedChecksum = -1;
265             }
266             // @formatter:off
267             currentCompressedChunk = new SnappyCompressorInputStream(BoundedInputStream.builder()
268                     .setInputStream(inputStream)
269                     .setMaxCount(size)
270                     .setPropagateClose(false)
271                     .get(),
272                     blockSize);
273             // @formatter:on
274             // constructor reads uncompressed size
275             count(currentCompressedChunk.getBytesRead());
276         } else {
277             // impossible as all potential byte values have been covered
278             throw new IOException("Unknown chunk type " + type + " detected.");
279         }
280     }
281 
282     /**
283      * Reads from the current chunk into the given array.
284      *
285      * @return -1 if there is no current chunk or the number of bytes read from the current chunk (which may be -1 if the end of the chunk is reached).
286      */
287     private int readOnce(final byte[] b, final int off, final int len) throws IOException {
288         int read = -1;
289         if (inUncompressedChunk) {
290             final int amount = Math.min(uncompressedBytesRemaining, len);
291             if (amount == 0) {
292                 return -1;
293             }
294             read = inputStream.read(b, off, amount);
295             if (read != -1) {
296                 uncompressedBytesRemaining -= read;
297                 count(read);
298             }
299         } else if (currentCompressedChunk != null) {
300             final long before = currentCompressedChunk.getBytesRead();
301             read = currentCompressedChunk.read(b, off, len);
302             if (read == -1) {
303                 currentCompressedChunk.close();
304                 currentCompressedChunk = null;
305             } else {
306                 count(currentCompressedChunk.getBytesRead() - before);
307             }
308         }
309         if (read > 0) {
310             checksum.update(b, off, read);
311         }
312         return read;
313     }
314 
315     private int readOneByte() throws IOException {
316         final int b = inputStream.read();
317         if (b != -1) {
318             count(1);
319             return b & 0xFF;
320         }
321         return -1;
322     }
323 
324     private int readSize() throws IOException {
325         return (int) ByteUtils.fromLittleEndian(supplier, 3);
326     }
327 
328     private void readStreamIdentifier() throws IOException {
329         final byte[] b = new byte[10];
330         final int read = IOUtils.readFully(inputStream, b);
331         count(read);
332         if (10 != read || !matches(b, 10)) {
333             throw new IOException("Not a framed Snappy stream");
334         }
335     }
336 
337     private void skipBlock() throws IOException {
338         final int size = readSize();
339         if (size < 0) {
340             throw new IOException("Found illegal chunk with negative size");
341         }
342         final long read = org.apache.commons.io.IOUtils.skip(inputStream, size);
343         count(read);
344         if (read != size) {
345             throw new IOException("Premature end of stream");
346         }
347     }
348 
349     private void verifyLastChecksumAndReset() throws IOException {
350         if (expectedChecksum >= 0 && expectedChecksum != checksum.getValue()) {
351             throw new IOException("Checksum verification failed");
352         }
353         expectedChecksum = -1;
354         checksum.reset();
355     }
356 
357 }