1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.commons.compress.compressors.snappy;
20
21 import java.io.ByteArrayOutputStream;
22 import java.io.IOException;
23 import java.io.OutputStream;
24
25 import org.apache.commons.codec.digest.PureJavaCrc32C;
26 import org.apache.commons.compress.compressors.CompressorOutputStream;
27 import org.apache.commons.compress.compressors.lz77support.Parameters;
28 import org.apache.commons.compress.utils.ByteUtils;
29
30
31
32
33
34
35
36
37
38
39
40
41 public class FramedSnappyCompressorOutputStream extends CompressorOutputStream {
42
43
44
45
46 private static final int MAX_COMPRESSED_BUFFER_SIZE = 1 << 16;
47
48 static long mask(long x) {
49
50
51 x = x >> 15 | x << 17;
52 x += FramedSnappyCompressorInputStream.MASK_OFFSET;
53 x &= 0xffffFFFFL;
54 return x;
55 }
56
57 private final OutputStream out;
58 private final Parameters params;
59 private final PureJavaCrc32C checksum = new PureJavaCrc32C();
60
61 private final byte[] oneByte = new byte[1];
62 private final byte[] buffer = new byte[MAX_COMPRESSED_BUFFER_SIZE];
63
64 private int currentIndex;
65
66 private final ByteUtils.ByteConsumer consumer;
67
68
69
70
71
72
73
74 public FramedSnappyCompressorOutputStream(final OutputStream out) throws IOException {
75 this(out, SnappyCompressorOutputStream.createParameterBuilder(SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE).build());
76 }
77
78
79
80
81
82
83
84
85 public FramedSnappyCompressorOutputStream(final OutputStream out, final Parameters params) throws IOException {
86 this.out = out;
87 this.params = params;
88 consumer = new ByteUtils.OutputStreamByteConsumer(out);
89 out.write(FramedSnappyCompressorInputStream.SZ_SIGNATURE);
90 }
91
92 @Override
93 public void close() throws IOException {
94 try {
95 finish();
96 } finally {
97 out.close();
98 }
99 }
100
101
102
103
104
105
106 public void finish() throws IOException {
107 flushBuffer();
108 }
109
110 private void flushBuffer() throws IOException {
111 if (currentIndex == 0) {
112 return;
113 }
114 out.write(FramedSnappyCompressorInputStream.COMPRESSED_CHUNK_TYPE);
115 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
116 try (OutputStream o = new SnappyCompressorOutputStream(baos, currentIndex, params)) {
117 o.write(buffer, 0, currentIndex);
118 }
119 final byte[] b = baos.toByteArray();
120 writeLittleEndian(3, b.length + 4L );
121 writeCrc();
122 out.write(b);
123 currentIndex = 0;
124 }
125
126 @Override
127 public void write(final byte[] data, int off, int len) throws IOException {
128 int blockDataRemaining = buffer.length - currentIndex;
129 while (len > 0) {
130 final int copyLen = Math.min(len, blockDataRemaining);
131 System.arraycopy(data, off, buffer, currentIndex, copyLen);
132 off += copyLen;
133 blockDataRemaining -= copyLen;
134 len -= copyLen;
135 currentIndex += copyLen;
136 if (blockDataRemaining == 0) {
137 flushBuffer();
138 blockDataRemaining = buffer.length;
139 }
140 }
141 }
142
143 @Override
144 public void write(final int b) throws IOException {
145 oneByte[0] = (byte) (b & 0xff);
146 write(oneByte);
147 }
148
149 private void writeCrc() throws IOException {
150 checksum.update(buffer, 0, currentIndex);
151 writeLittleEndian(4, mask(checksum.getValue()));
152 checksum.reset();
153 }
154
155 private void writeLittleEndian(final int numBytes, final long num) throws IOException {
156 ByteUtils.toLittleEndian(consumer, num, numBytes);
157 }
158 }