UnsynchronizedBufferedInputStream.java
- /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.commons.io.input;
- import java.io.BufferedInputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import org.apache.commons.io.IOUtils;
- import org.apache.commons.io.build.AbstractStreamBuilder;
- /**
- * An unsynchronized version of {@link BufferedInputStream}, not thread-safe.
- * <p>
- * Wraps an existing {@link InputStream} and <em>buffers</em> the input. Expensive interaction with the underlying input stream is minimized, since most
- * (smaller) requests can be satisfied by accessing the buffer alone. The drawback is that some extra space is required to hold the buffer and that copying
- * takes place when filling that buffer, but this is usually outweighed by the performance benefits.
- * </p>
- * <p>
- * To build an instance, use {@link Builder}.
- * </p>
- * <p>
- * A typical application pattern for the class looks like this:
- * </p>
- *
- * <pre>
- * UnsynchronizedBufferedInputStream s = new UnsynchronizedBufferedInputStream.Builder().
- * .setInputStream(new FileInputStream("file.java"))
- * .setBufferSize(8192)
- * .get();
- * </pre>
- * <p>
- * Provenance: Apache Harmony and modified.
- * </p>
- *
- * @see Builder
- * @see BufferedInputStream
- * @since 2.12.0
- */
- //@NotThreadSafe
- public final class UnsynchronizedBufferedInputStream extends UnsynchronizedFilterInputStream {
- // @formatter:off
- /**
- * Builds a new {@link UnsynchronizedBufferedInputStream}.
- *
- * <p>
- * Using File IO:
- * </p>
- * <pre>{@code
- * UnsynchronizedBufferedInputStream s = UnsynchronizedBufferedInputStream.builder()
- * .setFile(file)
- * .setBufferSize(8192)
- * .get();}
- * </pre>
- * <p>
- * Using NIO Path:
- * </p>
- * <pre>{@code
- * UnsynchronizedBufferedInputStream s = UnsynchronizedBufferedInputStream.builder()
- * .setPath(path)
- * .setBufferSize(8192)
- * .get();}
- * </pre>
- *
- * @see #get()
- */
- // @formatter:on
- public static class Builder extends AbstractStreamBuilder<UnsynchronizedBufferedInputStream, Builder> {
- /**
- * Constructs a builder of {@link UnsynchronizedBufferedInputStream}.
- */
- public Builder() {
- // empty
- }
- /**
- * Builds a new {@link UnsynchronizedBufferedInputStream}.
- * <p>
- * You must set an aspect that supports {@link #getInputStream()} on this builder, otherwise, this method throws an exception.
- * </p>
- * <p>
- * This builder uses the following aspects:
- * </p>
- * <ul>
- * <li>{@link #getInputStream()}</li>
- * <li>{@link #getBufferSize()}</li>
- * </ul>
- *
- * @return a new instance.
- * @throws IllegalStateException if the {@code origin} is {@code null}.
- * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
- * @throws IOException if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
- * @see #getInputStream()
- * @see #getBufferSize()
- * @see #getUnchecked()
- */
- @Override
- public UnsynchronizedBufferedInputStream get() throws IOException {
- return new UnsynchronizedBufferedInputStream(getInputStream(), getBufferSize());
- }
- }
- /**
- * The buffer containing the current bytes read from the target InputStream.
- */
- protected volatile byte[] buffer;
- /**
- * The total number of bytes inside the byte array {@code buffer}.
- */
- protected int count;
- /**
- * The current limit, which when passed, invalidates the current mark.
- */
- protected int markLimit;
- /**
- * The currently marked position. -1 indicates no mark has been set or the mark has been invalidated.
- */
- protected int markPos = IOUtils.EOF;
- /**
- * The current position within the byte array {@code buffer}.
- */
- protected int pos;
- /**
- * Constructs a new {@code BufferedInputStream} on the {@link InputStream} {@code in}. The buffer size is specified by the parameter {@code size} and all
- * reads are now filtered through this stream.
- *
- * @param in the input stream the buffer reads from.
- * @param size the size of buffer to allocate.
- * @throws IllegalArgumentException if {@code size < 0}.
- */
- private UnsynchronizedBufferedInputStream(final InputStream in, final int size) {
- super(in);
- if (size <= 0) {
- throw new IllegalArgumentException("Size must be > 0");
- }
- buffer = new byte[size];
- }
- /**
- * Returns the number of bytes that are available before this stream will block. This method returns the number of bytes available in the buffer plus those
- * available in the source stream.
- *
- * @return the number of bytes available before blocking.
- * @throws IOException if this stream is closed.
- */
- @Override
- public int available() throws IOException {
- final InputStream localIn = inputStream; // 'in' could be invalidated by close()
- if (buffer == null || localIn == null) {
- throw new IOException("Stream is closed");
- }
- return count - pos + localIn.available();
- }
- /**
- * Closes this stream. The source stream is closed and any resources associated with it are released.
- *
- * @throws IOException if an error occurs while closing this stream.
- */
- @Override
- public void close() throws IOException {
- buffer = null;
- final InputStream localIn = inputStream;
- inputStream = null;
- if (localIn != null) {
- localIn.close();
- }
- }
- private int fillBuffer(final InputStream localIn, byte[] localBuf) throws IOException {
- if (markPos == IOUtils.EOF || pos - markPos >= markLimit) {
- /* Mark position not set or exceeded readLimit */
- final int result = localIn.read(localBuf);
- if (result > 0) {
- markPos = IOUtils.EOF;
- pos = 0;
- count = result;
- }
- return result;
- }
- if (markPos == 0 && markLimit > localBuf.length) {
- /* Increase buffer size to accommodate the readLimit */
- int newLength = localBuf.length * 2;
- if (newLength > markLimit) {
- newLength = markLimit;
- }
- final byte[] newbuf = new byte[newLength];
- System.arraycopy(localBuf, 0, newbuf, 0, localBuf.length);
- // Reassign buffer, which will invalidate any local references
- // FIXME: what if buffer was null?
- localBuf = buffer = newbuf;
- } else if (markPos > 0) {
- System.arraycopy(localBuf, markPos, localBuf, 0, localBuf.length - markPos);
- }
- // Set the new position and mark position
- pos -= markPos;
- count = markPos = 0;
- final int bytesread = localIn.read(localBuf, pos, localBuf.length - pos);
- count = bytesread <= 0 ? pos : pos + bytesread;
- return bytesread;
- }
- byte[] getBuffer() {
- return buffer;
- }
- /**
- * Sets a mark position in this stream. The parameter {@code readLimit} indicates how many bytes can be read before a mark is invalidated. Calling
- * {@code reset()} will reposition the stream back to the marked position if {@code readLimit} has not been surpassed. The underlying buffer may be
- * increased in size to allow {@code readLimit} number of bytes to be supported.
- *
- * @param readLimit the number of bytes that can be read before the mark is invalidated.
- * @see #reset()
- */
- @Override
- public void mark(final int readLimit) {
- markLimit = readLimit;
- markPos = pos;
- }
- /**
- * Indicates whether {@code BufferedInputStream} supports the {@code mark()} and {@code reset()} methods.
- *
- * @return {@code true} for BufferedInputStreams.
- * @see #mark(int)
- * @see #reset()
- */
- @Override
- public boolean markSupported() {
- return true;
- }
- /**
- * Reads a single byte from this stream and returns it as an integer in the range from 0 to 255. Returns -1 if the end of the source string has been
- * reached. If the internal buffer does not contain any available bytes then it is filled from the source stream and the first byte is returned.
- *
- * @return the byte read or -1 if the end of the source stream has been reached.
- * @throws IOException if this stream is closed or another IOException occurs.
- */
- @Override
- public int read() throws IOException {
- // Use local refs since buf and in may be invalidated by an
- // unsynchronized close()
- byte[] localBuf = buffer;
- final InputStream localIn = inputStream;
- if (localBuf == null || localIn == null) {
- throw new IOException("Stream is closed");
- }
- /* Are there buffered bytes available? */
- if (pos >= count && fillBuffer(localIn, localBuf) == IOUtils.EOF) {
- return IOUtils.EOF; /* no, fill buffer */
- }
- // localBuf may have been invalidated by fillbuf
- if (localBuf != buffer) {
- localBuf = buffer;
- if (localBuf == null) {
- throw new IOException("Stream is closed");
- }
- }
- /* Did filling the buffer fail with -1 (EOF)? */
- if (count - pos > 0) {
- return localBuf[pos++] & 0xFF;
- }
- return IOUtils.EOF;
- }
- /**
- * Reads at most {@code length} bytes from this stream and stores them in byte array {@code buffer} starting at offset {@code offset}. Returns the number of
- * bytes actually read or -1 if no bytes were read and the end of the stream was encountered. If all the buffered bytes have been used, a mark has not been
- * set and the requested number of bytes is larger than the receiver's buffer size, this implementation bypasses the buffer and simply places the results
- * directly into {@code buffer}.
- *
- * @param dest the byte array in which to store the bytes read.
- * @param offset the initial position in {@code buffer} to store the bytes read from this stream.
- * @param length the maximum number of bytes to store in {@code buffer}.
- * @return the number of bytes actually read or -1 if end of stream.
- * @throws IndexOutOfBoundsException if {@code offset < 0} or {@code length < 0}, or if {@code offset + length} is greater than the size of {@code buffer}.
- * @throws IOException if the stream is already closed or another IOException occurs.
- */
- @Override
- public int read(final byte[] dest, int offset, final int length) throws IOException {
- // Use local ref since buf may be invalidated by an unsynchronized
- // close()
- byte[] localBuf = buffer;
- if (localBuf == null) {
- throw new IOException("Stream is closed");
- }
- // avoid int overflow
- if (offset > dest.length - length || offset < 0 || length < 0) {
- throw new IndexOutOfBoundsException();
- }
- if (length == 0) {
- return 0;
- }
- final InputStream localIn = inputStream;
- if (localIn == null) {
- throw new IOException("Stream is closed");
- }
- int required;
- if (pos < count) {
- /* There are bytes available in the buffer. */
- final int copylength = count - pos >= length ? length : count - pos;
- System.arraycopy(localBuf, pos, dest, offset, copylength);
- pos += copylength;
- if (copylength == length || localIn.available() == 0) {
- return copylength;
- }
- offset += copylength;
- required = length - copylength;
- } else {
- required = length;
- }
- while (true) {
- final int read;
- /*
- * If we're not marked and the required size is greater than the buffer, simply read the bytes directly bypassing the buffer.
- */
- if (markPos == IOUtils.EOF && required >= localBuf.length) {
- read = localIn.read(dest, offset, required);
- if (read == IOUtils.EOF) {
- return required == length ? IOUtils.EOF : length - required;
- }
- } else {
- if (fillBuffer(localIn, localBuf) == IOUtils.EOF) {
- return required == length ? IOUtils.EOF : length - required;
- }
- // localBuf may have been invalidated by fillBuffer()
- if (localBuf != buffer) {
- localBuf = buffer;
- if (localBuf == null) {
- throw new IOException("Stream is closed");
- }
- }
- read = count - pos >= required ? required : count - pos;
- System.arraycopy(localBuf, pos, dest, offset, read);
- pos += read;
- }
- required -= read;
- if (required == 0) {
- return length;
- }
- if (localIn.available() == 0) {
- return length - required;
- }
- offset += read;
- }
- }
- /**
- * Resets this stream to the last marked location.
- *
- * @throws IOException if this stream is closed, no mark has been set or the mark is no longer valid because more than {@code readLimit} bytes have been
- * read since setting the mark.
- * @see #mark(int)
- */
- @Override
- public void reset() throws IOException {
- if (buffer == null) {
- throw new IOException("Stream is closed");
- }
- if (IOUtils.EOF == markPos) {
- throw new IOException("Mark has been invalidated");
- }
- pos = markPos;
- }
- /**
- * Skips {@code amount} number of bytes in this stream. Subsequent {@code read()}'s will not return these bytes unless {@code reset()} is used.
- *
- * @param amount the number of bytes to skip. {@code skip} does nothing and returns 0 if {@code amount} is less than zero.
- * @return the number of bytes actually skipped.
- * @throws IOException if this stream is closed or another IOException occurs.
- */
- @Override
- public long skip(final long amount) throws IOException {
- // Use local refs since buf and in may be invalidated by an
- // unsynchronized close()
- final byte[] localBuf = buffer;
- final InputStream localIn = inputStream;
- if (localBuf == null) {
- throw new IOException("Stream is closed");
- }
- if (amount < 1) {
- return 0;
- }
- if (localIn == null) {
- throw new IOException("Stream is closed");
- }
- if (count - pos >= amount) {
- // (int count - int pos) here is always an int so amount is also in the int range if the above test is true.
- // We can safely cast to int and avoid static analysis warnings.
- pos += (int) amount;
- return amount;
- }
- int read = count - pos;
- pos = count;
- if (markPos != IOUtils.EOF && amount <= markLimit) {
- if (fillBuffer(localIn, localBuf) == IOUtils.EOF) {
- return read;
- }
- if (count - pos >= amount - read) {
- // (int count - int pos) here is always an int so (amount - read) is also in the int range if the above test is true.
- // We can safely cast to int and avoid static analysis warnings.
- pos += (int) amount - read;
- return amount;
- }
- // Couldn't get all the bytes, skip what we read
- read += count - pos;
- pos = count;
- return read;
- }
- return read + localIn.skip(amount - read);
- }
- }