001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.commons.compress.utils;
020
021import java.io.FileOutputStream;
022import java.io.IOException;
023import java.io.OutputStream;
024import java.nio.ByteBuffer;
025import java.nio.ByteOrder;
026import java.nio.channels.ClosedChannelException;
027import java.nio.channels.WritableByteChannel;
028import java.util.concurrent.atomic.AtomicBoolean;
029
030/**
031 * This class supports writing to an OutputStream or WritableByteChannel in fixed length blocks.
032 * <p>
033 * It can be used to support output to devices such as tape drives that require output in this format. If the final block does not have enough content to
034 * fill an entire block, the output will be padded to a full block size.
035 * </p>
036 *
037 * <p>
038 * This class can be used to support TAR, PAX, and CPIO blocked output to character special devices. It is not recommended that this class be used unless
039 * writing to such devices, as the padding serves no useful purpose otherwise.
040 * </p>
041 *
042 * <p>
043 * This class should normally wrap a FileOutputStream or associated WritableByteChannel directly. If there is an intervening filter that modifies the output,
044 * such as a CompressorOutputStream, or performs its own buffering, such as BufferedOutputStream, output to the device may no longer be of the specified size.
045 * </p>
046 *
047 * <p>
048 * Any content written to this stream should be self-delimiting and should tolerate any padding added to fill the last block.
049 * </p>
050 *
051 * @since 1.15
052 */
public class FixedLengthBlockOutputStream extends OutputStream implements WritableByteChannel {

    /**
     * Helper class to provide channel wrapper for arbitrary output stream that doesn't alter the size of writes. We can't use Channels.newChannel, because for
     * non FileOutputStreams, it breaks up writes into 8KB max chunks. Since the purpose of this class is to always write complete blocks, we need to write a
     * simple class to take care of it.
     */
    private static final class BufferAtATimeOutputChannel implements WritableByteChannel {

        private final OutputStream out;
        private final AtomicBoolean closed = new AtomicBoolean();

        private BufferAtATimeOutputChannel(final OutputStream out) {
            this.out = out;
        }

        /**
         * Closes the wrapped stream the first time it is called; later calls do nothing.
         *
         * @throws IOException if closing the wrapped stream fails
         */
        @Override
        public void close() throws IOException {
            if (closed.compareAndSet(false, true)) {
                out.close();
            }
        }

        @Override
        public boolean isOpen() {
            return !closed.get();
        }

        /**
         * Writes all remaining bytes of {@code buffer} to the wrapped stream with a single {@code OutputStream.write} call.
         * <p>
         * Buffers without an accessible backing array (direct or read-only buffers) are copied into a temporary array first, so that callers of the enclosing
         * stream may pass any kind of buffer.
         * </p>
         *
         * @param buffer the bytes to write; its position is advanced to its limit on success and left untouched on failure
         * @return the number of bytes written
         * @throws ClosedChannelException if this channel has been closed
         * @throws IOException            if the wrapped stream fails; this channel is closed before the exception is rethrown
         */
        @Override
        public int write(final ByteBuffer buffer) throws IOException {
            if (!isOpen()) {
                throw new ClosedChannelException();
            }
            final int len = buffer.remaining();
            try {
                if (buffer.hasArray()) {
                    out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), len);
                } else {
                    // No accessible array: copy through a duplicate so that the caller's position only
                    // moves once the stream has accepted the bytes.
                    final byte[] copy = new byte[len];
                    buffer.duplicate().get(copy);
                    out.write(copy, 0, len);
                }
                buffer.position(buffer.limit());
                return len;
            } catch (final IOException e) {
                try {
                    close();
                } catch (final IOException ignored) { // NOSONAR
                    // the original write failure is the one worth reporting
                }
                throw e;
            }
        }

    }

    /**
     * Validates a requested block size.
     *
     * @param blockSize the requested block size
     * @return {@code blockSize}
     * @throws IllegalArgumentException if {@code blockSize} is not positive
     */
    private static int requirePositive(final int blockSize) {
        if (blockSize <= 0) {
            // A zero-sized block can never be filled, which would make the write loops spin forever.
            throw new IllegalArgumentException("Block size must be positive: " + blockSize);
        }
        return blockSize;
    }

    private final WritableByteChannel out;
    private final int blockSize;
    private final ByteBuffer buffer;

    private final AtomicBoolean closed = new AtomicBoolean();

    /**
     * Constructs a fixed length block output stream with given destination stream and block size.
     *
     * @param os        The stream to wrap.
     * @param blockSize The block size to use.
     * @throws IllegalArgumentException if {@code blockSize} is not positive
     */
    public FixedLengthBlockOutputStream(final OutputStream os, final int blockSize) {
        requirePositive(blockSize);
        if (os instanceof FileOutputStream) {
            final FileOutputStream fileOutputStream = (FileOutputStream) os;
            out = fileOutputStream.getChannel();
            buffer = ByteBuffer.allocateDirect(blockSize);
        } else {
            out = new BufferAtATimeOutputChannel(os);
            buffer = ByteBuffer.allocate(blockSize);
        }
        this.blockSize = blockSize;
    }

    /**
     * Constructs a fixed length block output stream with given destination writable byte channel and block size.
     *
     * @param out       The writable byte channel to wrap.
     * @param blockSize The block size to use.
     * @throws IllegalArgumentException if {@code blockSize} is not positive
     */
    public FixedLengthBlockOutputStream(final WritableByteChannel out, final int blockSize) {
        this.blockSize = requirePositive(blockSize);
        this.out = out;
        this.buffer = ByteBuffer.allocateDirect(blockSize);
    }

    /**
     * Throws if a block was not handed to the underlying channel in one piece.
     *
     * @param written the byte count reported by the underlying channel
     * @param source  the buffer the block was written from, limited to exactly one block
     * @throws IOException if fewer or more than {@code blockSize} bytes were reported, or bytes are left over in {@code source}
     */
    private void checkBlockWritten(final int written, final ByteBuffer source) throws IOException {
        if (written != blockSize || source.hasRemaining()) {
            final String msg = String.format("Failed to write %,d bytes atomically. Only wrote  %,d", blockSize, written);
            throw new IOException(msg);
        }
    }

    /**
     * Pads and writes any partially filled block, then closes the underlying channel. Only the first call has an effect.
     *
     * @throws IOException if writing the final block or closing the underlying channel fails
     */
    @Override
    public void close() throws IOException {
        if (closed.compareAndSet(false, true)) {
            try {
                flushBlock();
            } finally {
                out.close();
            }
        }
    }

    /**
     * Potentially pads and then writes the current block to the underlying stream.
     *
     * @throws IOException if writing fails
     */
    public void flushBlock() throws IOException {
        if (buffer.position() != 0) {
            padBlock();
            writeBlock();
        }
    }

    /**
     * Tells whether this stream can still be written to.
     * <p>
     * If the underlying channel reports that it is closed, this stream marks itself closed as well.
     * </p>
     *
     * @return {@code true} if neither this stream nor the underlying channel has been closed
     */
    @Override
    public boolean isOpen() {
        if (!out.isOpen()) {
            closed.set(true);
        }
        return !closed.get();
    }

    /**
     * Writes the buffered block to the underlying channel once the buffer is completely full.
     *
     * @throws IOException if writing fails
     */
    private void maybeFlush() throws IOException {
        if (!buffer.hasRemaining()) {
            writeBlock();
        }
    }

    /**
     * Fills the unused remainder of the buffer with zero bytes.
     */
    private void padBlock() {
        // Zeros look the same in either byte order; native order is presumably chosen so that
        // putLong does not have to swap bytes.
        buffer.order(ByteOrder.nativeOrder());
        int bytesToWrite = buffer.remaining();
        if (bytesToWrite > 8) {
            // Pad byte-wise up to the next multiple of eight, then continue eight bytes at a time.
            final int align = buffer.position() & 7;
            if (align != 0) {
                final int limit = 8 - align;
                for (int i = 0; i < limit; i++) {
                    buffer.put((byte) 0);
                }
                bytesToWrite -= limit;
            }

            while (bytesToWrite >= 8) {
                buffer.putLong(0L);
                bytesToWrite -= 8;
            }
        }
        // Fewer than eight bytes are left at this point.
        while (buffer.hasRemaining()) {
            buffer.put((byte) 0);
        }
    }

    /**
     * Buffers {@code length} bytes of {@code b}, writing a block to the underlying channel each time the buffer fills up.
     *
     * @param b      the source array
     * @param offset index of the first byte to write
     * @param length number of bytes to write
     * @throws ClosedChannelException if this stream has been closed
     * @throws IOException            if writing a block fails
     */
    @Override
    public void write(final byte[] b, final int offset, final int length) throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }
        int off = offset;
        int len = length;
        while (len > 0) {
            final int n = Math.min(len, buffer.remaining());
            buffer.put(b, off, n);
            maybeFlush();
            len -= n;
            off += n;
        }
    }

    /**
     * Writes all remaining bytes of {@code src}.
     * <p>
     * Complete blocks are written to the underlying channel directly from {@code src}; any bytes that do not make up a complete block are kept in the
     * internal buffer until more data arrives or the stream is flushed or closed.
     * </p>
     *
     * @param src the bytes to write; the limit it had on entry is restored even if writing fails
     * @return the number of bytes consumed from {@code src}, which is always all of its remaining bytes
     * @throws ClosedChannelException if this stream has been closed
     * @throws IOException            if the underlying channel fails or does not accept a complete block in one write
     */
    @Override
    public int write(final ByteBuffer src) throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }
        final int srcRemaining = src.remaining();

        if (srcRemaining < buffer.remaining()) {
            // if we don't have enough bytes in src to fill up a block we must buffer
            buffer.put(src);
        } else {
            int srcLeft = srcRemaining;
            final int savedLimit = src.limit();
            try {
                // If we're not at the start of buffer, we have some bytes already buffered
                // fill up the rest of the buffer and write the block.
                if (buffer.position() != 0) {
                    final int n = buffer.remaining();
                    src.limit(src.position() + n);
                    buffer.put(src);
                    writeBlock();
                    srcLeft -= n;
                }
                // whilst we have enough bytes in src for complete blocks,
                // write them directly from src without copying them to buffer
                while (srcLeft >= blockSize) {
                    src.limit(src.position() + blockSize);
                    // a short write would misalign every following block, so treat it as an error
                    checkBlockWritten(out.write(src), src);
                    srcLeft -= blockSize;
                }
            } finally {
                // never hand the caller's buffer back with a shrunken limit
                src.limit(savedLimit);
            }
            // copy any remaining bytes into buffer
            buffer.put(src);
        }
        return srcRemaining;
    }

    /**
     * Buffers a single byte, writing a block to the underlying channel if this byte fills the buffer.
     *
     * @param b the byte to write; only the low-order eight bits are used
     * @throws ClosedChannelException if this stream has been closed
     * @throws IOException            if writing a block fails
     */
    @Override
    public void write(final int b) throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }
        buffer.put((byte) b);
        maybeFlush();
    }

    /**
     * Writes the content of the internal buffer to the underlying channel as one block and resets the buffer.
     *
     * @throws IOException if the underlying channel fails or does not accept the complete block in one write
     */
    private void writeBlock() throws IOException {
        buffer.flip();
        checkBlockWritten(out.write(buffer), buffer);
        buffer.clear();
    }

}