FramedSnappyCompressorOutputStream.java

  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  * http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing,
  13.  * software distributed under the License is distributed on an
  14.  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15.  * KIND, either express or implied.  See the License for the
  16.  * specific language governing permissions and limitations
  17.  * under the License.
  18.  */
  19. package org.apache.commons.compress.compressors.snappy;

  20. import java.io.ByteArrayOutputStream;
  21. import java.io.IOException;
  22. import java.io.OutputStream;

  23. import org.apache.commons.codec.digest.PureJavaCrc32C;
  24. import org.apache.commons.compress.compressors.CompressorOutputStream;
  25. import org.apache.commons.compress.compressors.lz77support.Parameters;
  26. import org.apache.commons.compress.utils.ByteUtils;

  27. /**
  28.  * CompressorOutputStream for the framing Snappy format.
  29.  *
  30.  * <p>
  31.  * Based on the "spec" in the version "Last revised: 2013-10-25"
  32.  * </p>
  33.  *
  34.  * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a>
  35.  * @since 1.14
  36.  * @NotThreadSafe
  37.  */
  38. public class FramedSnappyCompressorOutputStream extends CompressorOutputStream<OutputStream> {
  39.     // see spec:
  40.     // > However, we place an additional restriction that the uncompressed data
  41.     // > in a chunk must be no longer than 65,536 bytes. This allows consumers to
  42.     // > easily use small fixed-size buffers.
  43.     private static final int MAX_COMPRESSED_BUFFER_SIZE = 1 << 16;

  44.     static long mask(long x) {
  45.         // ugly, maybe we should just have used ints and deal with the
  46.         // overflow
  47.         x = x >> 15 | x << 17;
  48.         x += FramedSnappyCompressorInputStream.MASK_OFFSET;
  49.         x &= 0xffffFFFFL;
  50.         return x;
  51.     }

  52.     private final Parameters params;
  53.     private final PureJavaCrc32C checksum = new PureJavaCrc32C();
  54.     // used in one-arg write method
  55.     private final byte[] oneByte = new byte[1];
  56.     private final byte[] buffer = new byte[MAX_COMPRESSED_BUFFER_SIZE];

  57.     private int currentIndex;

  58.     private final ByteUtils.ByteConsumer consumer;

  59.     /**
  60.      * Constructs a new output stream that compresses snappy-framed-compressed data to the specified output stream.
  61.      *
  62.      * @param out the OutputStream to which to write the compressed data
  63.      * @throws IOException if writing the signature fails
  64.      */
  65.     public FramedSnappyCompressorOutputStream(final OutputStream out) throws IOException {
  66.         this(out, SnappyCompressorOutputStream.createParameterBuilder(SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE).build());
  67.     }

  68.     /**
  69.      * Constructs a new output stream that compresses snappy-framed-compressed data to the specified output stream.
  70.      *
  71.      * @param out    the OutputStream to which to write the compressed data
  72.      * @param params parameters used to fine-tune compression, in particular to balance compression ratio vs compression speed.
  73.      * @throws IOException if writing the signature fails
  74.      */
  75.     public FramedSnappyCompressorOutputStream(final OutputStream out, final Parameters params) throws IOException {
  76.         super(out);
  77.         this.params = params;
  78.         consumer = new ByteUtils.OutputStreamByteConsumer(out);
  79.         out.write(FramedSnappyCompressorInputStream.SZ_SIGNATURE);
  80.     }

  81.     @Override
  82.     public void close() throws IOException {
  83.         try {
  84.             finish();
  85.         } finally {
  86.             out.close();
  87.         }
  88.     }

  89.     /**
  90.      * Compresses all remaining data and writes it to the stream, doesn't close the underlying stream.
  91.      *
  92.      * @throws IOException if an error occurs
  93.      */
  94.     public void finish() throws IOException {
  95.         flushBuffer();
  96.     }

  97.     private void flushBuffer() throws IOException {
  98.         if (currentIndex == 0) {
  99.             return;
  100.         }
  101.         out.write(FramedSnappyCompressorInputStream.COMPRESSED_CHUNK_TYPE);
  102.         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
  103.         try (OutputStream o = new SnappyCompressorOutputStream(baos, currentIndex, params)) {
  104.             o.write(buffer, 0, currentIndex);
  105.         }
  106.         final byte[] b = baos.toByteArray();
  107.         writeLittleEndian(3, b.length + 4L /* CRC */);
  108.         writeCrc();
  109.         out.write(b);
  110.         currentIndex = 0;
  111.     }

  112.     @Override
  113.     public void write(final byte[] data, int off, int len) throws IOException {
  114.         int blockDataRemaining = buffer.length - currentIndex;
  115.         while (len > 0) {
  116.             final int copyLen = Math.min(len, blockDataRemaining);
  117.             System.arraycopy(data, off, buffer, currentIndex, copyLen);
  118.             off += copyLen;
  119.             blockDataRemaining -= copyLen;
  120.             len -= copyLen;
  121.             currentIndex += copyLen;
  122.             if (blockDataRemaining == 0) {
  123.                 flushBuffer();
  124.                 blockDataRemaining = buffer.length;
  125.             }
  126.         }
  127.     }

  128.     @Override
  129.     public void write(final int b) throws IOException {
  130.         oneByte[0] = (byte) (b & 0xff);
  131.         write(oneByte);
  132.     }

  133.     private void writeCrc() throws IOException {
  134.         checksum.update(buffer, 0, currentIndex);
  135.         writeLittleEndian(4, mask(checksum.getValue()));
  136.         checksum.reset();
  137.     }

  138.     private void writeLittleEndian(final int numBytes, final long num) throws IOException {
  139.         ByteUtils.toLittleEndian(consumer, num, numBytes);
  140.     }
  141. }