FramedSnappyCompressorInputStream.java

  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  * http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing,
  13.  * software distributed under the License is distributed on an
  14.  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15.  * KIND, either express or implied.  See the License for the
  16.  * specific language governing permissions and limitations
  17.  * under the License.
  18.  */
  19. package org.apache.commons.compress.compressors.snappy;

  20. import java.io.IOException;
  21. import java.io.InputStream;
  22. import java.io.PushbackInputStream;
  23. import java.util.Arrays;

  24. import org.apache.commons.codec.digest.PureJavaCrc32C;
  25. import org.apache.commons.compress.compressors.CompressorInputStream;
  26. import org.apache.commons.compress.utils.ByteUtils;
  27. import org.apache.commons.compress.utils.IOUtils;
  28. import org.apache.commons.compress.utils.InputStreamStatistics;
  29. import org.apache.commons.io.input.BoundedInputStream;

  30. /**
  31.  * CompressorInputStream for the framing Snappy format.
  32.  *
  33.  * <p>
  34.  * Based on the "spec" in the version "Last revised: 2013-10-25"
  35.  * </p>
  36.  *
  37.  * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a>
  38.  * @since 1.7
  39.  */
  40. public class FramedSnappyCompressorInputStream extends CompressorInputStream implements InputStreamStatistics {

  41.     /**
  42.      * package private for tests only.
  43.      */
  44.     static final long MASK_OFFSET = 0xa282ead8L;

  45.     private static final int STREAM_IDENTIFIER_TYPE = 0xff;
  46.     static final int COMPRESSED_CHUNK_TYPE = 0;
  47.     private static final int UNCOMPRESSED_CHUNK_TYPE = 1;
  48.     private static final int PADDING_CHUNK_TYPE = 0xfe;
  49.     private static final int MIN_UNSKIPPABLE_TYPE = 2;
  50.     private static final int MAX_UNSKIPPABLE_TYPE = 0x7f;
  51.     private static final int MAX_SKIPPABLE_TYPE = 0xfd;

  52.     // used by FramedSnappyCompressorOutputStream as well
  53.     static final byte[] SZ_SIGNATURE = { // NOSONAR
  54.             (byte) STREAM_IDENTIFIER_TYPE, // tag
  55.             6, 0, 0, // length
  56.             's', 'N', 'a', 'P', 'p', 'Y' };

  57.     /**
  58.      * Checks if the signature matches what is expected for a .sz file.
  59.      *
  60.      * <p>
  61.      * .sz files start with a chunk with tag 0xff and content sNaPpY.
  62.      * </p>
  63.      *
  64.      * @param signature the bytes to check
  65.      * @param length    the number of bytes to check
  66.      * @return true if this is a .sz stream, false otherwise
  67.      */
  68.     public static boolean matches(final byte[] signature, final int length) {

  69.         if (length < SZ_SIGNATURE.length) {
  70.             return false;
  71.         }

  72.         byte[] shortenedSig = signature;
  73.         if (signature.length > SZ_SIGNATURE.length) {
  74.             shortenedSig = Arrays.copyOf(signature, SZ_SIGNATURE.length);
  75.         }

  76.         return Arrays.equals(shortenedSig, SZ_SIGNATURE);
  77.     }

  78.     static long unmask(long x) {
  79.         // ugly, maybe we should just have used ints and deal with the
  80.         // overflow
  81.         x -= MASK_OFFSET;
  82.         x &= 0xffffFFFFL;
  83.         return (x >> 17 | x << 15) & 0xffffFFFFL;
  84.     }

  85.     private long unreadBytes;

  86.     private final BoundedInputStream countingStream;

  87.     /** The underlying stream to read compressed data from */
  88.     private final PushbackInputStream inputStream;

  89.     /** The dialect to expect */
  90.     private final FramedSnappyDialect dialect;

  91.     private SnappyCompressorInputStream currentCompressedChunk;

  92.     // used in no-arg read method
  93.     private final byte[] oneByte = new byte[1];
  94.     private boolean endReached, inUncompressedChunk;
  95.     private int uncompressedBytesRemaining;
  96.     private long expectedChecksum = -1;

  97.     private final int blockSize;

  98.     private final PureJavaCrc32C checksum = new PureJavaCrc32C();

  99.     private final ByteUtils.ByteSupplier supplier = this::readOneByte;

  /**
   * Creates a stream that decompresses snappy-framed data read from {@code in}, expecting the {@link FramedSnappyDialect#STANDARD} dialect.
   *
   * @param in the InputStream from which to read the compressed data
   * @throws IOException if reading fails
   */
  public FramedSnappyCompressorInputStream(final InputStream in) throws IOException {
    this(in, FramedSnappyDialect.STANDARD);
  }

  /**
   * Creates a stream that decompresses snappy-framed data read from {@code in}, using the default block size of {@link SnappyCompressorInputStream}.
   *
   * @param in      the InputStream from which to read the compressed data
   * @param dialect the dialect used by the compressed stream
   * @throws IOException if reading fails
   */
  public FramedSnappyCompressorInputStream(final InputStream in, final FramedSnappyDialect dialect) throws IOException {
    this(in, SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE, dialect);
  }

  120.     /**
  121.      * Constructs a new input stream that decompresses snappy-framed-compressed data from the specified input stream.
  122.      *
  123.      * @param in        the InputStream from which to read the compressed data
  124.      * @param blockSize the block size to use for the compressed stream
  125.      * @param dialect   the dialect used by the compressed stream
  126.      * @throws IOException              if reading fails
  127.      * @throws IllegalArgumentException if blockSize is not bigger than 0
  128.      * @since 1.14
  129.      */
  130.     public FramedSnappyCompressorInputStream(final InputStream in, final int blockSize, final FramedSnappyDialect dialect) throws IOException {
  131.         if (blockSize <= 0) {
  132.             throw new IllegalArgumentException("blockSize must be bigger than 0");
  133.         }
  134.         countingStream = BoundedInputStream.builder().setInputStream(in).get();
  135.         this.inputStream = new PushbackInputStream(countingStream, 1);
  136.         this.blockSize = blockSize;
  137.         this.dialect = dialect;
  138.         if (dialect.hasStreamIdentifier()) {
  139.             readStreamIdentifier();
  140.         }
  141.     }

  142.     /** {@inheritDoc} */
  143.     @Override
  144.     public int available() throws IOException {
  145.         if (inUncompressedChunk) {
  146.             return Math.min(uncompressedBytesRemaining, inputStream.available());
  147.         }
  148.         if (currentCompressedChunk != null) {
  149.             return currentCompressedChunk.available();
  150.         }
  151.         return 0;
  152.     }

  153.     /** {@inheritDoc} */
  154.     @Override
  155.     public void close() throws IOException {
  156.         try {
  157.             if (currentCompressedChunk != null) {
  158.                 currentCompressedChunk.close();
  159.                 currentCompressedChunk = null;
  160.             }
  161.         } finally {
  162.             inputStream.close();
  163.         }
  164.     }

  165.     /**
  166.      * @since 1.17
  167.      */
  168.     @Override
  169.     public long getCompressedCount() {
  170.         return countingStream.getCount() - unreadBytes;
  171.     }

  172.     /** {@inheritDoc} */
  173.     @Override
  174.     public int read() throws IOException {
  175.         return read(oneByte, 0, 1) == -1 ? -1 : oneByte[0] & 0xFF;
  176.     }

  177.     /** {@inheritDoc} */
  178.     @Override
  179.     public int read(final byte[] b, final int off, final int len) throws IOException {
  180.         if (len == 0) {
  181.             return 0;
  182.         }
  183.         int read = readOnce(b, off, len);
  184.         if (read == -1) {
  185.             readNextBlock();
  186.             if (endReached) {
  187.                 return -1;
  188.             }
  189.             read = readOnce(b, off, len);
  190.         }
  191.         return read;
  192.     }

  193.     private long readCrc() throws IOException {
  194.         final byte[] b = new byte[4];
  195.         final int read = IOUtils.readFully(inputStream, b);
  196.         count(read);
  197.         if (read != 4) {
  198.             throw new IOException("Premature end of stream");
  199.         }
  200.         return ByteUtils.fromLittleEndian(b);
  201.     }

  202.     private void readNextBlock() throws IOException {
  203.         verifyLastChecksumAndReset();
  204.         inUncompressedChunk = false;
  205.         final int type = readOneByte();
  206.         if (type == -1) {
  207.             endReached = true;
  208.         } else if (type == STREAM_IDENTIFIER_TYPE) {
  209.             inputStream.unread(type);
  210.             unreadBytes++;
  211.             pushedBackBytes(1);
  212.             readStreamIdentifier();
  213.             readNextBlock();
  214.         } else if (type == PADDING_CHUNK_TYPE || type > MAX_UNSKIPPABLE_TYPE && type <= MAX_SKIPPABLE_TYPE) {
  215.             skipBlock();
  216.             readNextBlock();
  217.         } else if (type >= MIN_UNSKIPPABLE_TYPE && type <= MAX_UNSKIPPABLE_TYPE) {
  218.             throw new IOException("Unskippable chunk with type " + type + " (hex " + Integer.toHexString(type) + ")" + " detected.");
  219.         } else if (type == UNCOMPRESSED_CHUNK_TYPE) {
  220.             inUncompressedChunk = true;
  221.             uncompressedBytesRemaining = readSize() - 4 /* CRC */;
  222.             if (uncompressedBytesRemaining < 0) {
  223.                 throw new IOException("Found illegal chunk with negative size");
  224.             }
  225.             expectedChecksum = unmask(readCrc());
  226.         } else if (type == COMPRESSED_CHUNK_TYPE) {
  227.             final boolean expectChecksum = dialect.usesChecksumWithCompressedChunks();
  228.             final long size = readSize() - (expectChecksum ? 4L : 0L);
  229.             if (size < 0) {
  230.                 throw new IOException("Found illegal chunk with negative size");
  231.             }
  232.             if (expectChecksum) {
  233.                 expectedChecksum = unmask(readCrc());
  234.             } else {
  235.                 expectedChecksum = -1;
  236.             }
  237.             // @formatter:off
  238.             currentCompressedChunk = new SnappyCompressorInputStream(BoundedInputStream.builder()
  239.                     .setInputStream(inputStream)
  240.                     .setMaxCount(size)
  241.                     .setPropagateClose(false)
  242.                     .get(),
  243.                     blockSize);
  244.             // @formatter:on
  245.             // constructor reads uncompressed size
  246.             count(currentCompressedChunk.getBytesRead());
  247.         } else {
  248.             // impossible as all potential byte values have been covered
  249.             throw new IOException("Unknown chunk type " + type + " detected.");
  250.         }
  251.     }

  252.     /**
  253.      * Read from the current chunk into the given array.
  254.      *
  255.      * @return -1 if there is no current chunk or the number of bytes read from the current chunk (which may be -1 if the end of the chunk is reached).
  256.      */
  257.     private int readOnce(final byte[] b, final int off, final int len) throws IOException {
  258.         int read = -1;
  259.         if (inUncompressedChunk) {
  260.             final int amount = Math.min(uncompressedBytesRemaining, len);
  261.             if (amount == 0) {
  262.                 return -1;
  263.             }
  264.             read = inputStream.read(b, off, amount);
  265.             if (read != -1) {
  266.                 uncompressedBytesRemaining -= read;
  267.                 count(read);
  268.             }
  269.         } else if (currentCompressedChunk != null) {
  270.             final long before = currentCompressedChunk.getBytesRead();
  271.             read = currentCompressedChunk.read(b, off, len);
  272.             if (read == -1) {
  273.                 currentCompressedChunk.close();
  274.                 currentCompressedChunk = null;
  275.             } else {
  276.                 count(currentCompressedChunk.getBytesRead() - before);
  277.             }
  278.         }
  279.         if (read > 0) {
  280.             checksum.update(b, off, read);
  281.         }
  282.         return read;
  283.     }

  284.     private int readOneByte() throws IOException {
  285.         final int b = inputStream.read();
  286.         if (b != -1) {
  287.             count(1);
  288.             return b & 0xFF;
  289.         }
  290.         return -1;
  291.     }

  292.     private int readSize() throws IOException {
  293.         return (int) ByteUtils.fromLittleEndian(supplier, 3);
  294.     }

  295.     private void readStreamIdentifier() throws IOException {
  296.         final byte[] b = new byte[10];
  297.         final int read = IOUtils.readFully(inputStream, b);
  298.         count(read);
  299.         if (10 != read || !matches(b, 10)) {
  300.             throw new IOException("Not a framed Snappy stream");
  301.         }
  302.     }

  303.     private void skipBlock() throws IOException {
  304.         final int size = readSize();
  305.         if (size < 0) {
  306.             throw new IOException("Found illegal chunk with negative size");
  307.         }
  308.         final long read = org.apache.commons.io.IOUtils.skip(inputStream, size);
  309.         count(read);
  310.         if (read != size) {
  311.             throw new IOException("Premature end of stream");
  312.         }
  313.     }

  314.     private void verifyLastChecksumAndReset() throws IOException {
  315.         if (expectedChecksum >= 0 && expectedChecksum != checksum.getValue()) {
  316.             throw new IOException("Checksum verification failed");
  317.         }
  318.         expectedChecksum = -1;
  319.         checksum.reset();
  320.     }

  321. }