View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.commons.compress.compressors.snappy;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.PushbackInputStream;
24  import java.util.Arrays;
25  
26  import org.apache.commons.codec.digest.PureJavaCrc32C;
27  import org.apache.commons.compress.compressors.CompressorInputStream;
28  import org.apache.commons.compress.utils.ByteUtils;
29  import org.apache.commons.compress.utils.IOUtils;
30  import org.apache.commons.compress.utils.InputStreamStatistics;
31  import org.apache.commons.io.input.BoundedInputStream;
32  
33  /**
34   * CompressorInputStream for the framing Snappy format.
35   *
36   * <p>
37   * Based on the "spec" in the version "Last revised: 2013-10-25"
38   * </p>
39   *
40   * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a>
41   * @since 1.7
42   */
43  public class FramedSnappyCompressorInputStream extends CompressorInputStream implements InputStreamStatistics {
44  
45      /**
46       * package private for tests only.
47       */
48      static final long MASK_OFFSET = 0xa282ead8L;
49  
50      private static final int STREAM_IDENTIFIER_TYPE = 0xff;
51      static final int COMPRESSED_CHUNK_TYPE = 0;
52      private static final int UNCOMPRESSED_CHUNK_TYPE = 1;
53      private static final int PADDING_CHUNK_TYPE = 0xfe;
54      private static final int MIN_UNSKIPPABLE_TYPE = 2;
55      private static final int MAX_UNSKIPPABLE_TYPE = 0x7f;
56      private static final int MAX_SKIPPABLE_TYPE = 0xfd;
57  
58      // used by FramedSnappyCompressorOutputStream as well
59      static final byte[] SZ_SIGNATURE = { // NOSONAR
60              (byte) STREAM_IDENTIFIER_TYPE, // tag
61              6, 0, 0, // length
62              's', 'N', 'a', 'P', 'p', 'Y' };
63  
64      /**
65       * Checks if the signature matches what is expected for a .sz file.
66       *
67       * <p>
68       * .sz files start with a chunk with tag 0xff and content sNaPpY.
69       * </p>
70       *
71       * @param signature the bytes to check
72       * @param length    the number of bytes to check
73       * @return true if this is a .sz stream, false otherwise
74       */
75      public static boolean matches(final byte[] signature, final int length) {
76  
77          if (length < SZ_SIGNATURE.length) {
78              return false;
79          }
80  
81          byte[] shortenedSig = signature;
82          if (signature.length > SZ_SIGNATURE.length) {
83              shortenedSig = Arrays.copyOf(signature, SZ_SIGNATURE.length);
84          }
85  
86          return Arrays.equals(shortenedSig, SZ_SIGNATURE);
87      }
88  
89      static long unmask(long x) {
90          // ugly, maybe we should just have used ints and deal with the
91          // overflow
92          x -= MASK_OFFSET;
93          x &= 0xffffFFFFL;
94          return (x >> 17 | x << 15) & 0xffffFFFFL;
95      }
96  
97      private long unreadBytes;
98  
99      private final BoundedInputStream countingStream;
100 
101     /** The underlying stream to read compressed data from */
102     private final PushbackInputStream inputStream;
103 
104     /** The dialect to expect */
105     private final FramedSnappyDialect dialect;
106 
107     private SnappyCompressorInputStream currentCompressedChunk;
108 
109     // used in no-arg read method
110     private final byte[] oneByte = new byte[1];
111     private boolean endReached, inUncompressedChunk;
112     private int uncompressedBytesRemaining;
113     private long expectedChecksum = -1;
114 
115     private final int blockSize;
116 
117     private final PureJavaCrc32C checksum = new PureJavaCrc32C();
118 
119     private final ByteUtils.ByteSupplier supplier = this::readOneByte;
120 
121     /**
122      * Constructs a new input stream that decompresses snappy-framed-compressed data from the specified input stream using the
123      * {@link FramedSnappyDialect#STANDARD} dialect.
124      *
125      * @param in the InputStream from which to read the compressed data
126      * @throws IOException if reading fails
127      */
128     public FramedSnappyCompressorInputStream(final InputStream in) throws IOException {
129         this(in, FramedSnappyDialect.STANDARD);
130     }
131 
132     /**
133      * Constructs a new input stream that decompresses snappy-framed-compressed data from the specified input stream.
134      *
135      * @param in      the InputStream from which to read the compressed data
136      * @param dialect the dialect used by the compressed stream
137      * @throws IOException if reading fails
138      */
139     public FramedSnappyCompressorInputStream(final InputStream in, final FramedSnappyDialect dialect) throws IOException {
140         this(in, SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE, dialect);
141     }
142 
143     /**
144      * Constructs a new input stream that decompresses snappy-framed-compressed data from the specified input stream.
145      *
146      * @param in        the InputStream from which to read the compressed data
147      * @param blockSize the block size to use for the compressed stream
148      * @param dialect   the dialect used by the compressed stream
149      * @throws IOException              if reading fails
150      * @throws IllegalArgumentException if blockSize is not bigger than 0
151      * @since 1.14
152      */
153     public FramedSnappyCompressorInputStream(final InputStream in, final int blockSize, final FramedSnappyDialect dialect) throws IOException {
154         if (blockSize <= 0) {
155             throw new IllegalArgumentException("blockSize must be bigger than 0");
156         }
157         countingStream = BoundedInputStream.builder().setInputStream(in).get();
158         this.inputStream = new PushbackInputStream(countingStream, 1);
159         this.blockSize = blockSize;
160         this.dialect = dialect;
161         if (dialect.hasStreamIdentifier()) {
162             readStreamIdentifier();
163         }
164     }
165 
166     /** {@inheritDoc} */
167     @Override
168     public int available() throws IOException {
169         if (inUncompressedChunk) {
170             return Math.min(uncompressedBytesRemaining, inputStream.available());
171         }
172         if (currentCompressedChunk != null) {
173             return currentCompressedChunk.available();
174         }
175         return 0;
176     }
177 
178     /** {@inheritDoc} */
179     @Override
180     public void close() throws IOException {
181         try {
182             if (currentCompressedChunk != null) {
183                 currentCompressedChunk.close();
184                 currentCompressedChunk = null;
185             }
186         } finally {
187             inputStream.close();
188         }
189     }
190 
191     /**
192      * @since 1.17
193      */
194     @Override
195     public long getCompressedCount() {
196         return countingStream.getCount() - unreadBytes;
197     }
198 
199     /** {@inheritDoc} */
200     @Override
201     public int read() throws IOException {
202         return read(oneByte, 0, 1) == -1 ? -1 : oneByte[0] & 0xFF;
203     }
204 
205     /** {@inheritDoc} */
206     @Override
207     public int read(final byte[] b, final int off, final int len) throws IOException {
208         if (len == 0) {
209             return 0;
210         }
211         int read = readOnce(b, off, len);
212         if (read == -1) {
213             readNextBlock();
214             if (endReached) {
215                 return -1;
216             }
217             read = readOnce(b, off, len);
218         }
219         return read;
220     }
221 
222     private long readCrc() throws IOException {
223         final byte[] b = new byte[4];
224         final int read = IOUtils.readFully(inputStream, b);
225         count(read);
226         if (read != 4) {
227             throw new IOException("Premature end of stream");
228         }
229         return ByteUtils.fromLittleEndian(b);
230     }
231 
232     private void readNextBlock() throws IOException {
233         verifyLastChecksumAndReset();
234         inUncompressedChunk = false;
235         final int type = readOneByte();
236         if (type == -1) {
237             endReached = true;
238         } else if (type == STREAM_IDENTIFIER_TYPE) {
239             inputStream.unread(type);
240             unreadBytes++;
241             pushedBackBytes(1);
242             readStreamIdentifier();
243             readNextBlock();
244         } else if (type == PADDING_CHUNK_TYPE || type > MAX_UNSKIPPABLE_TYPE && type <= MAX_SKIPPABLE_TYPE) {
245             skipBlock();
246             readNextBlock();
247         } else if (type >= MIN_UNSKIPPABLE_TYPE && type <= MAX_UNSKIPPABLE_TYPE) {
248             throw new IOException("Unskippable chunk with type " + type + " (hex " + Integer.toHexString(type) + ")" + " detected.");
249         } else if (type == UNCOMPRESSED_CHUNK_TYPE) {
250             inUncompressedChunk = true;
251             uncompressedBytesRemaining = readSize() - 4 /* CRC */;
252             if (uncompressedBytesRemaining < 0) {
253                 throw new IOException("Found illegal chunk with negative size");
254             }
255             expectedChecksum = unmask(readCrc());
256         } else if (type == COMPRESSED_CHUNK_TYPE) {
257             final boolean expectChecksum = dialect.usesChecksumWithCompressedChunks();
258             final long size = readSize() - (expectChecksum ? 4L : 0L);
259             if (size < 0) {
260                 throw new IOException("Found illegal chunk with negative size");
261             }
262             if (expectChecksum) {
263                 expectedChecksum = unmask(readCrc());
264             } else {
265                 expectedChecksum = -1;
266             }
267             // @formatter:off
268             currentCompressedChunk = new SnappyCompressorInputStream(BoundedInputStream.builder()
269                     .setInputStream(inputStream)
270                     .setMaxCount(size)
271                     .setPropagateClose(false)
272                     .get(),
273                     blockSize);
274             // @formatter:on
275             // constructor reads uncompressed size
276             count(currentCompressedChunk.getBytesRead());
277         } else {
278             // impossible as all potential byte values have been covered
279             throw new IOException("Unknown chunk type " + type + " detected.");
280         }
281     }
282 
283     /**
284      * Read from the current chunk into the given array.
285      *
286      * @return -1 if there is no current chunk or the number of bytes read from the current chunk (which may be -1 if the end of the chunk is reached).
287      */
288     private int readOnce(final byte[] b, final int off, final int len) throws IOException {
289         int read = -1;
290         if (inUncompressedChunk) {
291             final int amount = Math.min(uncompressedBytesRemaining, len);
292             if (amount == 0) {
293                 return -1;
294             }
295             read = inputStream.read(b, off, amount);
296             if (read != -1) {
297                 uncompressedBytesRemaining -= read;
298                 count(read);
299             }
300         } else if (currentCompressedChunk != null) {
301             final long before = currentCompressedChunk.getBytesRead();
302             read = currentCompressedChunk.read(b, off, len);
303             if (read == -1) {
304                 currentCompressedChunk.close();
305                 currentCompressedChunk = null;
306             } else {
307                 count(currentCompressedChunk.getBytesRead() - before);
308             }
309         }
310         if (read > 0) {
311             checksum.update(b, off, read);
312         }
313         return read;
314     }
315 
316     private int readOneByte() throws IOException {
317         final int b = inputStream.read();
318         if (b != -1) {
319             count(1);
320             return b & 0xFF;
321         }
322         return -1;
323     }
324 
325     private int readSize() throws IOException {
326         return (int) ByteUtils.fromLittleEndian(supplier, 3);
327     }
328 
329     private void readStreamIdentifier() throws IOException {
330         final byte[] b = new byte[10];
331         final int read = IOUtils.readFully(inputStream, b);
332         count(read);
333         if (10 != read || !matches(b, 10)) {
334             throw new IOException("Not a framed Snappy stream");
335         }
336     }
337 
338     private void skipBlock() throws IOException {
339         final int size = readSize();
340         if (size < 0) {
341             throw new IOException("Found illegal chunk with negative size");
342         }
343         final long read = org.apache.commons.io.IOUtils.skip(inputStream, size);
344         count(read);
345         if (read != size) {
346             throw new IOException("Premature end of stream");
347         }
348     }
349 
350     private void verifyLastChecksumAndReset() throws IOException {
351         if (expectedChecksum >= 0 && expectedChecksum != checksum.getValue()) {
352             throw new IOException("Checksum verification failed");
353         }
354         expectedChecksum = -1;
355         checksum.reset();
356     }
357 
358 }