001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.commons.compress.compressors.snappy; 020 021import java.io.ByteArrayOutputStream; 022import java.io.IOException; 023import java.io.OutputStream; 024 025import org.apache.commons.codec.digest.PureJavaCrc32C; 026import org.apache.commons.compress.compressors.CompressorOutputStream; 027import org.apache.commons.compress.compressors.lz77support.Parameters; 028import org.apache.commons.compress.utils.ByteUtils; 029 030/** 031 * CompressorOutputStream for the framing Snappy format. 032 * 033 * <p> 034 * Based on the "spec" in the version "Last revised: 2013-10-25" 035 * </p> 036 * 037 * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a> 038 * @since 1.14 039 * @NotThreadSafe 040 */ 041public class FramedSnappyCompressorOutputStream extends CompressorOutputStream { 042 // see spec: 043 // > However, we place an additional restriction that the uncompressed data 044 // > in a chunk must be no longer than 65,536 bytes. This allows consumers to 045 // > easily use small fixed-size buffers. 046 private static final int MAX_COMPRESSED_BUFFER_SIZE = 1 << 16; 047 048 static long mask(long x) { 049 // ugly, maybe we should just have used ints and deal with the 050 // overflow 051 x = x >> 15 | x << 17; 052 x += FramedSnappyCompressorInputStream.MASK_OFFSET; 053 x &= 0xffffFFFFL; 054 return x; 055 } 056 057 private final OutputStream out; 058 private final Parameters params; 059 private final PureJavaCrc32C checksum = new PureJavaCrc32C(); 060 // used in one-arg write method 061 private final byte[] oneByte = new byte[1]; 062 private final byte[] buffer = new byte[MAX_COMPRESSED_BUFFER_SIZE]; 063 064 private int currentIndex; 065 066 private final ByteUtils.ByteConsumer consumer; 067 068 /** 069 * Constructs a new output stream that compresses snappy-framed-compressed data to the specified output stream. 070 * 071 * @param out the OutputStream to which to write the compressed data 072 * @throws IOException if writing the signature fails 073 */ 074 public FramedSnappyCompressorOutputStream(final OutputStream out) throws IOException { 075 this(out, SnappyCompressorOutputStream.createParameterBuilder(SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE).build()); 076 } 077 078 /** 079 * Constructs a new output stream that compresses snappy-framed-compressed data to the specified output stream. 080 * 081 * @param out the OutputStream to which to write the compressed data 082 * @param params parameters used to fine-tune compression, in particular to balance compression ratio vs compression speed. 083 * @throws IOException if writing the signature fails 084 */ 085 public FramedSnappyCompressorOutputStream(final OutputStream out, final Parameters params) throws IOException { 086 this.out = out; 087 this.params = params; 088 consumer = new ByteUtils.OutputStreamByteConsumer(out); 089 out.write(FramedSnappyCompressorInputStream.SZ_SIGNATURE); 090 } 091 092 @Override 093 public void close() throws IOException { 094 try { 095 finish(); 096 } finally { 097 out.close(); 098 } 099 } 100 101 /** 102 * Compresses all remaining data and writes it to the stream, doesn't close the underlying stream. 103 * 104 * @throws IOException if an error occurs 105 */ 106 public void finish() throws IOException { 107 flushBuffer(); 108 } 109 110 private void flushBuffer() throws IOException { 111 if (currentIndex == 0) { 112 return; 113 } 114 out.write(FramedSnappyCompressorInputStream.COMPRESSED_CHUNK_TYPE); 115 final ByteArrayOutputStream baos = new ByteArrayOutputStream(); 116 try (OutputStream o = new SnappyCompressorOutputStream(baos, currentIndex, params)) { 117 o.write(buffer, 0, currentIndex); 118 } 119 final byte[] b = baos.toByteArray(); 120 writeLittleEndian(3, b.length + 4L /* CRC */); 121 writeCrc(); 122 out.write(b); 123 currentIndex = 0; 124 } 125 126 @Override 127 public void write(final byte[] data, int off, int len) throws IOException { 128 int blockDataRemaining = buffer.length - currentIndex; 129 while (len > 0) { 130 final int copyLen = Math.min(len, blockDataRemaining); 131 System.arraycopy(data, off, buffer, currentIndex, copyLen); 132 off += copyLen; 133 blockDataRemaining -= copyLen; 134 len -= copyLen; 135 currentIndex += copyLen; 136 if (blockDataRemaining == 0) { 137 flushBuffer(); 138 blockDataRemaining = buffer.length; 139 } 140 } 141 } 142 143 @Override 144 public void write(final int b) throws IOException { 145 oneByte[0] = (byte) (b & 0xff); 146 write(oneByte); 147 } 148 149 private void writeCrc() throws IOException { 150 checksum.update(buffer, 0, currentIndex); 151 writeLittleEndian(4, mask(checksum.getValue())); 152 checksum.reset(); 153 } 154 155 private void writeLittleEndian(final int numBytes, final long num) throws IOException { 156 ByteUtils.toLittleEndian(consumer, num, numBytes); 157 } 158}