001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.commons.compress.compressors.snappy;
020
021import java.io.ByteArrayOutputStream;
022import java.io.IOException;
023import java.io.OutputStream;
024
025import org.apache.commons.codec.digest.PureJavaCrc32C;
026import org.apache.commons.compress.compressors.CompressorOutputStream;
027import org.apache.commons.compress.compressors.lz77support.Parameters;
028import org.apache.commons.compress.utils.ByteUtils;
029
030/**
031 * CompressorOutputStream for the framing Snappy format.
032 *
033 * <p>
034 * Based on the "spec" in the version "Last revised: 2013-10-25"
035 * </p>
036 *
037 * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a>
038 * @since 1.14
039 * @NotThreadSafe
040 */
041public class FramedSnappyCompressorOutputStream extends CompressorOutputStream {
042    // see spec:
043    // > However, we place an additional restriction that the uncompressed data
044    // > in a chunk must be no longer than 65,536 bytes. This allows consumers to
045    // > easily use small fixed-size buffers.
046    private static final int MAX_COMPRESSED_BUFFER_SIZE = 1 << 16;
047
048    static long mask(long x) {
049        // ugly, maybe we should just have used ints and deal with the
050        // overflow
051        x = x >> 15 | x << 17;
052        x += FramedSnappyCompressorInputStream.MASK_OFFSET;
053        x &= 0xffffFFFFL;
054        return x;
055    }
056
057    private final OutputStream out;
058    private final Parameters params;
059    private final PureJavaCrc32C checksum = new PureJavaCrc32C();
060    // used in one-arg write method
061    private final byte[] oneByte = new byte[1];
062    private final byte[] buffer = new byte[MAX_COMPRESSED_BUFFER_SIZE];
063
064    private int currentIndex;
065
066    private final ByteUtils.ByteConsumer consumer;
067
068    /**
069     * Constructs a new output stream that compresses snappy-framed-compressed data to the specified output stream.
070     *
071     * @param out the OutputStream to which to write the compressed data
072     * @throws IOException if writing the signature fails
073     */
074    public FramedSnappyCompressorOutputStream(final OutputStream out) throws IOException {
075        this(out, SnappyCompressorOutputStream.createParameterBuilder(SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE).build());
076    }
077
078    /**
079     * Constructs a new output stream that compresses snappy-framed-compressed data to the specified output stream.
080     *
081     * @param out    the OutputStream to which to write the compressed data
082     * @param params parameters used to fine-tune compression, in particular to balance compression ratio vs compression speed.
083     * @throws IOException if writing the signature fails
084     */
085    public FramedSnappyCompressorOutputStream(final OutputStream out, final Parameters params) throws IOException {
086        this.out = out;
087        this.params = params;
088        consumer = new ByteUtils.OutputStreamByteConsumer(out);
089        out.write(FramedSnappyCompressorInputStream.SZ_SIGNATURE);
090    }
091
092    @Override
093    public void close() throws IOException {
094        try {
095            finish();
096        } finally {
097            out.close();
098        }
099    }
100
101    /**
102     * Compresses all remaining data and writes it to the stream, doesn't close the underlying stream.
103     *
104     * @throws IOException if an error occurs
105     */
106    public void finish() throws IOException {
107        flushBuffer();
108    }
109
110    private void flushBuffer() throws IOException {
111        if (currentIndex == 0) {
112            return;
113        }
114        out.write(FramedSnappyCompressorInputStream.COMPRESSED_CHUNK_TYPE);
115        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
116        try (OutputStream o = new SnappyCompressorOutputStream(baos, currentIndex, params)) {
117            o.write(buffer, 0, currentIndex);
118        }
119        final byte[] b = baos.toByteArray();
120        writeLittleEndian(3, b.length + 4L /* CRC */);
121        writeCrc();
122        out.write(b);
123        currentIndex = 0;
124    }
125
126    @Override
127    public void write(final byte[] data, int off, int len) throws IOException {
128        int blockDataRemaining = buffer.length - currentIndex;
129        while (len > 0) {
130            final int copyLen = Math.min(len, blockDataRemaining);
131            System.arraycopy(data, off, buffer, currentIndex, copyLen);
132            off += copyLen;
133            blockDataRemaining -= copyLen;
134            len -= copyLen;
135            currentIndex += copyLen;
136            if (blockDataRemaining == 0) {
137                flushBuffer();
138                blockDataRemaining = buffer.length;
139            }
140        }
141    }
142
143    @Override
144    public void write(final int b) throws IOException {
145        oneByte[0] = (byte) (b & 0xff);
146        write(oneByte);
147    }
148
149    private void writeCrc() throws IOException {
150        checksum.update(buffer, 0, currentIndex);
151        writeLittleEndian(4, mask(checksum.getValue()));
152        checksum.reset();
153    }
154
155    private void writeLittleEndian(final int numBytes, final long num) throws IOException {
156        ByteUtils.toLittleEndian(consumer, num, numBytes);
157    }
158}