/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
019package org.apache.commons.compress.compressors.snappy;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.PushbackInputStream;
024import java.util.Arrays;
025
026import org.apache.commons.compress.compressors.CompressorInputStream;
027import org.apache.commons.compress.utils.BoundedInputStream;
028import org.apache.commons.compress.utils.ByteUtils;
029import org.apache.commons.compress.utils.CountingInputStream;
030import org.apache.commons.compress.utils.IOUtils;
031import org.apache.commons.compress.utils.InputStreamStatistics;
032
033/**
034 * CompressorInputStream for the framing Snappy format.
035 *
036 * <p>Based on the "spec" in the version "Last revised: 2013-10-25"</p>
037 *
038 * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a>
039 * @since 1.7
040 */
041public class FramedSnappyCompressorInputStream extends CompressorInputStream
042    implements InputStreamStatistics {
043
044    /**
045     * package private for tests only.
046     */
047    static final long MASK_OFFSET = 0xa282ead8L;
048
049    private static final int STREAM_IDENTIFIER_TYPE = 0xff;
050    static final int COMPRESSED_CHUNK_TYPE = 0;
051    private static final int UNCOMPRESSED_CHUNK_TYPE = 1;
052    private static final int PADDING_CHUNK_TYPE = 0xfe;
053    private static final int MIN_UNSKIPPABLE_TYPE = 2;
054    private static final int MAX_UNSKIPPABLE_TYPE = 0x7f;
055    private static final int MAX_SKIPPABLE_TYPE = 0xfd;
056
057    // used by FramedSnappyCompressorOutputStream as well
058    static final byte[] SZ_SIGNATURE = new byte[] { //NOSONAR
059        (byte) STREAM_IDENTIFIER_TYPE, // tag
060        6, 0, 0, // length
061        's', 'N', 'a', 'P', 'p', 'Y'
062    };
063
064    private long unreadBytes;
065    private final CountingInputStream countingStream;
066
067    /** The underlying stream to read compressed data from */
068    private final PushbackInputStream in;
069
070    /** The dialect to expect */
071    private final FramedSnappyDialect dialect;
072
073    private SnappyCompressorInputStream currentCompressedChunk;
074
075    // used in no-arg read method
076    private final byte[] oneByte = new byte[1];
077
078    private boolean endReached, inUncompressedChunk;
079
080    private int uncompressedBytesRemaining;
081    private long expectedChecksum = -1;
082    private final int blockSize;
083    private final PureJavaCrc32C checksum = new PureJavaCrc32C();
084
085    private final ByteUtils.ByteSupplier supplier = new ByteUtils.ByteSupplier() {
086        @Override
087        public int getAsByte() throws IOException {
088            return readOneByte();
089        }
090    };
091
092    /**
093     * Constructs a new input stream that decompresses
094     * snappy-framed-compressed data from the specified input stream
095     * using the {@link FramedSnappyDialect#STANDARD} dialect.
096     * @param in  the InputStream from which to read the compressed data
097     * @throws IOException if reading fails
098     */
099    public FramedSnappyCompressorInputStream(final InputStream in) throws IOException {
100        this(in, FramedSnappyDialect.STANDARD);
101    }
102
103    /**
104     * Constructs a new input stream that decompresses snappy-framed-compressed data
105     * from the specified input stream.
106     * @param in  the InputStream from which to read the compressed data
107     * @param dialect the dialect used by the compressed stream
108     * @throws IOException if reading fails
109     */
110    public FramedSnappyCompressorInputStream(final InputStream in,
111                                             final FramedSnappyDialect dialect)
112        throws IOException {
113        this(in, SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE, dialect);
114    }
115
116    /**
117     * Constructs a new input stream that decompresses snappy-framed-compressed data
118     * from the specified input stream.
119     * @param in  the InputStream from which to read the compressed data
120     * @param blockSize the block size to use for the compressed stream
121     * @param dialect the dialect used by the compressed stream
122     * @throws IOException if reading fails
123     * @throws IllegalArgumentException if blockSize is not bigger than 0
124     * @since 1.14
125     */
126    public FramedSnappyCompressorInputStream(final InputStream in,
127                                             final int blockSize,
128                                             final FramedSnappyDialect dialect)
129        throws IOException {
130        if (blockSize <= 0) {
131            throw new IllegalArgumentException("blockSize must be bigger than 0");
132        }
133        countingStream = new CountingInputStream(in);
134        this.in = new PushbackInputStream(countingStream, 1);
135        this.blockSize = blockSize;
136        this.dialect = dialect;
137        if (dialect.hasStreamIdentifier()) {
138            readStreamIdentifier();
139        }
140    }
141
142    /** {@inheritDoc} */
143    @Override
144    public int read() throws IOException {
145        return read(oneByte, 0, 1) == -1 ? -1 : oneByte[0] & 0xFF;
146    }
147
148    /** {@inheritDoc} */
149    @Override
150    public void close() throws IOException {
151        try {
152            if (currentCompressedChunk != null) {
153                currentCompressedChunk.close();
154                currentCompressedChunk = null;
155            }
156        } finally {
157            in.close();
158        }
159    }
160
161    /** {@inheritDoc} */
162    @Override
163    public int read(final byte[] b, final int off, final int len) throws IOException {
164        int read = readOnce(b, off, len);
165        if (read == -1) {
166            readNextBlock();
167            if (endReached) {
168                return -1;
169            }
170            read = readOnce(b, off, len);
171        }
172        return read;
173    }
174
175    /** {@inheritDoc} */
176    @Override
177    public int available() throws IOException {
178        if (inUncompressedChunk) {
179            return Math.min(uncompressedBytesRemaining,
180                            in.available());
181        } else if (currentCompressedChunk != null) {
182            return currentCompressedChunk.available();
183        }
184        return 0;
185    }
186
187    /**
188     * @since 1.17
189     */
190    @Override
191    public long getCompressedCount() {
192        return countingStream.getBytesRead() - unreadBytes;
193    }
194
195    /**
196     * Read from the current chunk into the given array.
197     *
198     * @return -1 if there is no current chunk or the number of bytes
199     * read from the current chunk (which may be -1 if the end of the
200     * chunk is reached).
201     */
202    private int readOnce(final byte[] b, final int off, final int len) throws IOException {
203        int read = -1;
204        if (inUncompressedChunk) {
205            final int amount = Math.min(uncompressedBytesRemaining, len);
206            if (amount == 0) {
207                return -1;
208            }
209            read = in.read(b, off, amount);
210            if (read != -1) {
211                uncompressedBytesRemaining -= read;
212                count(read);
213            }
214        } else if (currentCompressedChunk != null) {
215            final long before = currentCompressedChunk.getBytesRead();
216            read = currentCompressedChunk.read(b, off, len);
217            if (read == -1) {
218                currentCompressedChunk.close();
219                currentCompressedChunk = null;
220            } else {
221                count(currentCompressedChunk.getBytesRead() - before);
222            }
223        }
224        if (read > 0) {
225            checksum.update(b, off, read);
226        }
227        return read;
228    }
229
230    private void readNextBlock() throws IOException {
231        verifyLastChecksumAndReset();
232        inUncompressedChunk = false;
233        final int type = readOneByte();
234        if (type == -1) {
235            endReached = true;
236        } else if (type == STREAM_IDENTIFIER_TYPE) {
237            in.unread(type);
238            unreadBytes++;
239            pushedBackBytes(1);
240            readStreamIdentifier();
241            readNextBlock();
242        } else if (type == PADDING_CHUNK_TYPE
243                   || (type > MAX_UNSKIPPABLE_TYPE && type <= MAX_SKIPPABLE_TYPE)) {
244            skipBlock();
245            readNextBlock();
246        } else if (type >= MIN_UNSKIPPABLE_TYPE && type <= MAX_UNSKIPPABLE_TYPE) {
247            throw new IOException("Unskippable chunk with type " + type
248                                  + " (hex " + Integer.toHexString(type) + ")"
249                                  + " detected.");
250        } else if (type == UNCOMPRESSED_CHUNK_TYPE) {
251            inUncompressedChunk = true;
252            uncompressedBytesRemaining = readSize() - 4 /* CRC */;
253            if (uncompressedBytesRemaining < 0) {
254                throw new IOException("Found illegal chunk with negative size");
255            }
256            expectedChecksum = unmask(readCrc());
257        } else if (type == COMPRESSED_CHUNK_TYPE) {
258            final boolean expectChecksum = dialect.usesChecksumWithCompressedChunks();
259            final long size = readSize() - (expectChecksum ? 4L : 0L);
260            if (size < 0) {
261                throw new IOException("Found illegal chunk with negative size");
262            }
263            if (expectChecksum) {
264                expectedChecksum = unmask(readCrc());
265            } else {
266                expectedChecksum = -1;
267            }
268            currentCompressedChunk =
269                new SnappyCompressorInputStream(new BoundedInputStream(in, size), blockSize);
270            // constructor reads uncompressed size
271            count(currentCompressedChunk.getBytesRead());
272        } else {
273            // impossible as all potential byte values have been covered
274            throw new IOException("Unknown chunk type " + type
275                                  + " detected.");
276        }
277    }
278
279    private long readCrc() throws IOException {
280        final byte[] b = new byte[4];
281        final int read = IOUtils.readFully(in, b);
282        count(read);
283        if (read != 4) {
284            throw new IOException("Premature end of stream");
285        }
286        return ByteUtils.fromLittleEndian(b);
287    }
288
289    static long unmask(long x) {
290        // ugly, maybe we should just have used ints and deal with the
291        // overflow
292        x -= MASK_OFFSET;
293        x &= 0xffffFFFFL;
294        return ((x >> 17) | (x << 15)) & 0xffffFFFFL;
295    }
296
297    private int readSize() throws IOException {
298        return (int) ByteUtils.fromLittleEndian(supplier, 3);
299    }
300
301    private void skipBlock() throws IOException {
302        final int size = readSize();
303        if (size < 0) {
304            throw new IOException("Found illegal chunk with negative size");
305        }
306        final long read = IOUtils.skip(in, size);
307        count(read);
308        if (read != size) {
309            throw new IOException("Premature end of stream");
310        }
311    }
312
313    private void readStreamIdentifier() throws IOException {
314        final byte[] b = new byte[10];
315        final int read = IOUtils.readFully(in, b);
316        count(read);
317        if (10 != read || !matches(b, 10)) {
318            throw new IOException("Not a framed Snappy stream");
319        }
320    }
321
322    private int readOneByte() throws IOException {
323        final int b = in.read();
324        if (b != -1) {
325            count(1);
326            return b & 0xFF;
327        }
328        return -1;
329    }
330
331    private void verifyLastChecksumAndReset() throws IOException {
332        if (expectedChecksum >= 0 && expectedChecksum != checksum.getValue()) {
333            throw new IOException("Checksum verification failed");
334        }
335        expectedChecksum = -1;
336        checksum.reset();
337    }
338
339    /**
340     * Checks if the signature matches what is expected for a .sz file.
341     *
342     * <p>.sz files start with a chunk with tag 0xff and content sNaPpY.</p>
343     *
344     * @param signature the bytes to check
345     * @param length    the number of bytes to check
346     * @return          true if this is a .sz stream, false otherwise
347     */
348    public static boolean matches(final byte[] signature, final int length) {
349
350        if (length < SZ_SIGNATURE.length) {
351            return false;
352        }
353
354        byte[] shortenedSig = signature;
355        if (signature.length > SZ_SIGNATURE.length) {
356            shortenedSig = new byte[SZ_SIGNATURE.length];
357            System.arraycopy(signature, 0, shortenedSig, 0, SZ_SIGNATURE.length);
358        }
359
360        return Arrays.equals(shortenedSig, SZ_SIGNATURE);
361    }
362
363}