UnsynchronizedBufferedInputStream.java

  1. /*
  2.  *  Licensed to the Apache Software Foundation (ASF) under one or more
  3.  *  contributor license agreements.  See the NOTICE file distributed with
  4.  *  this work for additional information regarding copyright ownership.
  5.  *  The ASF licenses this file to You under the Apache License, Version 2.0
  6.  *  (the "License"); you may not use this file except in compliance with
  7.  *  the License.  You may obtain a copy of the License at
  8.  *
  9.  *     https://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  *  Unless required by applicable law or agreed to in writing, software
  12.  *  distributed under the License is distributed on an "AS IS" BASIS,
  13.  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  *  See the License for the specific language governing permissions and
  15.  *  limitations under the License.
  16.  */

  17. package org.apache.commons.io.input;

  18. import java.io.BufferedInputStream;
  19. import java.io.IOException;
  20. import java.io.InputStream;

  21. import org.apache.commons.io.IOUtils;
  22. import org.apache.commons.io.build.AbstractStreamBuilder;

  23. /**
  24.  * An unsynchronized version of {@link BufferedInputStream}, not thread-safe.
  25.  * <p>
  26.  * Wraps an existing {@link InputStream} and <em>buffers</em> the input. Expensive interaction with the underlying input stream is minimized, since most
  27.  * (smaller) requests can be satisfied by accessing the buffer alone. The drawback is that some extra space is required to hold the buffer and that copying
  28.  * takes place when filling that buffer, but this is usually outweighed by the performance benefits.
  29.  * </p>
  30.  * <p>
  31.  * To build an instance, use {@link Builder}.
  32.  * </p>
  33.  * <p>
  34.  * A typical application pattern for the class looks like this:
  35.  * </p>
  36.  *
  37.  * <pre>
  38.  * UnsynchronizedBufferedInputStream s = new UnsynchronizedBufferedInputStream.Builder()
  39.  *   .setInputStream(new FileInputStream(&quot;file.java&quot;))
  40.  *   .setBufferSize(8192)
  41.  *   .get();
  42.  * </pre>
  43.  * <p>
  44.  * Provenance: Apache Harmony and modified.
  45.  * </p>
  46.  *
  47.  * @see Builder
  48.  * @see BufferedInputStream
  49.  * @since 2.12.0
  50.  */
  51. //@NotThreadSafe
  52. public final class UnsynchronizedBufferedInputStream extends UnsynchronizedFilterInputStream {

  53.     // @formatter:off
  54.     /**
  55.      * Builds a new {@link UnsynchronizedBufferedInputStream}.
  56.      *
  57.      * <p>
  58.      * Using File IO:
  59.      * </p>
  60.      * <pre>{@code
  61.      * UnsynchronizedBufferedInputStream s = UnsynchronizedBufferedInputStream.builder()
  62.      *   .setFile(file)
  63.      *   .setBufferSize(8192)
  64.      *   .get();}
  65.      * </pre>
  66.      * <p>
  67.      * Using NIO Path:
  68.      * </p>
  69.      * <pre>{@code
  70.      * UnsynchronizedBufferedInputStream s = UnsynchronizedBufferedInputStream.builder()
  71.      *   .setPath(path)
  72.      *   .setBufferSize(8192)
  73.      *   .get();}
  74.      * </pre>
  75.      *
  76.      * @see #get()
  77.      */
  78.     // @formatter:on
  79.     public static class Builder extends AbstractStreamBuilder<UnsynchronizedBufferedInputStream, Builder> {

  80.         /**
  81.          * Constructs a builder of {@link UnsynchronizedBufferedInputStream}.
  82.          */
  83.         public Builder() {
  84.             // empty
  85.         }

  86.         /**
  87.          * Builds a new {@link UnsynchronizedBufferedInputStream}.
  88.          * <p>
  89.          * You must set an aspect that supports {@link #getInputStream()} on this builder, otherwise, this method throws an exception.
  90.          * </p>
  91.          * <p>
  92.          * This builder uses the following aspects:
  93.          * </p>
  94.          * <ul>
  95.          * <li>{@link #getInputStream()}</li>
  96.          * <li>{@link #getBufferSize()}</li>
  97.          * </ul>
  98.          *
  99.          * @return a new instance.
  100.          * @throws IllegalStateException         if the {@code origin} is {@code null}.
  101.          * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
  102.          * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
  103.          * @see #getInputStream()
  104.          * @see #getBufferSize()
  105.          * @see #getUnchecked()
  106.          */
  107.         @Override
  108.         public UnsynchronizedBufferedInputStream get() throws IOException {
  109.             return new UnsynchronizedBufferedInputStream(this);
  110.         }

  111.     }

  112.     /**
  113.      * The buffer containing the current bytes read from the target InputStream.
  114.      */
  115.     protected volatile byte[] buffer;

  116.     /**
  117.      * The total number of bytes inside the byte array {@code buffer}.
  118.      */
  119.     protected int count;

  120.     /**
  121.      * The current limit, which when passed, invalidates the current mark.
  122.      */
  123.     protected int markLimit;

  124.     /**
  125.      * The currently marked position. -1 indicates no mark has been set or the mark has been invalidated.
  126.      */
  127.     protected int markPos = IOUtils.EOF;

  128.     /**
  129.      * The current position within the byte array {@code buffer}.
  130.      */
  131.     protected int pos;

  132.     /**
  133.      * Constructs a new {@code BufferedInputStream} on the {@link InputStream} {@code in}. The buffer size is specified by the parameter {@code size} and all
  134.      * reads are now filtered through this stream.
  135.      *
  136.      * @param builder   A builder providing the input stream and buffer size.
  137.      * @throws IOException if an I/O error occurs.
  138.      * @throws IllegalArgumentException if {@code size < 0}.
  139.      */
  140.     @SuppressWarnings("resource")
  141.     private UnsynchronizedBufferedInputStream(final Builder builder) throws IOException {
  142.         super(builder.getInputStream());
  143.         final int bufferSize = builder.getBufferSize();
  144.         if (bufferSize <= 0) {
  145.             throw new IllegalArgumentException("Size must be > 0");
  146.         }
  147.         buffer = new byte[bufferSize];
  148.     }

  149.     /**
  150.      * Returns the number of bytes that are available before this stream will block. This method returns the number of bytes available in the buffer plus those
  151.      * available in the source stream.
  152.      *
  153.      * @return the number of bytes available before blocking.
  154.      * @throws IOException if this stream is closed.
  155.      */
  156.     @Override
  157.     public int available() throws IOException {
  158.         final InputStream localIn = inputStream; // 'in' could be invalidated by close()
  159.         if (buffer == null || localIn == null) {
  160.             throw new IOException("Stream is closed");
  161.         }
  162.         return count - pos + localIn.available();
  163.     }

  164.     /**
  165.      * Closes this stream. The source stream is closed and any resources associated with it are released.
  166.      *
  167.      * @throws IOException if an error occurs while closing this stream.
  168.      */
  169.     @Override
  170.     public void close() throws IOException {
  171.         buffer = null;
  172.         final InputStream localIn = inputStream;
  173.         inputStream = null;
  174.         if (localIn != null) {
  175.             localIn.close();
  176.         }
  177.     }

  178.     private int fillBuffer(final InputStream localIn, byte[] localBuf) throws IOException {
  179.         if (markPos == IOUtils.EOF || pos - markPos >= markLimit) {
  180.             /* Mark position not set or exceeded readLimit */
  181.             final int result = localIn.read(localBuf);
  182.             if (result > 0) {
  183.                 markPos = IOUtils.EOF;
  184.                 pos = 0;
  185.                 count = result;
  186.             }
  187.             return result;
  188.         }
  189.         if (markPos == 0 && markLimit > localBuf.length) {
  190.             /* Increase buffer size to accommodate the readLimit */
  191.             int newLength = localBuf.length * 2;
  192.             if (newLength > markLimit) {
  193.                 newLength = markLimit;
  194.             }
  195.             final byte[] newbuf = new byte[newLength];
  196.             System.arraycopy(localBuf, 0, newbuf, 0, localBuf.length);
  197.             // Reassign buffer, which will invalidate any local references
  198.             // FIXME: what if buffer was null?
  199.             localBuf = buffer = newbuf;
  200.         } else if (markPos > 0) {
  201.             System.arraycopy(localBuf, markPos, localBuf, 0, localBuf.length - markPos);
  202.         }
  203.         // Set the new position and mark position
  204.         pos -= markPos;
  205.         count = markPos = 0;
  206.         final int bytesread = localIn.read(localBuf, pos, localBuf.length - pos);
  207.         count = bytesread <= 0 ? pos : pos + bytesread;
  208.         return bytesread;
  209.     }

  210.     byte[] getBuffer() {
  211.         return buffer;
  212.     }

  213.     /**
  214.      * Sets a mark position in this stream. The parameter {@code readLimit} indicates how many bytes can be read before a mark is invalidated. Calling
  215.      * {@code reset()} will reposition the stream back to the marked position if {@code readLimit} has not been surpassed. The underlying buffer may be
  216.      * increased in size to allow {@code readLimit} number of bytes to be supported.
  217.      *
  218.      * @param readLimit the number of bytes that can be read before the mark is invalidated.
  219.      * @see #reset()
  220.      */
  221.     @Override
  222.     public void mark(final int readLimit) {
  223.         markLimit = readLimit;
  224.         markPos = pos;
  225.     }

  226.     /**
  227.      * Indicates whether {@code BufferedInputStream} supports the {@code mark()} and {@code reset()} methods.
  228.      *
  229.      * @return {@code true} for BufferedInputStreams.
  230.      * @see #mark(int)
  231.      * @see #reset()
  232.      */
  233.     @Override
  234.     public boolean markSupported() {
  235.         return true;
  236.     }

  237.     /**
  238.      * Reads a single byte from this stream and returns it as an integer in the range from 0 to 255. Returns -1 if the end of the source string has been
  239.      * reached. If the internal buffer does not contain any available bytes then it is filled from the source stream and the first byte is returned.
  240.      *
  241.      * @return the byte read or -1 if the end of the source stream has been reached.
  242.      * @throws IOException if this stream is closed or another IOException occurs.
  243.      */
  244.     @Override
  245.     public int read() throws IOException {
  246.         // Use local refs since buf and in may be invalidated by an
  247.         // unsynchronized close()
  248.         byte[] localBuf = buffer;
  249.         final InputStream localIn = inputStream;
  250.         if (localBuf == null || localIn == null) {
  251.             throw new IOException("Stream is closed");
  252.         }

  253.         /* Are there buffered bytes available? */
  254.         if (pos >= count && fillBuffer(localIn, localBuf) == IOUtils.EOF) {
  255.             return IOUtils.EOF; /* no, fill buffer */
  256.         }
  257.         // localBuf may have been invalidated by fillbuf
  258.         if (localBuf != buffer) {
  259.             localBuf = buffer;
  260.             if (localBuf == null) {
  261.                 throw new IOException("Stream is closed");
  262.             }
  263.         }

  264.         /* Did filling the buffer fail with -1 (EOF)? */
  265.         if (count - pos > 0) {
  266.             return localBuf[pos++] & 0xFF;
  267.         }
  268.         return IOUtils.EOF;
  269.     }

  270.     /**
  271.      * Reads at most {@code length} bytes from this stream and stores them in byte array {@code buffer} starting at offset {@code offset}. Returns the number of
  272.      * bytes actually read or -1 if no bytes were read and the end of the stream was encountered. If all the buffered bytes have been used, a mark has not been
  273.      * set and the requested number of bytes is larger than the receiver's buffer size, this implementation bypasses the buffer and simply places the results
  274.      * directly into {@code buffer}.
  275.      *
  276.      * @param dest the byte array in which to store the bytes read.
  277.      * @param offset the initial position in {@code buffer} to store the bytes read from this stream.
  278.      * @param length the maximum number of bytes to store in {@code buffer}.
  279.      * @return the number of bytes actually read or -1 if end of stream.
  280.      * @throws IndexOutOfBoundsException if {@code offset < 0} or {@code length < 0}, or if {@code offset + length} is greater than the size of {@code buffer}.
  281.      * @throws IOException               if the stream is already closed or another IOException occurs.
  282.      */
  283.     @Override
  284.     public int read(final byte[] dest, int offset, final int length) throws IOException {
  285.         // Use local ref since buf may be invalidated by an unsynchronized
  286.         // close()
  287.         byte[] localBuf = buffer;
  288.         if (localBuf == null) {
  289.             throw new IOException("Stream is closed");
  290.         }
  291.         // avoid int overflow
  292.         if (offset > dest.length - length || offset < 0 || length < 0) {
  293.             throw new IndexOutOfBoundsException();
  294.         }
  295.         if (length == 0) {
  296.             return 0;
  297.         }
  298.         final InputStream localIn = inputStream;
  299.         if (localIn == null) {
  300.             throw new IOException("Stream is closed");
  301.         }

  302.         int required;
  303.         if (pos < count) {
  304.             /* There are bytes available in the buffer. */
  305.             final int copylength = count - pos >= length ? length : count - pos;
  306.             System.arraycopy(localBuf, pos, dest, offset, copylength);
  307.             pos += copylength;
  308.             if (copylength == length || localIn.available() == 0) {
  309.                 return copylength;
  310.             }
  311.             offset += copylength;
  312.             required = length - copylength;
  313.         } else {
  314.             required = length;
  315.         }

  316.         while (true) {
  317.             final int read;
  318.             /*
  319.              * If we're not marked and the required size is greater than the buffer, simply read the bytes directly bypassing the buffer.
  320.              */
  321.             if (markPos == IOUtils.EOF && required >= localBuf.length) {
  322.                 read = localIn.read(dest, offset, required);
  323.                 if (read == IOUtils.EOF) {
  324.                     return required == length ? IOUtils.EOF : length - required;
  325.                 }
  326.             } else {
  327.                 if (fillBuffer(localIn, localBuf) == IOUtils.EOF) {
  328.                     return required == length ? IOUtils.EOF : length - required;
  329.                 }
  330.                 // localBuf may have been invalidated by fillBuffer()
  331.                 if (localBuf != buffer) {
  332.                     localBuf = buffer;
  333.                     if (localBuf == null) {
  334.                         throw new IOException("Stream is closed");
  335.                     }
  336.                 }

  337.                 read = count - pos >= required ? required : count - pos;
  338.                 System.arraycopy(localBuf, pos, dest, offset, read);
  339.                 pos += read;
  340.             }
  341.             required -= read;
  342.             if (required == 0) {
  343.                 return length;
  344.             }
  345.             if (localIn.available() == 0) {
  346.                 return length - required;
  347.             }
  348.             offset += read;
  349.         }
  350.     }

  351.     /**
  352.      * Resets this stream to the last marked location.
  353.      *
  354.      * @throws IOException if this stream is closed, no mark has been set or the mark is no longer valid because more than {@code readLimit} bytes have been
  355.      *                     read since setting the mark.
  356.      * @see #mark(int)
  357.      */
  358.     @Override
  359.     public void reset() throws IOException {
  360.         if (buffer == null) {
  361.             throw new IOException("Stream is closed");
  362.         }
  363.         if (IOUtils.EOF == markPos) {
  364.             throw new IOException("Mark has been invalidated");
  365.         }
  366.         pos = markPos;
  367.     }

  368.     /**
  369.      * Skips {@code amount} number of bytes in this stream. Subsequent {@code read()}'s will not return these bytes unless {@code reset()} is used.
  370.      *
  371.      * @param amount the number of bytes to skip. {@code skip} does nothing and returns 0 if {@code amount} is less than zero.
  372.      * @return the number of bytes actually skipped.
  373.      * @throws IOException if this stream is closed or another IOException occurs.
  374.      */
  375.     @Override
  376.     public long skip(final long amount) throws IOException {
  377.         // Use local refs since buf and in may be invalidated by an
  378.         // unsynchronized close()
  379.         final byte[] localBuf = buffer;
  380.         final InputStream localIn = inputStream;
  381.         if (localBuf == null) {
  382.             throw new IOException("Stream is closed");
  383.         }
  384.         if (amount < 1) {
  385.             return 0;
  386.         }
  387.         if (localIn == null) {
  388.             throw new IOException("Stream is closed");
  389.         }

  390.         if (count - pos >= amount) {
  391.             // (int count - int pos) here is always an int so amount is also in the int range if the above test is true.
  392.             // We can safely cast to int and avoid static analysis warnings.
  393.             pos += (int) amount;
  394.             return amount;
  395.         }
  396.         int read = count - pos;
  397.         pos = count;

  398.         if (markPos != IOUtils.EOF && amount <= markLimit) {
  399.             if (fillBuffer(localIn, localBuf) == IOUtils.EOF) {
  400.                 return read;
  401.             }
  402.             if (count - pos >= amount - read) {
  403.                 // (int count - int pos) here is always an int so (amount - read) is also in the int range if the above test is true.
  404.                 // We can safely cast to int and avoid static analysis warnings.
  405.                 pos += (int) amount - read;
  406.                 return amount;
  407.             }
  408.             // Couldn't get all the bytes, skip what we read
  409.             read += count - pos;
  410.             pos = count;
  411.             return read;
  412.         }
  413.         return read + localIn.skip(amount - read);
  414.     }
  415. }