001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.commons.compress.compressors.snappy; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.PushbackInputStream; 024import java.util.Arrays; 025 026import org.apache.commons.codec.digest.PureJavaCrc32C; 027import org.apache.commons.compress.compressors.CompressorInputStream; 028import org.apache.commons.compress.utils.BoundedInputStream; 029import org.apache.commons.compress.utils.ByteUtils; 030import org.apache.commons.compress.utils.IOUtils; 031import org.apache.commons.compress.utils.InputStreamStatistics; 032import org.apache.commons.io.input.CountingInputStream; 033 034/** 035 * CompressorInputStream for the framing Snappy format. 036 * 037 * <p> 038 * Based on the "spec" in the version "Last revised: 2013-10-25" 039 * </p> 040 * 041 * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a> 042 * @since 1.7 043 */ 044public class FramedSnappyCompressorInputStream extends CompressorInputStream implements InputStreamStatistics { 045 046 /** 047 * package private for tests only. 048 */ 049 static final long MASK_OFFSET = 0xa282ead8L; 050 051 private static final int STREAM_IDENTIFIER_TYPE = 0xff; 052 static final int COMPRESSED_CHUNK_TYPE = 0; 053 private static final int UNCOMPRESSED_CHUNK_TYPE = 1; 054 private static final int PADDING_CHUNK_TYPE = 0xfe; 055 private static final int MIN_UNSKIPPABLE_TYPE = 2; 056 private static final int MAX_UNSKIPPABLE_TYPE = 0x7f; 057 private static final int MAX_SKIPPABLE_TYPE = 0xfd; 058 059 // used by FramedSnappyCompressorOutputStream as well 060 static final byte[] SZ_SIGNATURE = { // NOSONAR 061 (byte) STREAM_IDENTIFIER_TYPE, // tag 062 6, 0, 0, // length 063 's', 'N', 'a', 'P', 'p', 'Y' }; 064 065 /** 066 * Checks if the signature matches what is expected for a .sz file. 067 * 068 * <p> 069 * .sz files start with a chunk with tag 0xff and content sNaPpY. 070 * </p> 071 * 072 * @param signature the bytes to check 073 * @param length the number of bytes to check 074 * @return true if this is a .sz stream, false otherwise 075 */ 076 public static boolean matches(final byte[] signature, final int length) { 077 078 if (length < SZ_SIGNATURE.length) { 079 return false; 080 } 081 082 byte[] shortenedSig = signature; 083 if (signature.length > SZ_SIGNATURE.length) { 084 shortenedSig = Arrays.copyOf(signature, SZ_SIGNATURE.length); 085 } 086 087 return Arrays.equals(shortenedSig, SZ_SIGNATURE); 088 } 089 090 static long unmask(long x) { 091 // ugly, maybe we should just have used ints and deal with the 092 // overflow 093 x -= MASK_OFFSET; 094 x &= 0xffffFFFFL; 095 return (x >> 17 | x << 15) & 0xffffFFFFL; 096 } 097 098 private long unreadBytes; 099 100 private final CountingInputStream countingStream; 101 102 /** The underlying stream to read compressed data from */ 103 private final PushbackInputStream inputStream; 104 105 /** The dialect to expect */ 106 private final FramedSnappyDialect dialect; 107 108 private SnappyCompressorInputStream currentCompressedChunk; 109 110 // used in no-arg read method 111 private final byte[] oneByte = new byte[1]; 112 private boolean endReached, inUncompressedChunk; 113 private int uncompressedBytesRemaining; 114 private long expectedChecksum = -1; 115 116 private final int blockSize; 117 118 private final PureJavaCrc32C checksum = new PureJavaCrc32C(); 119 120 private final ByteUtils.ByteSupplier supplier = this::readOneByte; 121 122 /** 123 * Constructs a new input stream that decompresses snappy-framed-compressed data from the specified input stream using the 124 * {@link FramedSnappyDialect#STANDARD} dialect. 125 * 126 * @param in the InputStream from which to read the compressed data 127 * @throws IOException if reading fails 128 */ 129 public FramedSnappyCompressorInputStream(final InputStream in) throws IOException { 130 this(in, FramedSnappyDialect.STANDARD); 131 } 132 133 /** 134 * Constructs a new input stream that decompresses snappy-framed-compressed data from the specified input stream. 135 * 136 * @param in the InputStream from which to read the compressed data 137 * @param dialect the dialect used by the compressed stream 138 * @throws IOException if reading fails 139 */ 140 public FramedSnappyCompressorInputStream(final InputStream in, final FramedSnappyDialect dialect) throws IOException { 141 this(in, SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE, dialect); 142 } 143 144 /** 145 * Constructs a new input stream that decompresses snappy-framed-compressed data from the specified input stream. 146 * 147 * @param in the InputStream from which to read the compressed data 148 * @param blockSize the block size to use for the compressed stream 149 * @param dialect the dialect used by the compressed stream 150 * @throws IOException if reading fails 151 * @throws IllegalArgumentException if blockSize is not bigger than 0 152 * @since 1.14 153 */ 154 public FramedSnappyCompressorInputStream(final InputStream in, final int blockSize, final FramedSnappyDialect dialect) throws IOException { 155 if (blockSize <= 0) { 156 throw new IllegalArgumentException("blockSize must be bigger than 0"); 157 } 158 countingStream = new CountingInputStream(in); 159 this.inputStream = new PushbackInputStream(countingStream, 1); 160 this.blockSize = blockSize; 161 this.dialect = dialect; 162 if (dialect.hasStreamIdentifier()) { 163 readStreamIdentifier(); 164 } 165 } 166 167 /** {@inheritDoc} */ 168 @Override 169 public int available() throws IOException { 170 if (inUncompressedChunk) { 171 return Math.min(uncompressedBytesRemaining, inputStream.available()); 172 } 173 if (currentCompressedChunk != null) { 174 return currentCompressedChunk.available(); 175 } 176 return 0; 177 } 178 179 /** {@inheritDoc} */ 180 @Override 181 public void close() throws IOException { 182 try { 183 if (currentCompressedChunk != null) { 184 currentCompressedChunk.close(); 185 currentCompressedChunk = null; 186 } 187 } finally { 188 inputStream.close(); 189 } 190 } 191 192 /** 193 * @since 1.17 194 */ 195 @Override 196 public long getCompressedCount() { 197 return countingStream.getByteCount() - unreadBytes; 198 } 199 200 /** {@inheritDoc} */ 201 @Override 202 public int read() throws IOException { 203 return read(oneByte, 0, 1) == -1 ? -1 : oneByte[0] & 0xFF; 204 } 205 206 /** {@inheritDoc} */ 207 @Override 208 public int read(final byte[] b, final int off, final int len) throws IOException { 209 if (len == 0) { 210 return 0; 211 } 212 int read = readOnce(b, off, len); 213 if (read == -1) { 214 readNextBlock(); 215 if (endReached) { 216 return -1; 217 } 218 read = readOnce(b, off, len); 219 } 220 return read; 221 } 222 223 private long readCrc() throws IOException { 224 final byte[] b = new byte[4]; 225 final int read = IOUtils.readFully(inputStream, b); 226 count(read); 227 if (read != 4) { 228 throw new IOException("Premature end of stream"); 229 } 230 return ByteUtils.fromLittleEndian(b); 231 } 232 233 private void readNextBlock() throws IOException { 234 verifyLastChecksumAndReset(); 235 inUncompressedChunk = false; 236 final int type = readOneByte(); 237 if (type == -1) { 238 endReached = true; 239 } else if (type == STREAM_IDENTIFIER_TYPE) { 240 inputStream.unread(type); 241 unreadBytes++; 242 pushedBackBytes(1); 243 readStreamIdentifier(); 244 readNextBlock(); 245 } else if (type == PADDING_CHUNK_TYPE || type > MAX_UNSKIPPABLE_TYPE && type <= MAX_SKIPPABLE_TYPE) { 246 skipBlock(); 247 readNextBlock(); 248 } else if (type >= MIN_UNSKIPPABLE_TYPE && type <= MAX_UNSKIPPABLE_TYPE) { 249 throw new IOException("Unskippable chunk with type " + type + " (hex " + Integer.toHexString(type) + ")" + " detected."); 250 } else if (type == UNCOMPRESSED_CHUNK_TYPE) { 251 inUncompressedChunk = true; 252 uncompressedBytesRemaining = readSize() - 4 /* CRC */; 253 if (uncompressedBytesRemaining < 0) { 254 throw new IOException("Found illegal chunk with negative size"); 255 } 256 expectedChecksum = unmask(readCrc()); 257 } else if (type == COMPRESSED_CHUNK_TYPE) { 258 final boolean expectChecksum = dialect.usesChecksumWithCompressedChunks(); 259 final long size = readSize() - (expectChecksum ? 4L : 0L); 260 if (size < 0) { 261 throw new IOException("Found illegal chunk with negative size"); 262 } 263 if (expectChecksum) { 264 expectedChecksum = unmask(readCrc()); 265 } else { 266 expectedChecksum = -1; 267 } 268 currentCompressedChunk = new SnappyCompressorInputStream(new BoundedInputStream(inputStream, size), blockSize); 269 // constructor reads uncompressed size 270 count(currentCompressedChunk.getBytesRead()); 271 } else { 272 // impossible as all potential byte values have been covered 273 throw new IOException("Unknown chunk type " + type + " detected."); 274 } 275 } 276 277 /** 278 * Read from the current chunk into the given array. 279 * 280 * @return -1 if there is no current chunk or the number of bytes read from the current chunk (which may be -1 if the end of the chunk is reached). 281 */ 282 private int readOnce(final byte[] b, final int off, final int len) throws IOException { 283 int read = -1; 284 if (inUncompressedChunk) { 285 final int amount = Math.min(uncompressedBytesRemaining, len); 286 if (amount == 0) { 287 return -1; 288 } 289 read = inputStream.read(b, off, amount); 290 if (read != -1) { 291 uncompressedBytesRemaining -= read; 292 count(read); 293 } 294 } else if (currentCompressedChunk != null) { 295 final long before = currentCompressedChunk.getBytesRead(); 296 read = currentCompressedChunk.read(b, off, len); 297 if (read == -1) { 298 currentCompressedChunk.close(); 299 currentCompressedChunk = null; 300 } else { 301 count(currentCompressedChunk.getBytesRead() - before); 302 } 303 } 304 if (read > 0) { 305 checksum.update(b, off, read); 306 } 307 return read; 308 } 309 310 private int readOneByte() throws IOException { 311 final int b = inputStream.read(); 312 if (b != -1) { 313 count(1); 314 return b & 0xFF; 315 } 316 return -1; 317 } 318 319 private int readSize() throws IOException { 320 return (int) ByteUtils.fromLittleEndian(supplier, 3); 321 } 322 323 private void readStreamIdentifier() throws IOException { 324 final byte[] b = new byte[10]; 325 final int read = IOUtils.readFully(inputStream, b); 326 count(read); 327 if (10 != read || !matches(b, 10)) { 328 throw new IOException("Not a framed Snappy stream"); 329 } 330 } 331 332 private void skipBlock() throws IOException { 333 final int size = readSize(); 334 if (size < 0) { 335 throw new IOException("Found illegal chunk with negative size"); 336 } 337 final long read = org.apache.commons.io.IOUtils.skip(inputStream, size); 338 count(read); 339 if (read != size) { 340 throw new IOException("Premature end of stream"); 341 } 342 } 343 344 private void verifyLastChecksumAndReset() throws IOException { 345 if (expectedChecksum >= 0 && expectedChecksum != checksum.getValue()) { 346 throw new IOException("Checksum verification failed"); 347 } 348 expectedChecksum = -1; 349 checksum.reset(); 350 } 351 352}