001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one or more
003 *  contributor license agreements.  See the NOTICE file distributed with
004 *  this work for additional information regarding copyright ownership.
005 *  The ASF licenses this file to You under the Apache License, Version 2.0
006 *  (the "License"); you may not use this file except in compliance with
007 *  the License.  You may obtain a copy of the License at
008 *
009 *     https://www.apache.org/licenses/LICENSE-2.0
010 *
011 *  Unless required by applicable law or agreed to in writing, software
012 *  distributed under the License is distributed on an "AS IS" BASIS,
013 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 *  See the License for the specific language governing permissions and
015 *  limitations under the License.
016 */
017
018package org.apache.commons.io.input;
019
020import java.io.BufferedInputStream;
021import java.io.IOException;
022import java.io.InputStream;
023
024import org.apache.commons.io.IOUtils;
025import org.apache.commons.io.build.AbstractStreamBuilder;
026
027/**
028 * An unsynchronized version of {@link BufferedInputStream}, not thread-safe.
029 * <p>
030 * Wraps an existing {@link InputStream} and <em>buffers</em> the input. Expensive interaction with the underlying input stream is minimized, since most
031 * (smaller) requests can be satisfied by accessing the buffer alone. The drawback is that some extra space is required to hold the buffer and that copying
032 * takes place when filling that buffer, but this is usually outweighed by the performance benefits.
033 * </p>
034 * <p>
035 * To build an instance, use {@link Builder}.
036 * </p>
037 * <p>
038 * A typical application pattern for the class looks like this:
039 * </p>
040 *
041 * <pre>
 * UnsynchronizedBufferedInputStream s = new UnsynchronizedBufferedInputStream.Builder()
043 *   .setInputStream(new FileInputStream(&quot;file.java&quot;))
044 *   .setBufferSize(8192)
045 *   .get();
046 * </pre>
047 * <p>
048 * Provenance: Apache Harmony and modified.
049 * </p>
050 *
051 * @see Builder
052 * @see BufferedInputStream
053 * @since 2.12.0
054 */
055//@NotThreadSafe
056public final class UnsynchronizedBufferedInputStream extends UnsynchronizedFilterInputStream {
057
058    // @formatter:off
059    /**
060     * Builds a new {@link UnsynchronizedBufferedInputStream}.
061     *
062     * <p>
063     * Using File IO:
064     * </p>
065     * <pre>{@code
066     * UnsynchronizedBufferedInputStream s = UnsynchronizedBufferedInputStream.builder()
067     *   .setFile(file)
068     *   .setBufferSize(8192)
069     *   .get();}
070     * </pre>
071     * <p>
072     * Using NIO Path:
073     * </p>
074     * <pre>{@code
075     * UnsynchronizedBufferedInputStream s = UnsynchronizedBufferedInputStream.builder()
076     *   .setPath(path)
077     *   .setBufferSize(8192)
078     *   .get();}
079     * </pre>
080     *
081     * @see #get()
082     */
083    // @formatter:on
    public static class Builder extends AbstractStreamBuilder<UnsynchronizedBufferedInputStream, Builder> {

        /**
         * Constructs a builder of {@link UnsynchronizedBufferedInputStream}.
         */
        public Builder() {
            // empty
        }

        /**
         * Builds a new {@link UnsynchronizedBufferedInputStream}.
         * <p>
         * You must set an aspect that supports {@link #getInputStream()} on this builder, otherwise, this method throws an exception.
         * </p>
         * <p>
         * This builder uses the following aspects:
         * </p>
         * <ul>
         * <li>{@link #getInputStream()}</li>
         * <li>{@link #getBufferSize()}</li>
         * </ul>
         *
         * @return a new instance.
         * @throws IllegalStateException         if the {@code origin} is {@code null}.
         * @throws IllegalArgumentException      if {@link #getBufferSize()} returns a value that is not greater than zero.
         * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
         * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
         * @see #getInputStream()
         * @see #getBufferSize()
         * @see #getUnchecked()
         */
        @Override
        public UnsynchronizedBufferedInputStream get() throws IOException {
            // The private constructor of the enclosing class resolves the input stream and validates the buffer size.
            return new UnsynchronizedBufferedInputStream(this);
        }

    }
120
    /**
     * The buffer containing the current bytes read from the target InputStream.
     * <p>
     * {@link #close()} sets this field to {@code null}, and the other methods treat a {@code null} buffer as "stream is closed". The array may be replaced
     * by a larger one while refilling when the mark's read limit exceeds the current capacity.
     * </p>
     */
    protected volatile byte[] buffer;

    /**
     * The number of valid bytes in the byte array {@code buffer}, that is, the index one past the last buffered byte. {@code count - pos} is the number of
     * buffered bytes that have not been consumed yet.
     */
    protected int count;

    /**
     * The current limit, which when passed, invalidates the current mark. Set by {@link #mark(int)}.
     */
    protected int markLimit;

    /**
     * The currently marked position as an index into {@code buffer}. {@link IOUtils#EOF} (-1) indicates no mark has been set or the mark has been
     * invalidated.
     */
    protected int markPos = IOUtils.EOF;

    /**
     * The current position within the byte array {@code buffer}: the index of the next buffered byte to hand out.
     */
    protected int pos;
145
146    /**
147     * Constructs a new {@code BufferedInputStream} on the {@link InputStream} {@code in}. The buffer size is specified by the parameter {@code size} and all
148     * reads are now filtered through this stream.
149     *
150     * @param builder   A builder providing the input stream and buffer size.
151     * @throws IOException if an I/O error occurs.
152     * @throws IllegalArgumentException if {@code size < 0}.
153     */
154    @SuppressWarnings("resource")
155    private UnsynchronizedBufferedInputStream(final Builder builder) throws IOException {
156        super(builder.getInputStream());
157        final int bufferSize = builder.getBufferSize();
158        if (bufferSize <= 0) {
159            throw new IllegalArgumentException("Size must be > 0");
160        }
161        buffer = new byte[bufferSize];
162    }
163
164    /**
165     * Returns the number of bytes that are available before this stream will block. This method returns the number of bytes available in the buffer plus those
166     * available in the source stream.
167     *
168     * @return the number of bytes available before blocking.
169     * @throws IOException if this stream is closed.
170     */
171    @Override
172    public int available() throws IOException {
173        final InputStream localIn = inputStream; // 'in' could be invalidated by close()
174        if (buffer == null || localIn == null) {
175            throw new IOException("Stream is closed");
176        }
177        return count - pos + localIn.available();
178    }
179
180    /**
181     * Closes this stream. The source stream is closed and any resources associated with it are released.
182     *
183     * @throws IOException if an error occurs while closing this stream.
184     */
185    @Override
186    public void close() throws IOException {
187        buffer = null;
188        final InputStream localIn = inputStream;
189        inputStream = null;
190        if (localIn != null) {
191            localIn.close();
192        }
193    }
194
195    private int fillBuffer(final InputStream localIn, byte[] localBuf) throws IOException {
196        if (markPos == IOUtils.EOF || pos - markPos >= markLimit) {
197            /* Mark position not set or exceeded readLimit */
198            final int result = localIn.read(localBuf);
199            if (result > 0) {
200                markPos = IOUtils.EOF;
201                pos = 0;
202                count = result;
203            }
204            return result;
205        }
206        if (markPos == 0 && markLimit > localBuf.length) {
207            /* Increase buffer size to accommodate the readLimit */
208            int newLength = localBuf.length * 2;
209            if (newLength > markLimit) {
210                newLength = markLimit;
211            }
212            final byte[] newbuf = new byte[newLength];
213            System.arraycopy(localBuf, 0, newbuf, 0, localBuf.length);
214            // Reassign buffer, which will invalidate any local references
215            // FIXME: what if buffer was null?
216            localBuf = buffer = newbuf;
217        } else if (markPos > 0) {
218            System.arraycopy(localBuf, markPos, localBuf, 0, localBuf.length - markPos);
219        }
220        // Set the new position and mark position
221        pos -= markPos;
222        count = markPos = 0;
223        final int bytesread = localIn.read(localBuf, pos, localBuf.length - pos);
224        count = bytesread <= 0 ? pos : pos + bytesread;
225        return bytesread;
226    }
227
228    byte[] getBuffer() {
229        return buffer;
230    }
231
232    /**
233     * Sets a mark position in this stream. The parameter {@code readLimit} indicates how many bytes can be read before a mark is invalidated. Calling
234     * {@code reset()} will reposition the stream back to the marked position if {@code readLimit} has not been surpassed. The underlying buffer may be
235     * increased in size to allow {@code readLimit} number of bytes to be supported.
236     *
237     * @param readLimit the number of bytes that can be read before the mark is invalidated.
238     * @see #reset()
239     */
240    @Override
241    public void mark(final int readLimit) {
242        markLimit = readLimit;
243        markPos = pos;
244    }
245
    /**
     * Indicates whether this stream supports the {@code mark()} and {@code reset()} methods.
     *
     * @return {@code true}, always.
     * @see #mark(int)
     * @see #reset()
     */
    @Override
    public boolean markSupported() {
        return true;
    }
257
258    /**
259     * Reads a single byte from this stream and returns it as an integer in the range from 0 to 255. Returns -1 if the end of the source string has been
260     * reached. If the internal buffer does not contain any available bytes then it is filled from the source stream and the first byte is returned.
261     *
262     * @return the byte read or -1 if the end of the source stream has been reached.
263     * @throws IOException if this stream is closed or another IOException occurs.
264     */
265    @Override
266    public int read() throws IOException {
267        // Use local refs since buf and in may be invalidated by an
268        // unsynchronized close()
269        byte[] localBuf = buffer;
270        final InputStream localIn = inputStream;
271        if (localBuf == null || localIn == null) {
272            throw new IOException("Stream is closed");
273        }
274
275        /* Are there buffered bytes available? */
276        if (pos >= count && fillBuffer(localIn, localBuf) == IOUtils.EOF) {
277            return IOUtils.EOF; /* no, fill buffer */
278        }
279        // localBuf may have been invalidated by fillbuf
280        if (localBuf != buffer) {
281            localBuf = buffer;
282            if (localBuf == null) {
283                throw new IOException("Stream is closed");
284            }
285        }
286
287        /* Did filling the buffer fail with -1 (EOF)? */
288        if (count - pos > 0) {
289            return localBuf[pos++] & 0xFF;
290        }
291        return IOUtils.EOF;
292    }
293
294    /**
295     * Reads at most {@code length} bytes from this stream and stores them in byte array {@code buffer} starting at offset {@code offset}. Returns the number of
296     * bytes actually read or -1 if no bytes were read and the end of the stream was encountered. If all the buffered bytes have been used, a mark has not been
297     * set and the requested number of bytes is larger than the receiver's buffer size, this implementation bypasses the buffer and simply places the results
298     * directly into {@code buffer}.
299     *
300     * @param dest the byte array in which to store the bytes read.
301     * @param offset the initial position in {@code buffer} to store the bytes read from this stream.
302     * @param length the maximum number of bytes to store in {@code buffer}.
303     * @return the number of bytes actually read or -1 if end of stream.
304     * @throws IndexOutOfBoundsException if {@code offset < 0} or {@code length < 0}, or if {@code offset + length} is greater than the size of {@code buffer}.
305     * @throws IOException               if the stream is already closed or another IOException occurs.
306     */
307    @Override
308    public int read(final byte[] dest, int offset, final int length) throws IOException {
309        // Use local ref since buf may be invalidated by an unsynchronized
310        // close()
311        byte[] localBuf = buffer;
312        if (localBuf == null) {
313            throw new IOException("Stream is closed");
314        }
315        // avoid int overflow
316        if (offset > dest.length - length || offset < 0 || length < 0) {
317            throw new IndexOutOfBoundsException();
318        }
319        if (length == 0) {
320            return 0;
321        }
322        final InputStream localIn = inputStream;
323        if (localIn == null) {
324            throw new IOException("Stream is closed");
325        }
326
327        int required;
328        if (pos < count) {
329            /* There are bytes available in the buffer. */
330            final int copylength = count - pos >= length ? length : count - pos;
331            System.arraycopy(localBuf, pos, dest, offset, copylength);
332            pos += copylength;
333            if (copylength == length || localIn.available() == 0) {
334                return copylength;
335            }
336            offset += copylength;
337            required = length - copylength;
338        } else {
339            required = length;
340        }
341
342        while (true) {
343            final int read;
344            /*
345             * If we're not marked and the required size is greater than the buffer, simply read the bytes directly bypassing the buffer.
346             */
347            if (markPos == IOUtils.EOF && required >= localBuf.length) {
348                read = localIn.read(dest, offset, required);
349                if (read == IOUtils.EOF) {
350                    return required == length ? IOUtils.EOF : length - required;
351                }
352            } else {
353                if (fillBuffer(localIn, localBuf) == IOUtils.EOF) {
354                    return required == length ? IOUtils.EOF : length - required;
355                }
356                // localBuf may have been invalidated by fillBuffer()
357                if (localBuf != buffer) {
358                    localBuf = buffer;
359                    if (localBuf == null) {
360                        throw new IOException("Stream is closed");
361                    }
362                }
363
364                read = count - pos >= required ? required : count - pos;
365                System.arraycopy(localBuf, pos, dest, offset, read);
366                pos += read;
367            }
368            required -= read;
369            if (required == 0) {
370                return length;
371            }
372            if (localIn.available() == 0) {
373                return length - required;
374            }
375            offset += read;
376        }
377    }
378
379    /**
380     * Resets this stream to the last marked location.
381     *
382     * @throws IOException if this stream is closed, no mark has been set or the mark is no longer valid because more than {@code readLimit} bytes have been
383     *                     read since setting the mark.
384     * @see #mark(int)
385     */
386    @Override
387    public void reset() throws IOException {
388        if (buffer == null) {
389            throw new IOException("Stream is closed");
390        }
391        if (IOUtils.EOF == markPos) {
392            throw new IOException("Mark has been invalidated");
393        }
394        pos = markPos;
395    }
396
397    /**
398     * Skips {@code amount} number of bytes in this stream. Subsequent {@code read()}'s will not return these bytes unless {@code reset()} is used.
399     *
400     * @param amount the number of bytes to skip. {@code skip} does nothing and returns 0 if {@code amount} is less than zero.
401     * @return the number of bytes actually skipped.
402     * @throws IOException if this stream is closed or another IOException occurs.
403     */
404    @Override
405    public long skip(final long amount) throws IOException {
406        // Use local refs since buf and in may be invalidated by an
407        // unsynchronized close()
408        final byte[] localBuf = buffer;
409        final InputStream localIn = inputStream;
410        if (localBuf == null) {
411            throw new IOException("Stream is closed");
412        }
413        if (amount < 1) {
414            return 0;
415        }
416        if (localIn == null) {
417            throw new IOException("Stream is closed");
418        }
419
420        if (count - pos >= amount) {
421            // (int count - int pos) here is always an int so amount is also in the int range if the above test is true.
422            // We can safely cast to int and avoid static analysis warnings.
423            pos += (int) amount;
424            return amount;
425        }
426        int read = count - pos;
427        pos = count;
428
429        if (markPos != IOUtils.EOF && amount <= markLimit) {
430            if (fillBuffer(localIn, localBuf) == IOUtils.EOF) {
431                return read;
432            }
433            if (count - pos >= amount - read) {
434                // (int count - int pos) here is always an int so (amount - read) is also in the int range if the above test is true.
435                // We can safely cast to int and avoid static analysis warnings.
436                pos += (int) amount - read;
437                return amount;
438            }
439            // Couldn't get all the bytes, skip what we read
440            read += count - pos;
441            pos = count;
442            return read;
443        }
444        return read + localIn.skip(amount - read);
445    }
446}