1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.commons.compress.compressors.snappy;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.PushbackInputStream;
24 import java.util.Arrays;
25
26 import org.apache.commons.codec.digest.PureJavaCrc32C;
27 import org.apache.commons.compress.compressors.CompressorInputStream;
28 import org.apache.commons.compress.utils.ByteUtils;
29 import org.apache.commons.compress.utils.IOUtils;
30 import org.apache.commons.compress.utils.InputStreamStatistics;
31 import org.apache.commons.io.input.BoundedInputStream;
32
33
34
35
36
37
38
39
40
41
42
43 public class FramedSnappyCompressorInputStream extends CompressorInputStream implements InputStreamStatistics {
44
45
46
47
48 static final long MASK_OFFSET = 0xa282ead8L;
49
50 private static final int STREAM_IDENTIFIER_TYPE = 0xff;
51 static final int COMPRESSED_CHUNK_TYPE = 0;
52 private static final int UNCOMPRESSED_CHUNK_TYPE = 1;
53 private static final int PADDING_CHUNK_TYPE = 0xfe;
54 private static final int MIN_UNSKIPPABLE_TYPE = 2;
55 private static final int MAX_UNSKIPPABLE_TYPE = 0x7f;
56 private static final int MAX_SKIPPABLE_TYPE = 0xfd;
57
58
59 static final byte[] SZ_SIGNATURE = {
60 (byte) STREAM_IDENTIFIER_TYPE,
61 6, 0, 0,
62 's', 'N', 'a', 'P', 'p', 'Y' };
63
64
65
66
67
68
69
70
71
72
73
74
75 public static boolean matches(final byte[] signature, final int length) {
76
77 if (length < SZ_SIGNATURE.length) {
78 return false;
79 }
80
81 byte[] shortenedSig = signature;
82 if (signature.length > SZ_SIGNATURE.length) {
83 shortenedSig = Arrays.copyOf(signature, SZ_SIGNATURE.length);
84 }
85
86 return Arrays.equals(shortenedSig, SZ_SIGNATURE);
87 }
88
89 static long unmask(long x) {
90
91
92 x -= MASK_OFFSET;
93 x &= 0xffffFFFFL;
94 return (x >> 17 | x << 15) & 0xffffFFFFL;
95 }
96
97 private long unreadBytes;
98
99 private final BoundedInputStream countingStream;
100
101
102 private final PushbackInputStream inputStream;
103
104
105 private final FramedSnappyDialect dialect;
106
107 private SnappyCompressorInputStream currentCompressedChunk;
108
109
110 private final byte[] oneByte = new byte[1];
111 private boolean endReached, inUncompressedChunk;
112 private int uncompressedBytesRemaining;
113 private long expectedChecksum = -1;
114
115 private final int blockSize;
116
117 private final PureJavaCrc32C checksum = new PureJavaCrc32C();
118
119 private final ByteUtils.ByteSupplier supplier = this::readOneByte;
120
121
122
123
124
125
126
127
128 public FramedSnappyCompressorInputStream(final InputStream in) throws IOException {
129 this(in, FramedSnappyDialect.STANDARD);
130 }
131
132
133
134
135
136
137
138
139 public FramedSnappyCompressorInputStream(final InputStream in, final FramedSnappyDialect dialect) throws IOException {
140 this(in, SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE, dialect);
141 }
142
143
144
145
146
147
148
149
150
151
152
153 public FramedSnappyCompressorInputStream(final InputStream in, final int blockSize, final FramedSnappyDialect dialect) throws IOException {
154 if (blockSize <= 0) {
155 throw new IllegalArgumentException("blockSize must be bigger than 0");
156 }
157 countingStream = BoundedInputStream.builder().setInputStream(in).get();
158 this.inputStream = new PushbackInputStream(countingStream, 1);
159 this.blockSize = blockSize;
160 this.dialect = dialect;
161 if (dialect.hasStreamIdentifier()) {
162 readStreamIdentifier();
163 }
164 }
165
166
167 @Override
168 public int available() throws IOException {
169 if (inUncompressedChunk) {
170 return Math.min(uncompressedBytesRemaining, inputStream.available());
171 }
172 if (currentCompressedChunk != null) {
173 return currentCompressedChunk.available();
174 }
175 return 0;
176 }
177
178
179 @Override
180 public void close() throws IOException {
181 try {
182 if (currentCompressedChunk != null) {
183 currentCompressedChunk.close();
184 currentCompressedChunk = null;
185 }
186 } finally {
187 inputStream.close();
188 }
189 }
190
191
192
193
194 @Override
195 public long getCompressedCount() {
196 return countingStream.getCount() - unreadBytes;
197 }
198
199
200 @Override
201 public int read() throws IOException {
202 return read(oneByte, 0, 1) == -1 ? -1 : oneByte[0] & 0xFF;
203 }
204
205
206 @Override
207 public int read(final byte[] b, final int off, final int len) throws IOException {
208 if (len == 0) {
209 return 0;
210 }
211 int read = readOnce(b, off, len);
212 if (read == -1) {
213 readNextBlock();
214 if (endReached) {
215 return -1;
216 }
217 read = readOnce(b, off, len);
218 }
219 return read;
220 }
221
222 private long readCrc() throws IOException {
223 final byte[] b = new byte[4];
224 final int read = IOUtils.readFully(inputStream, b);
225 count(read);
226 if (read != 4) {
227 throw new IOException("Premature end of stream");
228 }
229 return ByteUtils.fromLittleEndian(b);
230 }
231
232 private void readNextBlock() throws IOException {
233 verifyLastChecksumAndReset();
234 inUncompressedChunk = false;
235 final int type = readOneByte();
236 if (type == -1) {
237 endReached = true;
238 } else if (type == STREAM_IDENTIFIER_TYPE) {
239 inputStream.unread(type);
240 unreadBytes++;
241 pushedBackBytes(1);
242 readStreamIdentifier();
243 readNextBlock();
244 } else if (type == PADDING_CHUNK_TYPE || type > MAX_UNSKIPPABLE_TYPE && type <= MAX_SKIPPABLE_TYPE) {
245 skipBlock();
246 readNextBlock();
247 } else if (type >= MIN_UNSKIPPABLE_TYPE && type <= MAX_UNSKIPPABLE_TYPE) {
248 throw new IOException("Unskippable chunk with type " + type + " (hex " + Integer.toHexString(type) + ")" + " detected.");
249 } else if (type == UNCOMPRESSED_CHUNK_TYPE) {
250 inUncompressedChunk = true;
251 uncompressedBytesRemaining = readSize() - 4 ;
252 if (uncompressedBytesRemaining < 0) {
253 throw new IOException("Found illegal chunk with negative size");
254 }
255 expectedChecksum = unmask(readCrc());
256 } else if (type == COMPRESSED_CHUNK_TYPE) {
257 final boolean expectChecksum = dialect.usesChecksumWithCompressedChunks();
258 final long size = readSize() - (expectChecksum ? 4L : 0L);
259 if (size < 0) {
260 throw new IOException("Found illegal chunk with negative size");
261 }
262 if (expectChecksum) {
263 expectedChecksum = unmask(readCrc());
264 } else {
265 expectedChecksum = -1;
266 }
267
268 currentCompressedChunk = new SnappyCompressorInputStream(BoundedInputStream.builder()
269 .setInputStream(inputStream)
270 .setMaxCount(size)
271 .setPropagateClose(false)
272 .get(),
273 blockSize);
274
275
276 count(currentCompressedChunk.getBytesRead());
277 } else {
278
279 throw new IOException("Unknown chunk type " + type + " detected.");
280 }
281 }
282
283
284
285
286
287
288 private int readOnce(final byte[] b, final int off, final int len) throws IOException {
289 int read = -1;
290 if (inUncompressedChunk) {
291 final int amount = Math.min(uncompressedBytesRemaining, len);
292 if (amount == 0) {
293 return -1;
294 }
295 read = inputStream.read(b, off, amount);
296 if (read != -1) {
297 uncompressedBytesRemaining -= read;
298 count(read);
299 }
300 } else if (currentCompressedChunk != null) {
301 final long before = currentCompressedChunk.getBytesRead();
302 read = currentCompressedChunk.read(b, off, len);
303 if (read == -1) {
304 currentCompressedChunk.close();
305 currentCompressedChunk = null;
306 } else {
307 count(currentCompressedChunk.getBytesRead() - before);
308 }
309 }
310 if (read > 0) {
311 checksum.update(b, off, read);
312 }
313 return read;
314 }
315
316 private int readOneByte() throws IOException {
317 final int b = inputStream.read();
318 if (b != -1) {
319 count(1);
320 return b & 0xFF;
321 }
322 return -1;
323 }
324
325 private int readSize() throws IOException {
326 return (int) ByteUtils.fromLittleEndian(supplier, 3);
327 }
328
329 private void readStreamIdentifier() throws IOException {
330 final byte[] b = new byte[10];
331 final int read = IOUtils.readFully(inputStream, b);
332 count(read);
333 if (10 != read || !matches(b, 10)) {
334 throw new IOException("Not a framed Snappy stream");
335 }
336 }
337
338 private void skipBlock() throws IOException {
339 final int size = readSize();
340 if (size < 0) {
341 throw new IOException("Found illegal chunk with negative size");
342 }
343 final long read = org.apache.commons.io.IOUtils.skip(inputStream, size);
344 count(read);
345 if (read != size) {
346 throw new IOException("Premature end of stream");
347 }
348 }
349
350 private void verifyLastChecksumAndReset() throws IOException {
351 if (expectedChecksum >= 0 && expectedChecksum != checksum.getValue()) {
352 throw new IOException("Checksum verification failed");
353 }
354 expectedChecksum = -1;
355 checksum.reset();
356 }
357
358 }