UnsynchronizedBufferedInputStream.java

  1. /*
  2.  *  Licensed to the Apache Software Foundation (ASF) under one or more
  3.  *  contributor license agreements.  See the NOTICE file distributed with
  4.  *  this work for additional information regarding copyright ownership.
  5.  *  The ASF licenses this file to You under the Apache License, Version 2.0
  6.  *  (the "License"); you may not use this file except in compliance with
  7.  *  the License.  You may obtain a copy of the License at
  8.  *
  9.  *     http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  *  Unless required by applicable law or agreed to in writing, software
  12.  *  distributed under the License is distributed on an "AS IS" BASIS,
  13.  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  *  See the License for the specific language governing permissions and
  15.  *  limitations under the License.
  16.  */

  17. package org.apache.commons.io.input;

  18. import java.io.BufferedInputStream;
  19. import java.io.IOException;
  20. import java.io.InputStream;

  21. import org.apache.commons.io.IOUtils;
  22. import org.apache.commons.io.build.AbstractStreamBuilder;

  23. /**
  24.  * An unsynchronized version of {@link BufferedInputStream}, not thread-safe.
  25.  * <p>
  26.  * Wraps an existing {@link InputStream} and <em>buffers</em> the input. Expensive interaction with the underlying input stream is minimized, since most
  27.  * (smaller) requests can be satisfied by accessing the buffer alone. The drawback is that some extra space is required to hold the buffer and that copying
  28.  * takes place when filling that buffer, but this is usually outweighed by the performance benefits.
  29.  * </p>
  30.  * <p>
  31.  * To build an instance, use {@link Builder}.
  32.  * </p>
  33.  * <p>
  34.  * A typical application pattern for the class looks like this:
  35.  * </p>
  36.  *
  37.  * <pre>
  38.  * UnsynchronizedBufferedInputStream s = new UnsynchronizedBufferedInputStream.Builder()
  39.  *   .setInputStream(new FileInputStream(&quot;file.java&quot;))
  40.  *   .setBufferSize(8192)
  41.  *   .get();
  42.  * </pre>
  43.  * <p>
  44.  * Provenance: Apache Harmony and modified.
  45.  * </p>
  46.  *
  47.  * @see Builder
  48.  * @see BufferedInputStream
  49.  * @since 2.12.0
  50.  */
  51. //@NotThreadSafe
  52. public final class UnsynchronizedBufferedInputStream extends UnsynchronizedFilterInputStream {

  53.     // @formatter:off
  54.     /**
  55.      * Builds a new {@link UnsynchronizedBufferedInputStream}.
  56.      *
  57.      * <p>
  58.      * Using File IO:
  59.      * </p>
  60.      * <pre>{@code
  61.      * UnsynchronizedBufferedInputStream s = UnsynchronizedBufferedInputStream.builder()
  62.      *   .setFile(file)
  63.      *   .setBufferSize(8192)
  64.      *   .get();}
  65.      * </pre>
  66.      * <p>
  67.      * Using NIO Path:
  68.      * </p>
  69.      * <pre>{@code
  70.      * UnsynchronizedBufferedInputStream s = UnsynchronizedBufferedInputStream.builder()
  71.      *   .setPath(path)
  72.      *   .setBufferSize(8192)
  73.      *   .get();}
  74.      * </pre>
  75.      *
  76.      * @see #get()
  77.      */
  78.     // @formatter:on
  79.     public static class Builder extends AbstractStreamBuilder<UnsynchronizedBufferedInputStream, Builder> {

  80.         /**
  81.          * Constructs a builder of {@link UnsynchronizedBufferedInputStream}.
  82.          */
  83.         public Builder() {
  84.             // empty
  85.         }

  86.         /**
  87.          * Builds a new {@link UnsynchronizedBufferedInputStream}.
  88.          * <p>
  89.          * You must set an aspect that supports {@link #getInputStream()} on this builder, otherwise, this method throws an exception.
  90.          * </p>
  91.          * <p>
  92.          * This builder uses the following aspects:
  93.          * </p>
  94.          * <ul>
  95.          * <li>{@link #getInputStream()}</li>
  96.          * <li>{@link #getBufferSize()}</li>
  97.          * </ul>
  98.          *
  99.          * @return a new instance.
  100.          * @throws IllegalStateException         if the {@code origin} is {@code null}.
  101.          * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
  102.          * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
  103.          * @see #getInputStream()
  104.          * @see #getBufferSize()
  105.          * @see #getUnchecked()
  106.          */
  107.         @Override
  108.         public UnsynchronizedBufferedInputStream get() throws IOException {
  109.             return new UnsynchronizedBufferedInputStream(getInputStream(), getBufferSize());
  110.         }

  111.     }

  112.     /**
  113.      * The buffer containing the current bytes read from the target InputStream.
  114.      */
  115.     protected volatile byte[] buffer;

  116.     /**
  117.      * The total number of bytes inside the byte array {@code buffer}.
  118.      */
  119.     protected int count;

  120.     /**
  121.      * The current limit, which when passed, invalidates the current mark.
  122.      */
  123.     protected int markLimit;

  124.     /**
  125.      * The currently marked position. -1 indicates no mark has been set or the mark has been invalidated.
  126.      */
  127.     protected int markPos = IOUtils.EOF;

  128.     /**
  129.      * The current position within the byte array {@code buffer}.
  130.      */
  131.     protected int pos;

  132.     /**
  133.      * Constructs a new {@code BufferedInputStream} on the {@link InputStream} {@code in}. The buffer size is specified by the parameter {@code size} and all
  134.      * reads are now filtered through this stream.
  135.      *
  136.      * @param in   the input stream the buffer reads from.
  137.      * @param size the size of buffer to allocate.
  138.      * @throws IllegalArgumentException if {@code size < 0}.
  139.      */
  140.     private UnsynchronizedBufferedInputStream(final InputStream in, final int size) {
  141.         super(in);
  142.         if (size <= 0) {
  143.             throw new IllegalArgumentException("Size must be > 0");
  144.         }
  145.         buffer = new byte[size];
  146.     }

  147.     /**
  148.      * Returns the number of bytes that are available before this stream will block. This method returns the number of bytes available in the buffer plus those
  149.      * available in the source stream.
  150.      *
  151.      * @return the number of bytes available before blocking.
  152.      * @throws IOException if this stream is closed.
  153.      */
  154.     @Override
  155.     public int available() throws IOException {
  156.         final InputStream localIn = inputStream; // 'in' could be invalidated by close()
  157.         if (buffer == null || localIn == null) {
  158.             throw new IOException("Stream is closed");
  159.         }
  160.         return count - pos + localIn.available();
  161.     }

  162.     /**
  163.      * Closes this stream. The source stream is closed and any resources associated with it are released.
  164.      *
  165.      * @throws IOException if an error occurs while closing this stream.
  166.      */
  167.     @Override
  168.     public void close() throws IOException {
  169.         buffer = null;
  170.         final InputStream localIn = inputStream;
  171.         inputStream = null;
  172.         if (localIn != null) {
  173.             localIn.close();
  174.         }
  175.     }

  176.     private int fillBuffer(final InputStream localIn, byte[] localBuf) throws IOException {
  177.         if (markPos == IOUtils.EOF || pos - markPos >= markLimit) {
  178.             /* Mark position not set or exceeded readLimit */
  179.             final int result = localIn.read(localBuf);
  180.             if (result > 0) {
  181.                 markPos = IOUtils.EOF;
  182.                 pos = 0;
  183.                 count = result;
  184.             }
  185.             return result;
  186.         }
  187.         if (markPos == 0 && markLimit > localBuf.length) {
  188.             /* Increase buffer size to accommodate the readLimit */
  189.             int newLength = localBuf.length * 2;
  190.             if (newLength > markLimit) {
  191.                 newLength = markLimit;
  192.             }
  193.             final byte[] newbuf = new byte[newLength];
  194.             System.arraycopy(localBuf, 0, newbuf, 0, localBuf.length);
  195.             // Reassign buffer, which will invalidate any local references
  196.             // FIXME: what if buffer was null?
  197.             localBuf = buffer = newbuf;
  198.         } else if (markPos > 0) {
  199.             System.arraycopy(localBuf, markPos, localBuf, 0, localBuf.length - markPos);
  200.         }
  201.         // Set the new position and mark position
  202.         pos -= markPos;
  203.         count = markPos = 0;
  204.         final int bytesread = localIn.read(localBuf, pos, localBuf.length - pos);
  205.         count = bytesread <= 0 ? pos : pos + bytesread;
  206.         return bytesread;
  207.     }

  208.     byte[] getBuffer() {
  209.         return buffer;
  210.     }

  211.     /**
  212.      * Sets a mark position in this stream. The parameter {@code readLimit} indicates how many bytes can be read before a mark is invalidated. Calling
  213.      * {@code reset()} will reposition the stream back to the marked position if {@code readLimit} has not been surpassed. The underlying buffer may be
  214.      * increased in size to allow {@code readLimit} number of bytes to be supported.
  215.      *
  216.      * @param readLimit the number of bytes that can be read before the mark is invalidated.
  217.      * @see #reset()
  218.      */
  219.     @Override
  220.     public void mark(final int readLimit) {
  221.         markLimit = readLimit;
  222.         markPos = pos;
  223.     }

  224.     /**
  225.      * Indicates whether {@code BufferedInputStream} supports the {@code mark()} and {@code reset()} methods.
  226.      *
  227.      * @return {@code true} for BufferedInputStreams.
  228.      * @see #mark(int)
  229.      * @see #reset()
  230.      */
  231.     @Override
  232.     public boolean markSupported() {
  233.         return true;
  234.     }

  235.     /**
  236.      * Reads a single byte from this stream and returns it as an integer in the range from 0 to 255. Returns -1 if the end of the source string has been
  237.      * reached. If the internal buffer does not contain any available bytes then it is filled from the source stream and the first byte is returned.
  238.      *
  239.      * @return the byte read or -1 if the end of the source stream has been reached.
  240.      * @throws IOException if this stream is closed or another IOException occurs.
  241.      */
  242.     @Override
  243.     public int read() throws IOException {
  244.         // Use local refs since buf and in may be invalidated by an
  245.         // unsynchronized close()
  246.         byte[] localBuf = buffer;
  247.         final InputStream localIn = inputStream;
  248.         if (localBuf == null || localIn == null) {
  249.             throw new IOException("Stream is closed");
  250.         }

  251.         /* Are there buffered bytes available? */
  252.         if (pos >= count && fillBuffer(localIn, localBuf) == IOUtils.EOF) {
  253.             return IOUtils.EOF; /* no, fill buffer */
  254.         }
  255.         // localBuf may have been invalidated by fillbuf
  256.         if (localBuf != buffer) {
  257.             localBuf = buffer;
  258.             if (localBuf == null) {
  259.                 throw new IOException("Stream is closed");
  260.             }
  261.         }

  262.         /* Did filling the buffer fail with -1 (EOF)? */
  263.         if (count - pos > 0) {
  264.             return localBuf[pos++] & 0xFF;
  265.         }
  266.         return IOUtils.EOF;
  267.     }

  268.     /**
  269.      * Reads at most {@code length} bytes from this stream and stores them in byte array {@code buffer} starting at offset {@code offset}. Returns the number of
  270.      * bytes actually read or -1 if no bytes were read and the end of the stream was encountered. If all the buffered bytes have been used, a mark has not been
  271.      * set and the requested number of bytes is larger than the receiver's buffer size, this implementation bypasses the buffer and simply places the results
  272.      * directly into {@code buffer}.
  273.      *
  274.      * @param dest the byte array in which to store the bytes read.
  275.      * @param offset the initial position in {@code buffer} to store the bytes read from this stream.
  276.      * @param length the maximum number of bytes to store in {@code buffer}.
  277.      * @return the number of bytes actually read or -1 if end of stream.
  278.      * @throws IndexOutOfBoundsException if {@code offset < 0} or {@code length < 0}, or if {@code offset + length} is greater than the size of {@code buffer}.
  279.      * @throws IOException               if the stream is already closed or another IOException occurs.
  280.      */
  281.     @Override
  282.     public int read(final byte[] dest, int offset, final int length) throws IOException {
  283.         // Use local ref since buf may be invalidated by an unsynchronized
  284.         // close()
  285.         byte[] localBuf = buffer;
  286.         if (localBuf == null) {
  287.             throw new IOException("Stream is closed");
  288.         }
  289.         // avoid int overflow
  290.         if (offset > dest.length - length || offset < 0 || length < 0) {
  291.             throw new IndexOutOfBoundsException();
  292.         }
  293.         if (length == 0) {
  294.             return 0;
  295.         }
  296.         final InputStream localIn = inputStream;
  297.         if (localIn == null) {
  298.             throw new IOException("Stream is closed");
  299.         }

  300.         int required;
  301.         if (pos < count) {
  302.             /* There are bytes available in the buffer. */
  303.             final int copylength = count - pos >= length ? length : count - pos;
  304.             System.arraycopy(localBuf, pos, dest, offset, copylength);
  305.             pos += copylength;
  306.             if (copylength == length || localIn.available() == 0) {
  307.                 return copylength;
  308.             }
  309.             offset += copylength;
  310.             required = length - copylength;
  311.         } else {
  312.             required = length;
  313.         }

  314.         while (true) {
  315.             final int read;
  316.             /*
  317.              * If we're not marked and the required size is greater than the buffer, simply read the bytes directly bypassing the buffer.
  318.              */
  319.             if (markPos == IOUtils.EOF && required >= localBuf.length) {
  320.                 read = localIn.read(dest, offset, required);
  321.                 if (read == IOUtils.EOF) {
  322.                     return required == length ? IOUtils.EOF : length - required;
  323.                 }
  324.             } else {
  325.                 if (fillBuffer(localIn, localBuf) == IOUtils.EOF) {
  326.                     return required == length ? IOUtils.EOF : length - required;
  327.                 }
  328.                 // localBuf may have been invalidated by fillBuffer()
  329.                 if (localBuf != buffer) {
  330.                     localBuf = buffer;
  331.                     if (localBuf == null) {
  332.                         throw new IOException("Stream is closed");
  333.                     }
  334.                 }

  335.                 read = count - pos >= required ? required : count - pos;
  336.                 System.arraycopy(localBuf, pos, dest, offset, read);
  337.                 pos += read;
  338.             }
  339.             required -= read;
  340.             if (required == 0) {
  341.                 return length;
  342.             }
  343.             if (localIn.available() == 0) {
  344.                 return length - required;
  345.             }
  346.             offset += read;
  347.         }
  348.     }

  349.     /**
  350.      * Resets this stream to the last marked location.
  351.      *
  352.      * @throws IOException if this stream is closed, no mark has been set or the mark is no longer valid because more than {@code readLimit} bytes have been
  353.      *                     read since setting the mark.
  354.      * @see #mark(int)
  355.      */
  356.     @Override
  357.     public void reset() throws IOException {
  358.         if (buffer == null) {
  359.             throw new IOException("Stream is closed");
  360.         }
  361.         if (IOUtils.EOF == markPos) {
  362.             throw new IOException("Mark has been invalidated");
  363.         }
  364.         pos = markPos;
  365.     }

  366.     /**
  367.      * Skips {@code amount} number of bytes in this stream. Subsequent {@code read()}'s will not return these bytes unless {@code reset()} is used.
  368.      *
  369.      * @param amount the number of bytes to skip. {@code skip} does nothing and returns 0 if {@code amount} is less than zero.
  370.      * @return the number of bytes actually skipped.
  371.      * @throws IOException if this stream is closed or another IOException occurs.
  372.      */
  373.     @Override
  374.     public long skip(final long amount) throws IOException {
  375.         // Use local refs since buf and in may be invalidated by an
  376.         // unsynchronized close()
  377.         final byte[] localBuf = buffer;
  378.         final InputStream localIn = inputStream;
  379.         if (localBuf == null) {
  380.             throw new IOException("Stream is closed");
  381.         }
  382.         if (amount < 1) {
  383.             return 0;
  384.         }
  385.         if (localIn == null) {
  386.             throw new IOException("Stream is closed");
  387.         }

  388.         if (count - pos >= amount) {
  389.             // (int count - int pos) here is always an int so amount is also in the int range if the above test is true.
  390.             // We can safely cast to int and avoid static analysis warnings.
  391.             pos += (int) amount;
  392.             return amount;
  393.         }
  394.         int read = count - pos;
  395.         pos = count;

  396.         if (markPos != IOUtils.EOF && amount <= markLimit) {
  397.             if (fillBuffer(localIn, localBuf) == IOUtils.EOF) {
  398.                 return read;
  399.             }
  400.             if (count - pos >= amount - read) {
  401.                 // (int count - int pos) here is always an int so (amount - read) is also in the int range if the above test is true.
  402.                 // We can safely cast to int and avoid static analysis warnings.
  403.                 pos += (int) amount - read;
  404.                 return amount;
  405.             }
  406.             // Couldn't get all the bytes, skip what we read
  407.             read += count - pos;
  408.             pos = count;
  409.             return read;
  410.         }
  411.         return read + localIn.skip(amount - read);
  412.     }
  413. }