View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one or more
3    *  contributor license agreements.  See the NOTICE file distributed with
4    *  this work for additional information regarding copyright ownership.
5    *  The ASF licenses this file to You under the Apache License, Version 2.0
6    *  (the "License"); you may not use this file except in compliance with
7    *  the License.  You may obtain a copy of the License at
8    *
9    *     https://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software
12   *  distributed under the License is distributed on an "AS IS" BASIS,
13   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   *  See the License for the specific language governing permissions and
15   *  limitations under the License.
16   */
17  
18  package org.apache.commons.io.input;
19  
20  import java.io.BufferedInputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  
24  import org.apache.commons.io.IOUtils;
25  import org.apache.commons.io.build.AbstractStreamBuilder;
26  
27  /**
28   * An unsynchronized version of {@link BufferedInputStream}, not thread-safe.
29   * <p>
30   * Wraps an existing {@link InputStream} and <em>buffers</em> the input. Expensive interaction with the underlying input stream is minimized, since most
31   * (smaller) requests can be satisfied by accessing the buffer alone. The drawback is that some extra space is required to hold the buffer and that copying
32   * takes place when filling that buffer, but this is usually outweighed by the performance benefits.
33   * </p>
34   * <p>
35   * To build an instance, use {@link Builder}.
36   * </p>
37   * <p>
38   * A typical application pattern for the class looks like this:
39   * </p>
40   *
41   * <pre>
42   * UnsynchronizedBufferedInputStream s = new UnsynchronizedBufferedInputStream.Builder().
43   *   .setInputStream(new FileInputStream(&quot;file.java&quot;))
44   *   .setBufferSize(8192)
45   *   .get();
46   * </pre>
47   * <p>
48   * Provenance: Apache Harmony and modified.
49   * </p>
50   *
51   * @see Builder
52   * @see BufferedInputStream
53   * @since 2.12.0
54   */
55  //@NotThreadSafe
56  public final class UnsynchronizedBufferedInputStream extends UnsynchronizedFilterInputStream {
57  
58      // @formatter:off
59      /**
60       * Builds a new {@link UnsynchronizedBufferedInputStream}.
61       *
62       * <p>
63       * Using File IO:
64       * </p>
65       * <pre>{@code
66       * UnsynchronizedBufferedInputStream s = UnsynchronizedBufferedInputStream.builder()
67       *   .setFile(file)
68       *   .setBufferSize(8192)
69       *   .get();}
70       * </pre>
71       * <p>
72       * Using NIO Path:
73       * </p>
74       * <pre>{@code
75       * UnsynchronizedBufferedInputStream s = UnsynchronizedBufferedInputStream.builder()
76       *   .setPath(path)
77       *   .setBufferSize(8192)
78       *   .get();}
79       * </pre>
80       *
81       * @see #get()
82       */
83      // @formatter:on
84      public static class Builder extends AbstractStreamBuilder<UnsynchronizedBufferedInputStream, Builder> {
85  
86          /**
87           * Constructs a builder of {@link UnsynchronizedBufferedInputStream}.
88           */
89          public Builder() {
90              // empty
91          }
92  
93          /**
94           * Builds a new {@link UnsynchronizedBufferedInputStream}.
95           * <p>
96           * You must set an aspect that supports {@link #getInputStream()} on this builder, otherwise, this method throws an exception.
97           * </p>
98           * <p>
99           * This builder uses the following aspects:
100          * </p>
101          * <ul>
102          * <li>{@link #getInputStream()}</li>
103          * <li>{@link #getBufferSize()}</li>
104          * </ul>
105          *
106          * @return a new instance.
107          * @throws IllegalStateException         if the {@code origin} is {@code null}.
108          * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
109          * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
110          * @see #getInputStream()
111          * @see #getBufferSize()
112          * @see #getUnchecked()
113          */
114         @Override
115         public UnsynchronizedBufferedInputStream get() throws IOException {
116             return new UnsynchronizedBufferedInputStream(this);
117         }
118 
119     }
120 
121     /**
122      * The buffer containing the current bytes read from the target InputStream.
123      */
124     protected volatile byte[] buffer;
125 
126     /**
127      * The total number of bytes inside the byte array {@code buffer}.
128      */
129     protected int count;
130 
131     /**
132      * The current limit, which when passed, invalidates the current mark.
133      */
134     protected int markLimit;
135 
136     /**
137      * The currently marked position. -1 indicates no mark has been set or the mark has been invalidated.
138      */
139     protected int markPos = IOUtils.EOF;
140 
141     /**
142      * The current position within the byte array {@code buffer}.
143      */
144     protected int pos;
145 
146     /**
147      * Constructs a new {@code BufferedInputStream} on the {@link InputStream} {@code in}. The buffer size is specified by the parameter {@code size} and all
148      * reads are now filtered through this stream.
149      *
150      * @param builder   A builder providing the input stream and buffer size.
151      * @throws IOException if an I/O error occurs.
152      * @throws IllegalArgumentException if {@code size < 0}.
153      */
154     @SuppressWarnings("resource")
155     private UnsynchronizedBufferedInputStream(final Builder builder) throws IOException {
156         super(builder.getInputStream());
157         final int bufferSize = builder.getBufferSize();
158         if (bufferSize <= 0) {
159             throw new IllegalArgumentException("Size must be > 0");
160         }
161         buffer = new byte[bufferSize];
162     }
163 
164     /**
165      * Returns the number of bytes that are available before this stream will block. This method returns the number of bytes available in the buffer plus those
166      * available in the source stream.
167      *
168      * @return the number of bytes available before blocking.
169      * @throws IOException if this stream is closed.
170      */
171     @Override
172     public int available() throws IOException {
173         final InputStream localIn = inputStream; // 'in' could be invalidated by close()
174         if (buffer == null || localIn == null) {
175             throw new IOException("Stream is closed");
176         }
177         return count - pos + localIn.available();
178     }
179 
180     /**
181      * Closes this stream. The source stream is closed and any resources associated with it are released.
182      *
183      * @throws IOException if an error occurs while closing this stream.
184      */
185     @Override
186     public void close() throws IOException {
187         buffer = null;
188         final InputStream localIn = inputStream;
189         inputStream = null;
190         if (localIn != null) {
191             localIn.close();
192         }
193     }
194 
195     private int fillBuffer(final InputStream localIn, byte[] localBuf) throws IOException {
196         if (markPos == IOUtils.EOF || pos - markPos >= markLimit) {
197             /* Mark position not set or exceeded readLimit */
198             final int result = localIn.read(localBuf);
199             if (result > 0) {
200                 markPos = IOUtils.EOF;
201                 pos = 0;
202                 count = result;
203             }
204             return result;
205         }
206         if (markPos == 0 && markLimit > localBuf.length) {
207             /* Increase buffer size to accommodate the readLimit */
208             int newLength = localBuf.length * 2;
209             if (newLength > markLimit) {
210                 newLength = markLimit;
211             }
212             final byte[] newbuf = new byte[newLength];
213             System.arraycopy(localBuf, 0, newbuf, 0, localBuf.length);
214             // Reassign buffer, which will invalidate any local references
215             // FIXME: what if buffer was null?
216             localBuf = buffer = newbuf;
217         } else if (markPos > 0) {
218             System.arraycopy(localBuf, markPos, localBuf, 0, localBuf.length - markPos);
219         }
220         // Set the new position and mark position
221         pos -= markPos;
222         count = markPos = 0;
223         final int bytesread = localIn.read(localBuf, pos, localBuf.length - pos);
224         count = bytesread <= 0 ? pos : pos + bytesread;
225         return bytesread;
226     }
227 
228     byte[] getBuffer() {
229         return buffer;
230     }
231 
232     /**
233      * Sets a mark position in this stream. The parameter {@code readLimit} indicates how many bytes can be read before a mark is invalidated. Calling
234      * {@code reset()} will reposition the stream back to the marked position if {@code readLimit} has not been surpassed. The underlying buffer may be
235      * increased in size to allow {@code readLimit} number of bytes to be supported.
236      *
237      * @param readLimit the number of bytes that can be read before the mark is invalidated.
238      * @see #reset()
239      */
240     @Override
241     public void mark(final int readLimit) {
242         markLimit = readLimit;
243         markPos = pos;
244     }
245 
246     /**
247      * Indicates whether {@code BufferedInputStream} supports the {@code mark()} and {@code reset()} methods.
248      *
249      * @return {@code true} for BufferedInputStreams.
250      * @see #mark(int)
251      * @see #reset()
252      */
253     @Override
254     public boolean markSupported() {
255         return true;
256     }
257 
258     /**
259      * Reads a single byte from this stream and returns it as an integer in the range from 0 to 255. Returns -1 if the end of the source string has been
260      * reached. If the internal buffer does not contain any available bytes then it is filled from the source stream and the first byte is returned.
261      *
262      * @return the byte read or -1 if the end of the source stream has been reached.
263      * @throws IOException if this stream is closed or another IOException occurs.
264      */
265     @Override
266     public int read() throws IOException {
267         // Use local refs since buf and in may be invalidated by an
268         // unsynchronized close()
269         byte[] localBuf = buffer;
270         final InputStream localIn = inputStream;
271         if (localBuf == null || localIn == null) {
272             throw new IOException("Stream is closed");
273         }
274 
275         /* Are there buffered bytes available? */
276         if (pos >= count && fillBuffer(localIn, localBuf) == IOUtils.EOF) {
277             return IOUtils.EOF; /* no, fill buffer */
278         }
279         // localBuf may have been invalidated by fillbuf
280         if (localBuf != buffer) {
281             localBuf = buffer;
282             if (localBuf == null) {
283                 throw new IOException("Stream is closed");
284             }
285         }
286 
287         /* Did filling the buffer fail with -1 (EOF)? */
288         if (count - pos > 0) {
289             return localBuf[pos++] & 0xFF;
290         }
291         return IOUtils.EOF;
292     }
293 
294     /**
295      * Reads at most {@code length} bytes from this stream and stores them in byte array {@code buffer} starting at offset {@code offset}. Returns the number of
296      * bytes actually read or -1 if no bytes were read and the end of the stream was encountered. If all the buffered bytes have been used, a mark has not been
297      * set and the requested number of bytes is larger than the receiver's buffer size, this implementation bypasses the buffer and simply places the results
298      * directly into {@code buffer}.
299      *
300      * @param dest the byte array in which to store the bytes read.
301      * @param offset the initial position in {@code buffer} to store the bytes read from this stream.
302      * @param length the maximum number of bytes to store in {@code buffer}.
303      * @return the number of bytes actually read or -1 if end of stream.
304      * @throws IndexOutOfBoundsException if {@code offset < 0} or {@code length < 0}, or if {@code offset + length} is greater than the size of {@code buffer}.
305      * @throws IOException               if the stream is already closed or another IOException occurs.
306      */
307     @Override
308     public int read(final byte[] dest, int offset, final int length) throws IOException {
309         // Use local ref since buf may be invalidated by an unsynchronized
310         // close()
311         byte[] localBuf = buffer;
312         if (localBuf == null) {
313             throw new IOException("Stream is closed");
314         }
315         // avoid int overflow
316         if (offset > dest.length - length || offset < 0 || length < 0) {
317             throw new IndexOutOfBoundsException();
318         }
319         if (length == 0) {
320             return 0;
321         }
322         final InputStream localIn = inputStream;
323         if (localIn == null) {
324             throw new IOException("Stream is closed");
325         }
326 
327         int required;
328         if (pos < count) {
329             /* There are bytes available in the buffer. */
330             final int copylength = count - pos >= length ? length : count - pos;
331             System.arraycopy(localBuf, pos, dest, offset, copylength);
332             pos += copylength;
333             if (copylength == length || localIn.available() == 0) {
334                 return copylength;
335             }
336             offset += copylength;
337             required = length - copylength;
338         } else {
339             required = length;
340         }
341 
342         while (true) {
343             final int read;
344             /*
345              * If we're not marked and the required size is greater than the buffer, simply read the bytes directly bypassing the buffer.
346              */
347             if (markPos == IOUtils.EOF && required >= localBuf.length) {
348                 read = localIn.read(dest, offset, required);
349                 if (read == IOUtils.EOF) {
350                     return required == length ? IOUtils.EOF : length - required;
351                 }
352             } else {
353                 if (fillBuffer(localIn, localBuf) == IOUtils.EOF) {
354                     return required == length ? IOUtils.EOF : length - required;
355                 }
356                 // localBuf may have been invalidated by fillBuffer()
357                 if (localBuf != buffer) {
358                     localBuf = buffer;
359                     if (localBuf == null) {
360                         throw new IOException("Stream is closed");
361                     }
362                 }
363 
364                 read = count - pos >= required ? required : count - pos;
365                 System.arraycopy(localBuf, pos, dest, offset, read);
366                 pos += read;
367             }
368             required -= read;
369             if (required == 0) {
370                 return length;
371             }
372             if (localIn.available() == 0) {
373                 return length - required;
374             }
375             offset += read;
376         }
377     }
378 
379     /**
380      * Resets this stream to the last marked location.
381      *
382      * @throws IOException if this stream is closed, no mark has been set or the mark is no longer valid because more than {@code readLimit} bytes have been
383      *                     read since setting the mark.
384      * @see #mark(int)
385      */
386     @Override
387     public void reset() throws IOException {
388         if (buffer == null) {
389             throw new IOException("Stream is closed");
390         }
391         if (IOUtils.EOF == markPos) {
392             throw new IOException("Mark has been invalidated");
393         }
394         pos = markPos;
395     }
396 
397     /**
398      * Skips {@code amount} number of bytes in this stream. Subsequent {@code read()}'s will not return these bytes unless {@code reset()} is used.
399      *
400      * @param amount the number of bytes to skip. {@code skip} does nothing and returns 0 if {@code amount} is less than zero.
401      * @return the number of bytes actually skipped.
402      * @throws IOException if this stream is closed or another IOException occurs.
403      */
404     @Override
405     public long skip(final long amount) throws IOException {
406         // Use local refs since buf and in may be invalidated by an
407         // unsynchronized close()
408         final byte[] localBuf = buffer;
409         final InputStream localIn = inputStream;
410         if (localBuf == null) {
411             throw new IOException("Stream is closed");
412         }
413         if (amount < 1) {
414             return 0;
415         }
416         if (localIn == null) {
417             throw new IOException("Stream is closed");
418         }
419 
420         if (count - pos >= amount) {
421             // (int count - int pos) here is always an int so amount is also in the int range if the above test is true.
422             // We can safely cast to int and avoid static analysis warnings.
423             pos += (int) amount;
424             return amount;
425         }
426         int read = count - pos;
427         pos = count;
428 
429         if (markPos != IOUtils.EOF && amount <= markLimit) {
430             if (fillBuffer(localIn, localBuf) == IOUtils.EOF) {
431                 return read;
432             }
433             if (count - pos >= amount - read) {
434                 // (int count - int pos) here is always an int so (amount - read) is also in the int range if the above test is true.
435                 // We can safely cast to int and avoid static analysis warnings.
436                 pos += (int) amount - read;
437                 return amount;
438             }
439             // Couldn't get all the bytes, skip what we read
440             read += count - pos;
441             pos = count;
442             return read;
443         }
444         return read + localIn.skip(amount - read);
445     }
446 }