001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.commons.compress.compressors.snappy;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.PushbackInputStream;
024import java.util.Arrays;
025
026import org.apache.commons.codec.digest.PureJavaCrc32C;
027import org.apache.commons.compress.compressors.CompressorInputStream;
028import org.apache.commons.compress.utils.BoundedInputStream;
029import org.apache.commons.compress.utils.ByteUtils;
030import org.apache.commons.compress.utils.IOUtils;
031import org.apache.commons.compress.utils.InputStreamStatistics;
032import org.apache.commons.io.input.CountingInputStream;
033
034/**
035 * CompressorInputStream for the framing Snappy format.
036 *
037 * <p>
038 * Based on the "spec" in the version "Last revised: 2013-10-25"
039 * </p>
040 *
041 * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a>
042 * @since 1.7
043 */
044public class FramedSnappyCompressorInputStream extends CompressorInputStream implements InputStreamStatistics {
045
046    /**
047     * package private for tests only.
048     */
049    static final long MASK_OFFSET = 0xa282ead8L;
050
051    private static final int STREAM_IDENTIFIER_TYPE = 0xff;
052    static final int COMPRESSED_CHUNK_TYPE = 0;
053    private static final int UNCOMPRESSED_CHUNK_TYPE = 1;
054    private static final int PADDING_CHUNK_TYPE = 0xfe;
055    private static final int MIN_UNSKIPPABLE_TYPE = 2;
056    private static final int MAX_UNSKIPPABLE_TYPE = 0x7f;
057    private static final int MAX_SKIPPABLE_TYPE = 0xfd;
058
059    // used by FramedSnappyCompressorOutputStream as well
060    static final byte[] SZ_SIGNATURE = { // NOSONAR
061            (byte) STREAM_IDENTIFIER_TYPE, // tag
062            6, 0, 0, // length
063            's', 'N', 'a', 'P', 'p', 'Y' };
064
065    /**
066     * Checks if the signature matches what is expected for a .sz file.
067     *
068     * <p>
069     * .sz files start with a chunk with tag 0xff and content sNaPpY.
070     * </p>
071     *
072     * @param signature the bytes to check
073     * @param length    the number of bytes to check
074     * @return true if this is a .sz stream, false otherwise
075     */
076    public static boolean matches(final byte[] signature, final int length) {
077
078        if (length < SZ_SIGNATURE.length) {
079            return false;
080        }
081
082        byte[] shortenedSig = signature;
083        if (signature.length > SZ_SIGNATURE.length) {
084            shortenedSig = Arrays.copyOf(signature, SZ_SIGNATURE.length);
085        }
086
087        return Arrays.equals(shortenedSig, SZ_SIGNATURE);
088    }
089
090    static long unmask(long x) {
091        // ugly, maybe we should just have used ints and deal with the
092        // overflow
093        x -= MASK_OFFSET;
094        x &= 0xffffFFFFL;
095        return (x >> 17 | x << 15) & 0xffffFFFFL;
096    }
097
098    private long unreadBytes;
099
100    private final CountingInputStream countingStream;
101
102    /** The underlying stream to read compressed data from */
103    private final PushbackInputStream inputStream;
104
105    /** The dialect to expect */
106    private final FramedSnappyDialect dialect;
107
108    private SnappyCompressorInputStream currentCompressedChunk;
109
110    // used in no-arg read method
111    private final byte[] oneByte = new byte[1];
112    private boolean endReached, inUncompressedChunk;
113    private int uncompressedBytesRemaining;
114    private long expectedChecksum = -1;
115
116    private final int blockSize;
117
118    private final PureJavaCrc32C checksum = new PureJavaCrc32C();
119
120    private final ByteUtils.ByteSupplier supplier = this::readOneByte;
121
122    /**
123     * Constructs a new input stream that decompresses snappy-framed-compressed data from the specified input stream using the
124     * {@link FramedSnappyDialect#STANDARD} dialect.
125     *
126     * @param in the InputStream from which to read the compressed data
127     * @throws IOException if reading fails
128     */
129    public FramedSnappyCompressorInputStream(final InputStream in) throws IOException {
130        this(in, FramedSnappyDialect.STANDARD);
131    }
132
133    /**
134     * Constructs a new input stream that decompresses snappy-framed-compressed data from the specified input stream.
135     *
136     * @param in      the InputStream from which to read the compressed data
137     * @param dialect the dialect used by the compressed stream
138     * @throws IOException if reading fails
139     */
140    public FramedSnappyCompressorInputStream(final InputStream in, final FramedSnappyDialect dialect) throws IOException {
141        this(in, SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE, dialect);
142    }
143
144    /**
145     * Constructs a new input stream that decompresses snappy-framed-compressed data from the specified input stream.
146     *
147     * @param in        the InputStream from which to read the compressed data
148     * @param blockSize the block size to use for the compressed stream
149     * @param dialect   the dialect used by the compressed stream
150     * @throws IOException              if reading fails
151     * @throws IllegalArgumentException if blockSize is not bigger than 0
152     * @since 1.14
153     */
154    public FramedSnappyCompressorInputStream(final InputStream in, final int blockSize, final FramedSnappyDialect dialect) throws IOException {
155        if (blockSize <= 0) {
156            throw new IllegalArgumentException("blockSize must be bigger than 0");
157        }
158        countingStream = new CountingInputStream(in);
159        this.inputStream = new PushbackInputStream(countingStream, 1);
160        this.blockSize = blockSize;
161        this.dialect = dialect;
162        if (dialect.hasStreamIdentifier()) {
163            readStreamIdentifier();
164        }
165    }
166
167    /** {@inheritDoc} */
168    @Override
169    public int available() throws IOException {
170        if (inUncompressedChunk) {
171            return Math.min(uncompressedBytesRemaining, inputStream.available());
172        }
173        if (currentCompressedChunk != null) {
174            return currentCompressedChunk.available();
175        }
176        return 0;
177    }
178
179    /** {@inheritDoc} */
180    @Override
181    public void close() throws IOException {
182        try {
183            if (currentCompressedChunk != null) {
184                currentCompressedChunk.close();
185                currentCompressedChunk = null;
186            }
187        } finally {
188            inputStream.close();
189        }
190    }
191
192    /**
193     * @since 1.17
194     */
195    @Override
196    public long getCompressedCount() {
197        return countingStream.getByteCount() - unreadBytes;
198    }
199
200    /** {@inheritDoc} */
201    @Override
202    public int read() throws IOException {
203        return read(oneByte, 0, 1) == -1 ? -1 : oneByte[0] & 0xFF;
204    }
205
206    /** {@inheritDoc} */
207    @Override
208    public int read(final byte[] b, final int off, final int len) throws IOException {
209        if (len == 0) {
210            return 0;
211        }
212        int read = readOnce(b, off, len);
213        if (read == -1) {
214            readNextBlock();
215            if (endReached) {
216                return -1;
217            }
218            read = readOnce(b, off, len);
219        }
220        return read;
221    }
222
223    private long readCrc() throws IOException {
224        final byte[] b = new byte[4];
225        final int read = IOUtils.readFully(inputStream, b);
226        count(read);
227        if (read != 4) {
228            throw new IOException("Premature end of stream");
229        }
230        return ByteUtils.fromLittleEndian(b);
231    }
232
233    private void readNextBlock() throws IOException {
234        verifyLastChecksumAndReset();
235        inUncompressedChunk = false;
236        final int type = readOneByte();
237        if (type == -1) {
238            endReached = true;
239        } else if (type == STREAM_IDENTIFIER_TYPE) {
240            inputStream.unread(type);
241            unreadBytes++;
242            pushedBackBytes(1);
243            readStreamIdentifier();
244            readNextBlock();
245        } else if (type == PADDING_CHUNK_TYPE || type > MAX_UNSKIPPABLE_TYPE && type <= MAX_SKIPPABLE_TYPE) {
246            skipBlock();
247            readNextBlock();
248        } else if (type >= MIN_UNSKIPPABLE_TYPE && type <= MAX_UNSKIPPABLE_TYPE) {
249            throw new IOException("Unskippable chunk with type " + type + " (hex " + Integer.toHexString(type) + ")" + " detected.");
250        } else if (type == UNCOMPRESSED_CHUNK_TYPE) {
251            inUncompressedChunk = true;
252            uncompressedBytesRemaining = readSize() - 4 /* CRC */;
253            if (uncompressedBytesRemaining < 0) {
254                throw new IOException("Found illegal chunk with negative size");
255            }
256            expectedChecksum = unmask(readCrc());
257        } else if (type == COMPRESSED_CHUNK_TYPE) {
258            final boolean expectChecksum = dialect.usesChecksumWithCompressedChunks();
259            final long size = readSize() - (expectChecksum ? 4L : 0L);
260            if (size < 0) {
261                throw new IOException("Found illegal chunk with negative size");
262            }
263            if (expectChecksum) {
264                expectedChecksum = unmask(readCrc());
265            } else {
266                expectedChecksum = -1;
267            }
268            currentCompressedChunk = new SnappyCompressorInputStream(new BoundedInputStream(inputStream, size), blockSize);
269            // constructor reads uncompressed size
270            count(currentCompressedChunk.getBytesRead());
271        } else {
272            // impossible as all potential byte values have been covered
273            throw new IOException("Unknown chunk type " + type + " detected.");
274        }
275    }
276
277    /**
278     * Read from the current chunk into the given array.
279     *
280     * @return -1 if there is no current chunk or the number of bytes read from the current chunk (which may be -1 if the end of the chunk is reached).
281     */
282    private int readOnce(final byte[] b, final int off, final int len) throws IOException {
283        int read = -1;
284        if (inUncompressedChunk) {
285            final int amount = Math.min(uncompressedBytesRemaining, len);
286            if (amount == 0) {
287                return -1;
288            }
289            read = inputStream.read(b, off, amount);
290            if (read != -1) {
291                uncompressedBytesRemaining -= read;
292                count(read);
293            }
294        } else if (currentCompressedChunk != null) {
295            final long before = currentCompressedChunk.getBytesRead();
296            read = currentCompressedChunk.read(b, off, len);
297            if (read == -1) {
298                currentCompressedChunk.close();
299                currentCompressedChunk = null;
300            } else {
301                count(currentCompressedChunk.getBytesRead() - before);
302            }
303        }
304        if (read > 0) {
305            checksum.update(b, off, read);
306        }
307        return read;
308    }
309
310    private int readOneByte() throws IOException {
311        final int b = inputStream.read();
312        if (b != -1) {
313            count(1);
314            return b & 0xFF;
315        }
316        return -1;
317    }
318
319    private int readSize() throws IOException {
320        return (int) ByteUtils.fromLittleEndian(supplier, 3);
321    }
322
323    private void readStreamIdentifier() throws IOException {
324        final byte[] b = new byte[10];
325        final int read = IOUtils.readFully(inputStream, b);
326        count(read);
327        if (10 != read || !matches(b, 10)) {
328            throw new IOException("Not a framed Snappy stream");
329        }
330    }
331
332    private void skipBlock() throws IOException {
333        final int size = readSize();
334        if (size < 0) {
335            throw new IOException("Found illegal chunk with negative size");
336        }
337        final long read = org.apache.commons.io.IOUtils.skip(inputStream, size);
338        count(read);
339        if (read != size) {
340            throw new IOException("Premature end of stream");
341        }
342    }
343
344    private void verifyLastChecksumAndReset() throws IOException {
345        if (expectedChecksum >= 0 && expectedChecksum != checksum.getValue()) {
346            throw new IOException("Checksum verification failed");
347        }
348        expectedChecksum = -1;
349        checksum.reset();
350    }
351
352}