FramedLZ4CompressorInputStream.java

  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  * http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing,
  13.  * software distributed under the License is distributed on an
  14.  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15.  * KIND, either express or implied.  See the License for the
  16.  * specific language governing permissions and limitations
  17.  * under the License.
  18.  */
  19. package org.apache.commons.compress.compressors.lz4;

  20. import java.io.IOException;
  21. import java.io.InputStream;
  22. import java.util.Arrays;
  23. import java.util.zip.CheckedInputStream;

  24. import org.apache.commons.compress.compressors.CompressorInputStream;
  25. import org.apache.commons.compress.utils.ByteUtils;
  26. import org.apache.commons.compress.utils.IOUtils;
  27. import org.apache.commons.compress.utils.InputStreamStatistics;
  28. import org.apache.commons.io.input.BoundedInputStream;

  29. /**
  30.  * CompressorInputStream for the LZ4 frame format.
  31.  *
  32.  * <p>
  33.  * Based on the "spec" in the version "1.5.1 (31/03/2015)"
  34.  * </p>
  35.  *
  36.  * @see <a href="https://lz4.github.io/lz4/lz4_Frame_format.html">LZ4 Frame Format Description</a>
  37.  * @since 1.14
  38.  * @NotThreadSafe
  39.  */
  40. public class FramedLZ4CompressorInputStream extends CompressorInputStream implements InputStreamStatistics {

  41.     /** Used by FramedLZ4CompressorOutputStream as well. */
  42.     static final byte[] LZ4_SIGNATURE = { 4, 0x22, 0x4d, 0x18 };
  43.     private static final byte[] SKIPPABLE_FRAME_TRAILER = { 0x2a, 0x4d, 0x18 };
  44.     private static final byte SKIPPABLE_FRAME_PREFIX_BYTE_MASK = 0x50;

  45.     static final int VERSION_MASK = 0xC0;
  46.     static final int SUPPORTED_VERSION = 0x40;
  47.     static final int BLOCK_INDEPENDENCE_MASK = 0x20;
  48.     static final int BLOCK_CHECKSUM_MASK = 0x10;
  49.     static final int CONTENT_SIZE_MASK = 0x08;
  50.     static final int CONTENT_CHECKSUM_MASK = 0x04;
  51.     static final int BLOCK_MAX_SIZE_MASK = 0x70;
  52.     static final int UNCOMPRESSED_FLAG_MASK = 0x80000000;

  53.     private static boolean isSkippableFrameSignature(final byte[] b) {
  54.         if ((b[0] & SKIPPABLE_FRAME_PREFIX_BYTE_MASK) != SKIPPABLE_FRAME_PREFIX_BYTE_MASK) {
  55.             return false;
  56.         }
  57.         for (int i = 1; i < 4; i++) {
  58.             if (b[i] != SKIPPABLE_FRAME_TRAILER[i - 1]) {
  59.                 return false;
  60.             }
  61.         }
  62.         return true;
  63.     }

  64.     /**
  65.      * Checks if the signature matches what is expected for a .lz4 file.
  66.      * <p>
  67.      * .lz4 files start with a four byte signature.
  68.      * </p>
  69.      *
  70.      * @param signature the bytes to check
  71.      * @param length    the number of bytes to check
  72.      * @return true if this is a .sz stream, false otherwise
  73.      */
  74.     public static boolean matches(final byte[] signature, final int length) {

  75.         if (length < LZ4_SIGNATURE.length) {
  76.             return false;
  77.         }

  78.         byte[] shortenedSig = signature;
  79.         if (signature.length > LZ4_SIGNATURE.length) {
  80.             shortenedSig = Arrays.copyOf(signature, LZ4_SIGNATURE.length);
  81.         }

  82.         return Arrays.equals(shortenedSig, LZ4_SIGNATURE);
  83.     }

  84.     /** Used in no-arg read method. */
  85.     private final byte[] oneByte = new byte[1];
  86.     private final ByteUtils.ByteSupplier supplier = this::readOneByte;

  87.     private final BoundedInputStream inputStream;
  88.     private final boolean decompressConcatenated;
  89.     private boolean expectBlockChecksum;
  90.     private boolean expectBlockDependency;

  91.     private boolean expectContentChecksum;

  92.     private InputStream currentBlock;

  93.     private boolean endReached, inUncompressed;

  94.     /** Used for frame header checksum and content checksum, if present. */
  95.     private final org.apache.commons.codec.digest.XXHash32 contentHash = new org.apache.commons.codec.digest.XXHash32();

  96.     /** Used for block checksum, if present. */
  97.     private final org.apache.commons.codec.digest.XXHash32 blockHash = new org.apache.commons.codec.digest.XXHash32();

  98.     /** Only created if the frame doesn't set the block independence flag. */
  99.     private byte[] blockDependencyBuffer;

  100.     /**
  101.      * Creates a new input stream that decompresses streams compressed using the LZ4 frame format and stops after decompressing the first frame.
  102.      *
  103.      * @param in the InputStream from which to read the compressed data
  104.      * @throws IOException if reading fails
  105.      */
  106.     public FramedLZ4CompressorInputStream(final InputStream in) throws IOException {
  107.         this(in, false);
  108.     }

  109.     /**
  110.      * Creates a new input stream that decompresses streams compressed using the LZ4 frame format.
  111.      *
  112.      * @param in                     the InputStream from which to read the compressed data
  113.      * @param decompressConcatenated if true, decompress until the end of the input; if false, stop after the first LZ4 frame and leave the input position to
  114.      *                               point to the next byte after the frame stream
  115.      * @throws IOException if reading fails
  116.      */
  117.     public FramedLZ4CompressorInputStream(final InputStream in, final boolean decompressConcatenated) throws IOException {
  118.         this.inputStream = BoundedInputStream.builder().setInputStream(in).get();
  119.         this.decompressConcatenated = decompressConcatenated;
  120.         init(true);
  121.     }

  122.     private void appendToBlockDependencyBuffer(final byte[] b, final int off, int len) {
  123.         len = Math.min(len, blockDependencyBuffer.length);
  124.         if (len > 0) {
  125.             final int keep = blockDependencyBuffer.length - len;
  126.             if (keep > 0) {
  127.                 // move last keep bytes towards the start of the buffer
  128.                 System.arraycopy(blockDependencyBuffer, len, blockDependencyBuffer, 0, keep);
  129.             }
  130.             // append new data
  131.             System.arraycopy(b, off, blockDependencyBuffer, keep, len);
  132.         }
  133.     }

  134.     /** {@inheritDoc} */
  135.     @Override
  136.     public void close() throws IOException {
  137.         try {
  138.             if (currentBlock != null) {
  139.                 currentBlock.close();
  140.                 currentBlock = null;
  141.             }
  142.         } finally {
  143.             inputStream.close();
  144.         }
  145.     }

  146.     /**
  147.      * @since 1.17
  148.      */
  149.     @Override
  150.     public long getCompressedCount() {
  151.         return inputStream.getCount();
  152.     }

  153.     private void init(final boolean firstFrame) throws IOException {
  154.         if (readSignature(firstFrame)) {
  155.             readFrameDescriptor();
  156.             nextBlock();
  157.         }
  158.     }

  159.     private void maybeFinishCurrentBlock() throws IOException {
  160.         if (currentBlock != null) {
  161.             currentBlock.close();
  162.             currentBlock = null;
  163.             if (expectBlockChecksum) {
  164.                 verifyChecksum(blockHash, "block");
  165.                 blockHash.reset();
  166.             }
  167.         }
  168.     }

  169.     private void nextBlock() throws IOException {
  170.         maybeFinishCurrentBlock();
  171.         final long len = ByteUtils.fromLittleEndian(supplier, 4);
  172.         final boolean uncompressed = (len & UNCOMPRESSED_FLAG_MASK) != 0;
  173.         final int realLen = (int) (len & ~UNCOMPRESSED_FLAG_MASK);
  174.         if (realLen == 0) {
  175.             verifyContentChecksum();
  176.             if (!decompressConcatenated) {
  177.                 endReached = true;
  178.             } else {
  179.                 init(false);
  180.             }
  181.             return;
  182.         }
  183.         // @formatter:off
  184.         InputStream capped = BoundedInputStream.builder()
  185.                 .setInputStream(inputStream)
  186.                 .setMaxCount(realLen)
  187.                 .setPropagateClose(false)
  188.                 .get();
  189.         // @formatter:on
  190.         if (expectBlockChecksum) {
  191.             capped = new CheckedInputStream(capped, blockHash);
  192.         }
  193.         if (uncompressed) {
  194.             inUncompressed = true;
  195.             currentBlock = capped;
  196.         } else {
  197.             inUncompressed = false;
  198.             final BlockLZ4CompressorInputStream s = new BlockLZ4CompressorInputStream(capped);
  199.             if (expectBlockDependency) {
  200.                 s.prefill(blockDependencyBuffer);
  201.             }
  202.             currentBlock = s;
  203.         }
  204.     }

  205.     /** {@inheritDoc} */
  206.     @Override
  207.     public int read() throws IOException {
  208.         return read(oneByte, 0, 1) == -1 ? -1 : oneByte[0] & 0xFF;
  209.     }

  210.     /** {@inheritDoc} */
  211.     @Override
  212.     public int read(final byte[] b, final int off, final int len) throws IOException {
  213.         if (len == 0) {
  214.             return 0;
  215.         }
  216.         if (endReached) {
  217.             return -1;
  218.         }
  219.         int r = readOnce(b, off, len);
  220.         if (r == -1) {
  221.             nextBlock();
  222.             if (!endReached) {
  223.                 r = readOnce(b, off, len);
  224.             }
  225.         }
  226.         if (r != -1) {
  227.             if (expectBlockDependency) {
  228.                 appendToBlockDependencyBuffer(b, off, r);
  229.             }
  230.             if (expectContentChecksum) {
  231.                 contentHash.update(b, off, r);
  232.             }
  233.         }
  234.         return r;
  235.     }

  236.     private void readFrameDescriptor() throws IOException {
  237.         final int flags = readOneByte();
  238.         if (flags == -1) {
  239.             throw new IOException("Premature end of stream while reading frame flags");
  240.         }
  241.         contentHash.update(flags);
  242.         if ((flags & VERSION_MASK) != SUPPORTED_VERSION) {
  243.             throw new IOException("Unsupported version " + (flags >> 6));
  244.         }
  245.         expectBlockDependency = (flags & BLOCK_INDEPENDENCE_MASK) == 0;
  246.         if (expectBlockDependency) {
  247.             if (blockDependencyBuffer == null) {
  248.                 blockDependencyBuffer = new byte[BlockLZ4CompressorInputStream.WINDOW_SIZE];
  249.             }
  250.         } else {
  251.             blockDependencyBuffer = null;
  252.         }
  253.         expectBlockChecksum = (flags & BLOCK_CHECKSUM_MASK) != 0;
  254.         final boolean expectContentSize = (flags & CONTENT_SIZE_MASK) != 0;
  255.         expectContentChecksum = (flags & CONTENT_CHECKSUM_MASK) != 0;
  256.         final int bdByte = readOneByte();
  257.         if (bdByte == -1) { // max size is irrelevant for this implementation
  258.             throw new IOException("Premature end of stream while reading frame BD byte");
  259.         }
  260.         contentHash.update(bdByte);
  261.         if (expectContentSize) { // for now, we don't care, contains the uncompressed size
  262.             final byte[] contentSize = new byte[8];
  263.             final int skipped = IOUtils.readFully(inputStream, contentSize);
  264.             count(skipped);
  265.             if (8 != skipped) {
  266.                 throw new IOException("Premature end of stream while reading content size");
  267.             }
  268.             contentHash.update(contentSize, 0, contentSize.length);
  269.         }
  270.         final int headerHash = readOneByte();
  271.         if (headerHash == -1) { // partial hash of header.
  272.             throw new IOException("Premature end of stream while reading frame header checksum");
  273.         }
  274.         final int expectedHash = (int) (contentHash.getValue() >> 8 & 0xff);
  275.         contentHash.reset();
  276.         if (headerHash != expectedHash) {
  277.             throw new IOException("Frame header checksum mismatch");
  278.         }
  279.     }

  280.     private int readOnce(final byte[] b, final int off, final int len) throws IOException {
  281.         if (inUncompressed) {
  282.             final int cnt = currentBlock.read(b, off, len);
  283.             count(cnt);
  284.             return cnt;
  285.         }
  286.         final BlockLZ4CompressorInputStream l = (BlockLZ4CompressorInputStream) currentBlock;
  287.         final long before = l.getBytesRead();
  288.         final int cnt = currentBlock.read(b, off, len);
  289.         count(l.getBytesRead() - before);
  290.         return cnt;
  291.     }

  292.     private int readOneByte() throws IOException {
  293.         final int b = inputStream.read();
  294.         if (b != -1) {
  295.             count(1);
  296.             return b & 0xFF;
  297.         }
  298.         return -1;
  299.     }

  300.     private boolean readSignature(final boolean firstFrame) throws IOException {
  301.         final String garbageMessage = firstFrame ? "Not a LZ4 frame stream" : "LZ4 frame stream followed by garbage";
  302.         final byte[] b = new byte[4];
  303.         int read = IOUtils.readFully(inputStream, b);
  304.         count(read);
  305.         if (0 == read && !firstFrame) {
  306.             // good LZ4 frame and nothing after it
  307.             endReached = true;
  308.             return false;
  309.         }
  310.         if (4 != read) {
  311.             throw new IOException(garbageMessage);
  312.         }

  313.         read = skipSkippableFrame(b);
  314.         if (0 == read && !firstFrame) {
  315.             // good LZ4 frame with only some skippable frames after it
  316.             endReached = true;
  317.             return false;
  318.         }
  319.         if (4 != read || !matches(b, 4)) {
  320.             throw new IOException(garbageMessage);
  321.         }
  322.         return true;
  323.     }

  324.     /**
  325.      * Skips over the contents of a skippable frame as well as skippable frames following it.
  326.      * <p>
  327.      * It then tries to read four more bytes which are supposed to hold an LZ4 signature and returns the number of bytes read while storing the bytes in the
  328.      * given array.
  329.      * </p>
  330.      */
  331.     private int skipSkippableFrame(final byte[] b) throws IOException {
  332.         int read = 4;
  333.         while (read == 4 && isSkippableFrameSignature(b)) {
  334.             final long len = ByteUtils.fromLittleEndian(supplier, 4);
  335.             if (len < 0) {
  336.                 throw new IOException("Found illegal skippable frame with negative size");
  337.             }
  338.             final long skipped = org.apache.commons.io.IOUtils.skip(inputStream, len);
  339.             count(skipped);
  340.             if (len != skipped) {
  341.                 throw new IOException("Premature end of stream while skipping frame");
  342.             }
  343.             read = IOUtils.readFully(inputStream, b);
  344.             count(read);
  345.         }
  346.         return read;
  347.     }

  348.     private void verifyChecksum(final org.apache.commons.codec.digest.XXHash32 hash, final String kind) throws IOException {
  349.         final byte[] checksum = new byte[4];
  350.         final int read = IOUtils.readFully(inputStream, checksum);
  351.         count(read);
  352.         if (4 != read) {
  353.             throw new IOException("Premature end of stream while reading " + kind + " checksum");
  354.         }
  355.         final long expectedHash = hash.getValue();
  356.         if (expectedHash != ByteUtils.fromLittleEndian(checksum)) {
  357.             throw new IOException(kind + " checksum mismatch.");
  358.         }
  359.     }

  360.     private void verifyContentChecksum() throws IOException {
  361.         if (expectContentChecksum) {
  362.             verifyChecksum(contentHash, "content");
  363.         }
  364.         contentHash.reset();
  365.     }
  366. }