001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   https://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.commons.compress.utils;
020
021import java.io.FileOutputStream;
022import java.io.IOException;
023import java.io.OutputStream;
024import java.nio.ByteBuffer;
025import java.nio.ByteOrder;
026import java.nio.channels.ClosedChannelException;
027import java.nio.channels.WritableByteChannel;
028import java.util.concurrent.atomic.AtomicBoolean;
029
030import org.apache.commons.io.IOUtils;
031
/**
 * This class supports writing to an OutputStream or WritableByteChannel in fixed length blocks.
 * <p>
 * It can be used to support output to devices such as tape drives that require output in this format. If the final block does not have enough content to
 * fill an entire block, the output will be padded with zero bytes to a full block size.
 * </p>
 *
 * <p>
 * This class can be used to support TAR, PAX, and CPIO blocked output to character special devices. It is not recommended that this class be used unless
 * writing to such devices, as the padding serves no useful purpose in other cases.
 * </p>
 *
 * <p>
 * This class should normally wrap a FileOutputStream or associated WritableByteChannel directly. If there is an intervening filter that modifies the output,
 * such as a CompressorOutputStream, or performs its own buffering, such as BufferedOutputStream, output to the device may no longer be of the specified size.
 * </p>
 *
 * <p>
 * Any content written to this stream should be self-delimiting and should tolerate any padding added to fill the last block.
 * </p>
 *
 * @since 1.15
 */
public class FixedLengthBlockOutputStream extends OutputStream implements WritableByteChannel {

    /**
     * Helper class to provide channel wrapper for arbitrary output stream that doesn't alter the size of writes. We can't use Channels.newChannel, because for
     * non FileOutputStreams, it breaks up writes into 8KB max chunks. Since the purpose of this class is to always write complete blocks, we need to write a
     * simple class to take care of it.
     */
    private static final class BufferAtATimeOutputChannel implements WritableByteChannel {

        private final OutputStream out;
        private final AtomicBoolean closed = new AtomicBoolean();

        private BufferAtATimeOutputChannel(final OutputStream out) {
            this.out = out;
        }

        /**
         * Closes the wrapped stream; only the first call has any effect.
         *
         * @throws IOException if closing the wrapped stream fails
         */
        @Override
        public void close() throws IOException {
            if (closed.compareAndSet(false, true)) {
                out.close();
            }
        }

        @Override
        public boolean isOpen() {
            return !closed.get();
        }

        /**
         * Writes all remaining bytes of the given buffer to the wrapped stream with a single {@code OutputStream.write(byte[], int, int)} call.
         * <p>
         * Buffers without an accessible backing array (direct or read-only buffers) are copied into a temporary array first. Callers of the enclosing class
         * may legitimately hand such buffers to {@link FixedLengthBlockOutputStream#write(ByteBuffer)}, which forwards whole blocks to this channel.
         * </p>
         *
         * @param buffer the bytes to write; on success its position is advanced to its limit, on failure it is left unchanged
         * @return the number of bytes written, i.e. the number of bytes that were remaining in {@code buffer}
         * @throws ClosedChannelException if this channel has been closed
         * @throws IOException            if the wrapped stream fails; this channel is closed before the exception is rethrown
         */
        @Override
        public int write(final ByteBuffer buffer) throws IOException {
            if (!isOpen()) {
                throw new ClosedChannelException();
            }
            final int len = buffer.remaining();
            try {
                if (buffer.hasArray()) {
                    out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), len);
                } else {
                    // Read through a duplicate so the caller's position only moves once the write has succeeded.
                    final byte[] copy = new byte[len];
                    buffer.duplicate().get(copy);
                    out.write(copy, 0, len);
                }
                buffer.position(buffer.limit());
                return len;
            } catch (final IOException e) {
                // Close the channel after a failed write, keeping the write failure as the primary exception.
                try {
                    close();
                } catch (final IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
        }

    }

    /**
     * Validates a requested block size.
     *
     * @param blockSize the requested block size
     * @return {@code blockSize} if it is positive
     * @throws IllegalArgumentException if {@code blockSize} is zero or negative
     */
    private static int requirePositiveBlockSize(final int blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("Block size must be positive: " + blockSize);
        }
        return blockSize;
    }

    private final WritableByteChannel out;
    private final int blockSize;
    private final ByteBuffer buffer;

    private final AtomicBoolean closed = new AtomicBoolean();

    /**
     * Constructs a fixed length block output stream with given destination stream and block size.
     * <p>
     * A {@link FileOutputStream} is written through its own channel using a direct buffer; any other stream is wrapped in a channel that passes each block to
     * the stream in a single write call.
     * </p>
     *
     * @param os        The stream to wrap.
     * @param blockSize The block size to use.
     * @throws IllegalArgumentException if {@code blockSize} is not positive
     */
    public FixedLengthBlockOutputStream(final OutputStream os, final int blockSize) {
        // A zero block size would make write(byte[], int, int) spin forever, so reject it up front.
        this.blockSize = requirePositiveBlockSize(blockSize);
        if (os instanceof FileOutputStream) {
            final FileOutputStream fileOutputStream = (FileOutputStream) os;
            out = fileOutputStream.getChannel();
            buffer = ByteBuffer.allocateDirect(blockSize);
        } else {
            out = new BufferAtATimeOutputChannel(os);
            buffer = ByteBuffer.allocate(blockSize);
        }
    }

    /**
     * Constructs a fixed length block output stream with given destination writable byte channel and block size.
     *
     * @param out       The writable byte channel to wrap.
     * @param blockSize The block size to use.
     * @throws IllegalArgumentException if {@code blockSize} is not positive
     */
    public FixedLengthBlockOutputStream(final WritableByteChannel out, final int blockSize) {
        this.blockSize = requirePositiveBlockSize(blockSize);
        this.out = out;
        this.buffer = ByteBuffer.allocateDirect(blockSize);
    }

    /**
     * Pads and writes any partially filled block, then closes the underlying channel. Only the first call has any effect.
     *
     * @throws IOException if writing the final block or closing the underlying channel fails
     */
    @Override
    public void close() throws IOException {
        if (closed.compareAndSet(false, true)) {
            try {
                flushBlock();
            } finally {
                // The destination is closed even when the final block could not be written.
                out.close();
            }
        }
    }

    /**
     * Throws if a block write did not transfer exactly one complete block.
     *
     * @param written the byte count reported by the underlying channel
     * @param block   the buffer that was handed to the channel, limited to one block
     * @throws IOException if fewer or more than {@code blockSize} bytes were reported, or bytes are left over in {@code block}
     */
    private void ensureWholeBlockWritten(final int written, final ByteBuffer block) throws IOException {
        if (written != blockSize || block.hasRemaining()) {
            throw new IOException(String.format("Failed to write %,d bytes atomically. Only wrote  %,d", blockSize, written));
        }
    }

    /**
     * Potentially pads and then writes the current block to the underlying stream.
     * <p>
     * Nothing is written when no bytes are currently buffered.
     * </p>
     *
     * @throws IOException if writing fails
     */
    public void flushBlock() throws IOException {
        if (buffer.position() != 0) {
            padBlock();
            writeBlock();
        }
    }

    /**
     * Tests whether this stream is open. If the underlying channel reports that it is no longer open, this stream is marked closed as well.
     *
     * @return {@code true} if neither this stream nor the underlying channel has been closed
     */
    @Override
    public boolean isOpen() {
        if (!out.isOpen()) {
            closed.set(true);
        }
        return !closed.get();
    }

    /**
     * Writes the buffered block to the underlying channel once the buffer is completely full.
     *
     * @throws IOException if writing fails
     */
    private void maybeFlush() throws IOException {
        if (!buffer.hasRemaining()) {
            writeBlock();
        }
    }

    /**
     * Fills the unused remainder of the block buffer with zero bytes.
     */
    private void padBlock() {
        // Only zeros are written, so the byte order does not affect the resulting content.
        buffer.order(ByteOrder.nativeOrder());
        int bytesToWrite = buffer.remaining();
        if (bytesToWrite > 8) {
            // Advance to the next multiple of eight with single bytes, then fill eight bytes at a time.
            final int align = buffer.position() & 7;
            if (align != 0) {
                final int limit = 8 - align;
                for (int i = 0; i < limit; i++) {
                    buffer.put((byte) 0);
                }
                bytesToWrite -= limit;
            }

            while (bytesToWrite >= 8) {
                buffer.putLong(0L);
                bytesToWrite -= 8;
            }
        }
        // Fewer than eight bytes are left at this point.
        while (buffer.hasRemaining()) {
            buffer.put((byte) 0);
        }
    }

    /**
     * Buffers the given bytes, writing a block to the underlying channel each time the buffer fills up.
     *
     * @param b      the data
     * @param offset the start offset in {@code b}
     * @param length the number of bytes to write
     * @throws ClosedChannelException if this stream is closed
     * @throws IOException            if writing a block fails
     */
    @Override
    public void write(final byte[] b, final int offset, final int length) throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }
        int off = offset;
        int len = length;
        while (len > 0) {
            // The buffer always has room here: it is cleared whenever it becomes full.
            final int n = Math.min(len, buffer.remaining());
            buffer.put(b, off, n);
            maybeFlush();
            len -= n;
            off += n;
        }
    }

    /**
     * Writes all remaining bytes of the given buffer.
     * <p>
     * Bytes that complete an already started block are copied into the internal buffer, subsequent whole blocks are handed to the underlying channel directly
     * from {@code src}, and a trailing partial block is buffered until more data arrives or the stream is flushed or closed.
     * </p>
     *
     * @param src the bytes to write; its limit is restored to its original value even if writing fails
     * @return the number of bytes consumed from {@code src}, which is always the number of bytes that were remaining
     * @throws ClosedChannelException if this stream is closed
     * @throws IOException            if the underlying channel fails or does not accept a complete block in one write
     */
    @Override
    public int write(final ByteBuffer src) throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }
        final int srcRemaining = src.remaining();
        if (srcRemaining >= buffer.remaining()) {
            int srcLeft = srcRemaining;
            final int savedLimit = src.limit();
            try {
                // If we're not at the start of buffer, we have some bytes already buffered:
                // fill up the rest of buffer and write the block.
                if (buffer.position() != 0) {
                    final int n = buffer.remaining();
                    src.limit(src.position() + n);
                    buffer.put(src);
                    writeBlock();
                    srcLeft -= n;
                }
                // Whilst we have enough bytes in src for complete blocks,
                // write them directly from src without copying them to buffer.
                while (srcLeft >= blockSize) {
                    src.limit(src.position() + blockSize);
                    // A short write here would silently break the block alignment, so verify it like writeBlock() does.
                    ensureWholeBlockWritten(out.write(src), src);
                    srcLeft -= blockSize;
                }
            } finally {
                // Never leave the caller's buffer with a narrowed limit, not even after a failure.
                src.limit(savedLimit);
            }
        }
        // Whatever is left is less than a full block and must be buffered.
        buffer.put(src);
        return srcRemaining;
    }

    /**
     * Buffers a single byte, writing the block to the underlying channel if this byte completes it.
     *
     * @param b the byte to write; only the low-order eight bits are used
     * @throws ClosedChannelException if this stream is closed
     * @throws IOException            if writing a block fails
     */
    @Override
    public void write(final int b) throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }
        buffer.put((byte) b);
        maybeFlush();
    }

    /**
     * Writes the content of the internal buffer to the underlying channel as one block and resets the buffer for reuse.
     *
     * @throws IOException if the channel fails or does not accept the complete block in one write
     */
    private void writeBlock() throws IOException {
        buffer.flip();
        final int written = out.write(buffer);
        ensureWholeBlockWritten(written, buffer);
        buffer.clear();
    }

}