1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.commons.compress.compressors.snappy;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.PushbackInputStream;
24 import java.util.Arrays;
25
26 import org.apache.commons.codec.digest.PureJavaCrc32C;
27 import org.apache.commons.compress.compressors.CompressorInputStream;
28 import org.apache.commons.compress.utils.BoundedInputStream;
29 import org.apache.commons.compress.utils.ByteUtils;
30 import org.apache.commons.compress.utils.IOUtils;
31 import org.apache.commons.compress.utils.InputStreamStatistics;
32 import org.apache.commons.io.input.CountingInputStream;
33
34
35
36
37
38
39
40
41
42
43
44 public class FramedSnappyCompressorInputStream extends CompressorInputStream implements InputStreamStatistics {
45
46
47
48
49 static final long MASK_OFFSET = 0xa282ead8L;
50
51 private static final int STREAM_IDENTIFIER_TYPE = 0xff;
52 static final int COMPRESSED_CHUNK_TYPE = 0;
53 private static final int UNCOMPRESSED_CHUNK_TYPE = 1;
54 private static final int PADDING_CHUNK_TYPE = 0xfe;
55 private static final int MIN_UNSKIPPABLE_TYPE = 2;
56 private static final int MAX_UNSKIPPABLE_TYPE = 0x7f;
57 private static final int MAX_SKIPPABLE_TYPE = 0xfd;
58
59
60 static final byte[] SZ_SIGNATURE = {
61 (byte) STREAM_IDENTIFIER_TYPE,
62 6, 0, 0,
63 's', 'N', 'a', 'P', 'p', 'Y' };
64
65
66
67
68
69
70
71
72
73
74
75
76 public static boolean matches(final byte[] signature, final int length) {
77
78 if (length < SZ_SIGNATURE.length) {
79 return false;
80 }
81
82 byte[] shortenedSig = signature;
83 if (signature.length > SZ_SIGNATURE.length) {
84 shortenedSig = Arrays.copyOf(signature, SZ_SIGNATURE.length);
85 }
86
87 return Arrays.equals(shortenedSig, SZ_SIGNATURE);
88 }
89
90 static long unmask(long x) {
91
92
93 x -= MASK_OFFSET;
94 x &= 0xffffFFFFL;
95 return (x >> 17 | x << 15) & 0xffffFFFFL;
96 }
97
98 private long unreadBytes;
99
100 private final CountingInputStream countingStream;
101
102
103 private final PushbackInputStream inputStream;
104
105
106 private final FramedSnappyDialect dialect;
107
108 private SnappyCompressorInputStream currentCompressedChunk;
109
110
111 private final byte[] oneByte = new byte[1];
112 private boolean endReached, inUncompressedChunk;
113 private int uncompressedBytesRemaining;
114 private long expectedChecksum = -1;
115
116 private final int blockSize;
117
118 private final PureJavaCrc32C checksum = new PureJavaCrc32C();
119
120 private final ByteUtils.ByteSupplier supplier = this::readOneByte;
121
122
123
124
125
126
127
128
129 public FramedSnappyCompressorInputStream(final InputStream in) throws IOException {
130 this(in, FramedSnappyDialect.STANDARD);
131 }
132
133
134
135
136
137
138
139
140 public FramedSnappyCompressorInputStream(final InputStream in, final FramedSnappyDialect dialect) throws IOException {
141 this(in, SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE, dialect);
142 }
143
144
145
146
147
148
149
150
151
152
153
154 public FramedSnappyCompressorInputStream(final InputStream in, final int blockSize, final FramedSnappyDialect dialect) throws IOException {
155 if (blockSize <= 0) {
156 throw new IllegalArgumentException("blockSize must be bigger than 0");
157 }
158 countingStream = new CountingInputStream(in);
159 this.inputStream = new PushbackInputStream(countingStream, 1);
160 this.blockSize = blockSize;
161 this.dialect = dialect;
162 if (dialect.hasStreamIdentifier()) {
163 readStreamIdentifier();
164 }
165 }
166
167
168 @Override
169 public int available() throws IOException {
170 if (inUncompressedChunk) {
171 return Math.min(uncompressedBytesRemaining, inputStream.available());
172 }
173 if (currentCompressedChunk != null) {
174 return currentCompressedChunk.available();
175 }
176 return 0;
177 }
178
179
180 @Override
181 public void close() throws IOException {
182 try {
183 if (currentCompressedChunk != null) {
184 currentCompressedChunk.close();
185 currentCompressedChunk = null;
186 }
187 } finally {
188 inputStream.close();
189 }
190 }
191
192
193
194
195 @Override
196 public long getCompressedCount() {
197 return countingStream.getByteCount() - unreadBytes;
198 }
199
200
201 @Override
202 public int read() throws IOException {
203 return read(oneByte, 0, 1) == -1 ? -1 : oneByte[0] & 0xFF;
204 }
205
206
207 @Override
208 public int read(final byte[] b, final int off, final int len) throws IOException {
209 if (len == 0) {
210 return 0;
211 }
212 int read = readOnce(b, off, len);
213 if (read == -1) {
214 readNextBlock();
215 if (endReached) {
216 return -1;
217 }
218 read = readOnce(b, off, len);
219 }
220 return read;
221 }
222
223 private long readCrc() throws IOException {
224 final byte[] b = new byte[4];
225 final int read = IOUtils.readFully(inputStream, b);
226 count(read);
227 if (read != 4) {
228 throw new IOException("Premature end of stream");
229 }
230 return ByteUtils.fromLittleEndian(b);
231 }
232
233 private void readNextBlock() throws IOException {
234 verifyLastChecksumAndReset();
235 inUncompressedChunk = false;
236 final int type = readOneByte();
237 if (type == -1) {
238 endReached = true;
239 } else if (type == STREAM_IDENTIFIER_TYPE) {
240 inputStream.unread(type);
241 unreadBytes++;
242 pushedBackBytes(1);
243 readStreamIdentifier();
244 readNextBlock();
245 } else if (type == PADDING_CHUNK_TYPE || type > MAX_UNSKIPPABLE_TYPE && type <= MAX_SKIPPABLE_TYPE) {
246 skipBlock();
247 readNextBlock();
248 } else if (type >= MIN_UNSKIPPABLE_TYPE && type <= MAX_UNSKIPPABLE_TYPE) {
249 throw new IOException("Unskippable chunk with type " + type + " (hex " + Integer.toHexString(type) + ")" + " detected.");
250 } else if (type == UNCOMPRESSED_CHUNK_TYPE) {
251 inUncompressedChunk = true;
252 uncompressedBytesRemaining = readSize() - 4 ;
253 if (uncompressedBytesRemaining < 0) {
254 throw new IOException("Found illegal chunk with negative size");
255 }
256 expectedChecksum = unmask(readCrc());
257 } else if (type == COMPRESSED_CHUNK_TYPE) {
258 final boolean expectChecksum = dialect.usesChecksumWithCompressedChunks();
259 final long size = readSize() - (expectChecksum ? 4L : 0L);
260 if (size < 0) {
261 throw new IOException("Found illegal chunk with negative size");
262 }
263 if (expectChecksum) {
264 expectedChecksum = unmask(readCrc());
265 } else {
266 expectedChecksum = -1;
267 }
268 currentCompressedChunk = new SnappyCompressorInputStream(new BoundedInputStream(inputStream, size), blockSize);
269
270 count(currentCompressedChunk.getBytesRead());
271 } else {
272
273 throw new IOException("Unknown chunk type " + type + " detected.");
274 }
275 }
276
277
278
279
280
281
282 private int readOnce(final byte[] b, final int off, final int len) throws IOException {
283 int read = -1;
284 if (inUncompressedChunk) {
285 final int amount = Math.min(uncompressedBytesRemaining, len);
286 if (amount == 0) {
287 return -1;
288 }
289 read = inputStream.read(b, off, amount);
290 if (read != -1) {
291 uncompressedBytesRemaining -= read;
292 count(read);
293 }
294 } else if (currentCompressedChunk != null) {
295 final long before = currentCompressedChunk.getBytesRead();
296 read = currentCompressedChunk.read(b, off, len);
297 if (read == -1) {
298 currentCompressedChunk.close();
299 currentCompressedChunk = null;
300 } else {
301 count(currentCompressedChunk.getBytesRead() - before);
302 }
303 }
304 if (read > 0) {
305 checksum.update(b, off, read);
306 }
307 return read;
308 }
309
310 private int readOneByte() throws IOException {
311 final int b = inputStream.read();
312 if (b != -1) {
313 count(1);
314 return b & 0xFF;
315 }
316 return -1;
317 }
318
319 private int readSize() throws IOException {
320 return (int) ByteUtils.fromLittleEndian(supplier, 3);
321 }
322
323 private void readStreamIdentifier() throws IOException {
324 final byte[] b = new byte[10];
325 final int read = IOUtils.readFully(inputStream, b);
326 count(read);
327 if (10 != read || !matches(b, 10)) {
328 throw new IOException("Not a framed Snappy stream");
329 }
330 }
331
332 private void skipBlock() throws IOException {
333 final int size = readSize();
334 if (size < 0) {
335 throw new IOException("Found illegal chunk with negative size");
336 }
337 final long read = org.apache.commons.io.IOUtils.skip(inputStream, size);
338 count(read);
339 if (read != size) {
340 throw new IOException("Premature end of stream");
341 }
342 }
343
344 private void verifyLastChecksumAndReset() throws IOException {
345 if (expectedChecksum >= 0 && expectedChecksum != checksum.getValue()) {
346 throw new IOException("Checksum verification failed");
347 }
348 expectedChecksum = -1;
349 checksum.reset();
350 }
351
352 }