/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
019package org.apache.commons.compress.compressors.snappy;
020
021import java.io.ByteArrayOutputStream;
022import java.io.IOException;
023import java.io.OutputStream;
024
025import org.apache.commons.compress.compressors.CompressorOutputStream;
026import org.apache.commons.compress.compressors.lz77support.Parameters;
027import org.apache.commons.compress.utils.ByteUtils;
028
/**
 * CompressorOutputStream for the framing Snappy format.
 *
 * <p>Based on the "spec" in the version "Last revised: 2013-10-25"</p>
 *
 * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a>
 * @since 1.14
 * @NotThreadSafe
 */
038public class FramedSnappyCompressorOutputStream extends CompressorOutputStream {
039    // see spec:
040    // > However, we place an additional restriction that the uncompressed data
041    // > in a chunk must be no longer than 65536 bytes. This allows consumers to
042    // > easily use small fixed-size buffers.
043    private static final int MAX_COMPRESSED_BUFFER_SIZE = 1 << 16;
044
045    private final OutputStream out;
046    private final Parameters params;
047    private final PureJavaCrc32C checksum = new PureJavaCrc32C();
048    // used in one-arg write method
049    private final byte[] oneByte = new byte[1];
050    private final byte[] buffer = new byte[MAX_COMPRESSED_BUFFER_SIZE];
051    private int currentIndex;
052
053    private final ByteUtils.ByteConsumer consumer;
054
055    /**
056     * Constructs a new output stream that compresses
057     * snappy-framed-compressed data to the specified output stream.
058     * @param out the OutputStream to which to write the compressed data
059     * @throws IOException if writing the signature fails
060     */
061    public FramedSnappyCompressorOutputStream(final OutputStream out) throws IOException {
062        this(out, SnappyCompressorOutputStream.createParameterBuilder(SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE)
063             .build());
064    }
065
066    /**
067     * Constructs a new output stream that compresses
068     * snappy-framed-compressed data to the specified output stream.
069     * @param out the OutputStream to which to write the compressed data
070     * @param params parameters used to fine-tune compression, in
071     * particular to balance compression ratio vs compression speed.
072     * @throws IOException if writing the signature fails
073     */
074    public FramedSnappyCompressorOutputStream(final OutputStream out, final Parameters params) throws IOException {
075        this.out = out;
076        this.params = params;
077        consumer = new ByteUtils.OutputStreamByteConsumer(out);
078        out.write(FramedSnappyCompressorInputStream.SZ_SIGNATURE);
079    }
080
081    @Override
082    public void write(final int b) throws IOException {
083        oneByte[0] = (byte) (b & 0xff);
084        write(oneByte);
085    }
086
087    @Override
088    public void write(final byte[] data, int off, int len) throws IOException {
089        if (currentIndex + len > MAX_COMPRESSED_BUFFER_SIZE) {
090            flushBuffer();
091            while (len > MAX_COMPRESSED_BUFFER_SIZE) {
092                System.arraycopy(data, off, buffer, 0, MAX_COMPRESSED_BUFFER_SIZE);
093                off += MAX_COMPRESSED_BUFFER_SIZE;
094                len -= MAX_COMPRESSED_BUFFER_SIZE;
095                currentIndex = MAX_COMPRESSED_BUFFER_SIZE;
096                flushBuffer();
097            }
098        }
099        System.arraycopy(data, off, buffer, currentIndex, len);
100        currentIndex += len;
101    }
102
103    @Override
104    public void close() throws IOException {
105        try {
106            finish();
107        } finally {
108            out.close();
109        }
110    }
111
112    /**
113     * Compresses all remaining data and writes it to the stream,
114     * doesn't close the underlying stream.
115     * @throws IOException if an error occurs
116     */
117    public void finish() throws IOException {
118        if (currentIndex > 0) {
119            flushBuffer();
120        }
121    }
122
123    private void flushBuffer() throws IOException {
124        out.write(FramedSnappyCompressorInputStream.COMPRESSED_CHUNK_TYPE);
125        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
126        try (OutputStream o = new SnappyCompressorOutputStream(baos, currentIndex, params)) {
127            o.write(buffer, 0, currentIndex);
128        }
129        final byte[] b = baos.toByteArray();
130        writeLittleEndian(3, b.length + 4L /* CRC */);
131        writeCrc();
132        out.write(b);
133        currentIndex = 0;
134    }
135
136    private void writeLittleEndian(final int numBytes, final long num) throws IOException {
137        ByteUtils.toLittleEndian(consumer, num, numBytes);
138    }
139
140    private void writeCrc() throws IOException {
141        checksum.update(buffer, 0, currentIndex);
142        writeLittleEndian(4, mask(checksum.getValue()));
143        checksum.reset();
144    }
145
146    static long mask(long x) {
147        // ugly, maybe we should just have used ints and deal with the
148        // overflow
149        x = ((x >> 15) | (x << 17));
150        x += FramedSnappyCompressorInputStream.MASK_OFFSET;
151        x &= 0xffffFFFFL;
152        return x;
153    }
154}