1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.commons.compress.utils;
20
21 import java.io.FileOutputStream;
22 import java.io.IOException;
23 import java.io.OutputStream;
24 import java.nio.ByteBuffer;
25 import java.nio.ByteOrder;
26 import java.nio.channels.ClosedChannelException;
27 import java.nio.channels.WritableByteChannel;
28 import java.util.concurrent.atomic.AtomicBoolean;
29
30 import org.apache.commons.io.IOUtils;
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
 * An output stream and writable channel that forwards data to an underlying
 * channel in blocks of exactly {@code blockSize} bytes.
 * <p>
 * Bytes are collected in an internal buffer until a full block is available,
 * which is then handed to the underlying channel in a single {@code write}
 * call. A trailing partial block is padded with zero bytes when
 * {@link #flushBlock()} or {@link #close()} is called.
 * </p>
 */
public class FixedLengthBlockOutputStream extends OutputStream implements WritableByteChannel {

    /**
     * Adapts a plain {@link OutputStream} to {@link WritableByteChannel}, writing
     * the whole remaining content of each buffer with one stream {@code write} call.
     */
    private static final class BufferAtATimeOutputChannel implements WritableByteChannel {

        private final OutputStream out;
        private final AtomicBoolean closed = new AtomicBoolean();

        private BufferAtATimeOutputChannel(final OutputStream out) {
            this.out = out;
        }

        /**
         * Closes the wrapped stream; only the first call has any effect.
         */
        @Override
        public void close() throws IOException {
            if (closed.compareAndSet(false, true)) {
                out.close();
            }
        }

        /**
         * Closes this channel after a failed write, recording a secondary close
         * failure on the original exception instead of discarding it.
         */
        private void closeAfterFailure(final IOException cause) {
            try {
                close();
            } catch (final IOException closeFailure) {
                // addSuppressed rejects self-suppression, so skip it if the stream rethrows the same instance
                if (closeFailure != cause) {
                    cause.addSuppressed(closeFailure);
                }
            }
        }

        @Override
        public boolean isOpen() {
            return !closed.get();
        }

        /**
         * Writes all remaining bytes of {@code buffer} to the wrapped stream.
         * <p>
         * Buffers without an accessible backing array (direct or read-only
         * buffers) are copied through a temporary array. The buffer's position is
         * advanced to its limit only after the stream write succeeded. If the
         * stream write fails, this channel is closed and the exception rethrown.
         * </p>
         *
         * @param buffer the bytes to write.
         * @return the number of bytes written, i.e. the buffer's remaining count on entry.
         * @throws ClosedChannelException if this channel has been closed.
         * @throws IOException if the wrapped stream fails.
         */
        @Override
        public int write(final ByteBuffer buffer) throws IOException {
            if (!isOpen()) {
                throw new ClosedChannelException();
            }
            final int len = buffer.remaining();
            try {
                if (buffer.hasArray()) {
                    out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), len);
                } else {
                    // Read through a duplicate so the caller's position stays untouched until the write succeeded.
                    final byte[] copy = new byte[len];
                    buffer.duplicate().get(copy);
                    out.write(copy, 0, len);
                }
                buffer.position(buffer.limit());
                return len;
            } catch (final IOException e) {
                closeAfterFailure(e);
                throw e;
            }
        }

    }

    /**
     * Rejects block sizes that cannot hold any data.
     *
     * @param blockSize the requested block size.
     * @return {@code blockSize} if it is positive.
     * @throws IllegalArgumentException if {@code blockSize} is zero or negative.
     */
    private static int requirePositive(final int blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive, was " + blockSize);
        }
        return blockSize;
    }

    private final WritableByteChannel out;
    private final int blockSize;
    private final ByteBuffer buffer;

    private final AtomicBoolean closed = new AtomicBoolean();

    /**
     * Creates a fixed length block output stream on top of the given stream.
     * <p>
     * For a {@link FileOutputStream} the stream's channel is used together with
     * a direct buffer; any other stream is wrapped in an internal channel adapter
     * and a heap buffer is used.
     * </p>
     *
     * @param os        the stream to write blocks to.
     * @param blockSize the size of each block in bytes, must be positive.
     * @throws IllegalArgumentException if {@code blockSize} is not positive.
     */
    public FixedLengthBlockOutputStream(final OutputStream os, final int blockSize) {
        requirePositive(blockSize);
        if (os instanceof FileOutputStream) {
            final FileOutputStream fileOutputStream = (FileOutputStream) os;
            out = fileOutputStream.getChannel();
            buffer = ByteBuffer.allocateDirect(blockSize);
        } else {
            out = new BufferAtATimeOutputChannel(os);
            buffer = ByteBuffer.allocate(blockSize);
        }
        this.blockSize = blockSize;
    }

    /**
     * Creates a fixed length block output stream on top of the given channel,
     * using a direct buffer of {@code blockSize} bytes.
     *
     * @param out       the channel to write blocks to.
     * @param blockSize the size of each block in bytes, must be positive.
     * @throws IllegalArgumentException if {@code blockSize} is not positive.
     */
    public FixedLengthBlockOutputStream(final WritableByteChannel out, final int blockSize) {
        requirePositive(blockSize);
        this.out = out;
        this.blockSize = blockSize;
        this.buffer = ByteBuffer.allocateDirect(blockSize);
    }

    /**
     * Pads and writes a pending partial block, then closes the underlying
     * channel. The channel is closed even if flushing fails. Only the first call
     * has any effect.
     *
     * @throws IOException if flushing the block or closing the channel fails.
     */
    @Override
    public void close() throws IOException {
        if (closed.compareAndSet(false, true)) {
            try {
                flushBlock();
            } finally {
                out.close();
            }
        }
    }

    /**
     * Writes the current block to the underlying channel if it holds any data,
     * padding it with zero bytes up to the block size first. Does nothing when
     * the buffer is empty.
     *
     * @throws IOException if writing the block fails.
     */
    public void flushBlock() throws IOException {
        if (buffer.position() != 0) {
            padBlock();
            writeBlock();
        }
    }

    /**
     * Builds the exception reported when the underlying channel did not accept a
     * whole block in one call.
     */
    private IOException incompleteWrite(final int written) {
        return new IOException(String.format("Failed to write %,d bytes atomically. Only wrote %,d", blockSize, written));
    }

    /**
     * Tells whether this stream is still open. If the underlying channel reports
     * that it is no longer open, this stream is marked as closed as well.
     *
     * @return {@code true} if neither this stream nor the underlying channel is closed.
     */
    @Override
    public boolean isOpen() {
        if (!out.isOpen()) {
            closed.set(true);
        }
        return !closed.get();
    }

    /**
     * Writes the buffered block once the buffer is completely full.
     */
    private void maybeFlush() throws IOException {
        if (!buffer.hasRemaining()) {
            writeBlock();
        }
    }

    /**
     * Fills the rest of the buffer with zero bytes.
     * <p>
     * For larger gaps the position is first brought to a multiple of eight with
     * single bytes, then eight zero bytes at a time are written as longs; any
     * leftover bytes are written individually.
     * </p>
     */
    private void padBlock() {
        buffer.order(ByteOrder.nativeOrder());
        int bytesToWrite = buffer.remaining();
        if (bytesToWrite > 8) {
            final int align = buffer.position() & 7;
            if (align != 0) {
                final int limit = 8 - align;
                for (int i = 0; i < limit; i++) {
                    buffer.put((byte) 0);
                }
                bytesToWrite -= limit;
            }

            while (bytesToWrite >= 8) {
                buffer.putLong(0L);
                bytesToWrite -= 8;
            }
        }
        while (buffer.hasRemaining()) {
            buffer.put((byte) 0);
        }
    }

    /**
     * Buffers {@code length} bytes from {@code b} starting at {@code offset},
     * writing a block to the underlying channel each time the buffer fills up.
     *
     * @param b      the source array.
     * @param offset index of the first byte to write.
     * @param length number of bytes to write.
     * @throws ClosedChannelException if this stream is closed.
     * @throws IOException if writing a block fails.
     */
    @Override
    public void write(final byte[] b, final int offset, final int length) throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }
        int off = offset;
        int len = length;
        while (len > 0) {
            final int n = Math.min(len, buffer.remaining());
            buffer.put(b, off, n);
            maybeFlush();
            len -= n;
            off += n;
        }
    }

    /**
     * Writes all remaining bytes of {@code src}.
     * <p>
     * If a partial block is already buffered it is completed from {@code src}
     * and written. Complete blocks still available in {@code src} are then
     * passed to the underlying channel directly, without being copied into the
     * internal buffer. Whatever is left (less than one block) is buffered.
     * The limit of {@code src} is restored to its original value, also when a
     * write fails.
     * </p>
     *
     * @param src the bytes to write.
     * @return the number of bytes consumed, i.e. the remaining count of {@code src} on entry.
     * @throws ClosedChannelException if this stream is closed.
     * @throws IOException if the underlying channel fails or does not accept a whole block in one call.
     */
    @Override
    public int write(final ByteBuffer src) throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }
        final int srcRemaining = src.remaining();
        if (srcRemaining >= buffer.remaining()) {
            int srcLeft = srcRemaining;
            final int savedLimit = src.limit();
            try {
                // Some bytes are already buffered: top the block up from src and write it.
                if (buffer.position() != 0) {
                    final int n = buffer.remaining();
                    src.limit(src.position() + n);
                    buffer.put(src);
                    writeBlock();
                    srcLeft -= n;
                }

                // Write complete blocks straight from src, holding them to the same
                // all-or-nothing rule that writeBlock() applies to buffered blocks.
                while (srcLeft >= blockSize) {
                    src.limit(src.position() + blockSize);
                    final int written = out.write(src);
                    if (written != blockSize || src.hasRemaining()) {
                        throw incompleteWrite(written);
                    }
                    srcLeft -= blockSize;
                }
            } finally {
                // Never leave the caller's buffer with a narrowed limit.
                src.limit(savedLimit);
            }
        }
        // Buffer whatever is left; this is always less than one block.
        buffer.put(src);
        return srcRemaining;
    }

    /**
     * Buffers a single byte, writing a block if that fills the buffer.
     *
     * @param b the byte to write; only the low-order eight bits are used.
     * @throws ClosedChannelException if this stream is closed.
     * @throws IOException if writing a block fails.
     */
    @Override
    public void write(final int b) throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }
        buffer.put((byte) b);
        maybeFlush();
    }

    /**
     * Hands the buffered block to the underlying channel and resets the buffer.
     *
     * @throws IOException if the channel fails or did not consume exactly one block.
     */
    private void writeBlock() throws IOException {
        buffer.flip();
        final int i = out.write(buffer);
        final boolean hasRemaining = buffer.hasRemaining();
        if (i != blockSize || hasRemaining) {
            throw incompleteWrite(i);
        }
        buffer.clear();
    }

}