1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.commons.io.input;
19
20 import java.io.BufferedInputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23
24 import org.apache.commons.io.IOUtils;
25 import org.apache.commons.io.build.AbstractStreamBuilder;
26
27 /**
28 * An unsynchronized version of {@link BufferedInputStream}, not thread-safe.
29 * <p>
30 * Wraps an existing {@link InputStream} and <em>buffers</em> the input. Expensive interaction with the underlying input stream is minimized, since most
31 * (smaller) requests can be satisfied by accessing the buffer alone. The drawback is that some extra space is required to hold the buffer and that copying
32 * takes place when filling that buffer, but this is usually outweighed by the performance benefits.
33 * </p>
34 * <p>
35 * To build an instance, use {@link Builder}.
36 * </p>
37 * <p>
38 * A typical application pattern for the class looks like this:
39 * </p>
40 *
41 * <pre>
42 * UnsynchronizedBufferedInputStream s = new UnsynchronizedBufferedInputStream.Builder().
43 * .setInputStream(new FileInputStream("file.java"))
44 * .setBufferSize(8192)
45 * .get();
46 * </pre>
47 * <p>
48 * Provenance: Apache Harmony and modified.
49 * </p>
50 *
51 * @see Builder
52 * @see BufferedInputStream
53 * @since 2.12.0
54 */
55 //@NotThreadSafe
56 public final class UnsynchronizedBufferedInputStream extends UnsynchronizedFilterInputStream {
57
58 // @formatter:off
59 /**
60 * Builds a new {@link UnsynchronizedBufferedInputStream}.
61 *
62 * <p>
63 * Using File IO:
64 * </p>
65 * <pre>{@code
66 * UnsynchronizedBufferedInputStream s = UnsynchronizedBufferedInputStream.builder()
67 * .setFile(file)
68 * .setBufferSize(8192)
69 * .get();}
70 * </pre>
71 * <p>
72 * Using NIO Path:
73 * </p>
74 * <pre>{@code
75 * UnsynchronizedBufferedInputStream s = UnsynchronizedBufferedInputStream.builder()
76 * .setPath(path)
77 * .setBufferSize(8192)
78 * .get();}
79 * </pre>
80 *
81 * @see #get()
82 */
83 // @formatter:on
84 public static class Builder extends AbstractStreamBuilder<UnsynchronizedBufferedInputStream, Builder> {
85
86 /**
87 * Constructs a builder of {@link UnsynchronizedBufferedInputStream}.
88 */
89 public Builder() {
90 // empty
91 }
92
93 /**
94 * Builds a new {@link UnsynchronizedBufferedInputStream}.
95 * <p>
96 * You must set an aspect that supports {@link #getInputStream()} on this builder, otherwise, this method throws an exception.
97 * </p>
98 * <p>
99 * This builder uses the following aspects:
100 * </p>
101 * <ul>
102 * <li>{@link #getInputStream()}</li>
103 * <li>{@link #getBufferSize()}</li>
104 * </ul>
105 *
106 * @return a new instance.
107 * @throws IllegalStateException if the {@code origin} is {@code null}.
108 * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
109 * @throws IOException if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
110 * @see #getInputStream()
111 * @see #getBufferSize()
112 * @see #getUnchecked()
113 */
114 @Override
115 public UnsynchronizedBufferedInputStream get() throws IOException {
116 return new UnsynchronizedBufferedInputStream(this);
117 }
118
119 }
120
121 /**
122 * The buffer containing the current bytes read from the target InputStream.
123 */
124 protected volatile byte[] buffer;
125
126 /**
127 * The total number of bytes inside the byte array {@code buffer}.
128 */
129 protected int count;
130
131 /**
132 * The current limit, which when passed, invalidates the current mark.
133 */
134 protected int markLimit;
135
136 /**
137 * The currently marked position. -1 indicates no mark has been set or the mark has been invalidated.
138 */
139 protected int markPos = IOUtils.EOF;
140
141 /**
142 * The current position within the byte array {@code buffer}.
143 */
144 protected int pos;
145
146 /**
147 * Constructs a new {@code BufferedInputStream} on the {@link InputStream} {@code in}. The buffer size is specified by the parameter {@code size} and all
148 * reads are now filtered through this stream.
149 *
150 * @param builder A builder providing the input stream and buffer size.
151 * @throws IOException if an I/O error occurs.
152 * @throws IllegalArgumentException if {@code size < 0}.
153 */
154 @SuppressWarnings("resource")
155 private UnsynchronizedBufferedInputStream(final Builder builder) throws IOException {
156 super(builder.getInputStream());
157 final int bufferSize = builder.getBufferSize();
158 if (bufferSize <= 0) {
159 throw new IllegalArgumentException("Size must be > 0");
160 }
161 buffer = new byte[bufferSize];
162 }
163
164 /**
165 * Returns the number of bytes that are available before this stream will block. This method returns the number of bytes available in the buffer plus those
166 * available in the source stream.
167 *
168 * @return the number of bytes available before blocking.
169 * @throws IOException if this stream is closed.
170 */
171 @Override
172 public int available() throws IOException {
173 final InputStream localIn = inputStream; // 'in' could be invalidated by close()
174 if (buffer == null || localIn == null) {
175 throw new IOException("Stream is closed");
176 }
177 return count - pos + localIn.available();
178 }
179
180 /**
181 * Closes this stream. The source stream is closed and any resources associated with it are released.
182 *
183 * @throws IOException if an error occurs while closing this stream.
184 */
185 @Override
186 public void close() throws IOException {
187 buffer = null;
188 final InputStream localIn = inputStream;
189 inputStream = null;
190 if (localIn != null) {
191 localIn.close();
192 }
193 }
194
195 private int fillBuffer(final InputStream localIn, byte[] localBuf) throws IOException {
196 if (markPos == IOUtils.EOF || pos - markPos >= markLimit) {
197 /* Mark position not set or exceeded readLimit */
198 final int result = localIn.read(localBuf);
199 if (result > 0) {
200 markPos = IOUtils.EOF;
201 pos = 0;
202 count = result;
203 }
204 return result;
205 }
206 if (markPos == 0 && markLimit > localBuf.length) {
207 /* Increase buffer size to accommodate the readLimit */
208 int newLength = localBuf.length * 2;
209 if (newLength > markLimit) {
210 newLength = markLimit;
211 }
212 final byte[] newbuf = new byte[newLength];
213 System.arraycopy(localBuf, 0, newbuf, 0, localBuf.length);
214 // Reassign buffer, which will invalidate any local references
215 // FIXME: what if buffer was null?
216 localBuf = buffer = newbuf;
217 } else if (markPos > 0) {
218 System.arraycopy(localBuf, markPos, localBuf, 0, localBuf.length - markPos);
219 }
220 // Set the new position and mark position
221 pos -= markPos;
222 count = markPos = 0;
223 final int bytesread = localIn.read(localBuf, pos, localBuf.length - pos);
224 count = bytesread <= 0 ? pos : pos + bytesread;
225 return bytesread;
226 }
227
228 byte[] getBuffer() {
229 return buffer;
230 }
231
232 /**
233 * Sets a mark position in this stream. The parameter {@code readLimit} indicates how many bytes can be read before a mark is invalidated. Calling
234 * {@code reset()} will reposition the stream back to the marked position if {@code readLimit} has not been surpassed. The underlying buffer may be
235 * increased in size to allow {@code readLimit} number of bytes to be supported.
236 *
237 * @param readLimit the number of bytes that can be read before the mark is invalidated.
238 * @see #reset()
239 */
240 @Override
241 public void mark(final int readLimit) {
242 markLimit = readLimit;
243 markPos = pos;
244 }
245
246 /**
247 * Indicates whether {@code BufferedInputStream} supports the {@code mark()} and {@code reset()} methods.
248 *
249 * @return {@code true} for BufferedInputStreams.
250 * @see #mark(int)
251 * @see #reset()
252 */
253 @Override
254 public boolean markSupported() {
255 return true;
256 }
257
258 /**
259 * Reads a single byte from this stream and returns it as an integer in the range from 0 to 255. Returns -1 if the end of the source string has been
260 * reached. If the internal buffer does not contain any available bytes then it is filled from the source stream and the first byte is returned.
261 *
262 * @return the byte read or -1 if the end of the source stream has been reached.
263 * @throws IOException if this stream is closed or another IOException occurs.
264 */
265 @Override
266 public int read() throws IOException {
267 // Use local refs since buf and in may be invalidated by an
268 // unsynchronized close()
269 byte[] localBuf = buffer;
270 final InputStream localIn = inputStream;
271 if (localBuf == null || localIn == null) {
272 throw new IOException("Stream is closed");
273 }
274
275 /* Are there buffered bytes available? */
276 if (pos >= count && fillBuffer(localIn, localBuf) == IOUtils.EOF) {
277 return IOUtils.EOF; /* no, fill buffer */
278 }
279 // localBuf may have been invalidated by fillbuf
280 if (localBuf != buffer) {
281 localBuf = buffer;
282 if (localBuf == null) {
283 throw new IOException("Stream is closed");
284 }
285 }
286
287 /* Did filling the buffer fail with -1 (EOF)? */
288 if (count - pos > 0) {
289 return localBuf[pos++] & 0xFF;
290 }
291 return IOUtils.EOF;
292 }
293
294 /**
295 * Reads at most {@code length} bytes from this stream and stores them in byte array {@code buffer} starting at offset {@code offset}. Returns the number of
296 * bytes actually read or -1 if no bytes were read and the end of the stream was encountered. If all the buffered bytes have been used, a mark has not been
297 * set and the requested number of bytes is larger than the receiver's buffer size, this implementation bypasses the buffer and simply places the results
298 * directly into {@code buffer}.
299 *
300 * @param dest the byte array in which to store the bytes read.
301 * @param offset the initial position in {@code buffer} to store the bytes read from this stream.
302 * @param length the maximum number of bytes to store in {@code buffer}.
303 * @return the number of bytes actually read or -1 if end of stream.
304 * @throws IndexOutOfBoundsException if {@code offset < 0} or {@code length < 0}, or if {@code offset + length} is greater than the size of {@code buffer}.
305 * @throws IOException if the stream is already closed or another IOException occurs.
306 */
307 @Override
308 public int read(final byte[] dest, int offset, final int length) throws IOException {
309 IOUtils.checkFromIndexSize(dest, offset, length);
310 if (length == 0) {
311 return 0;
312 }
313 // Use local ref since buf may be invalidated by an unsynchronized
314 // close()
315 byte[] localBuf = buffer;
316 if (localBuf == null) {
317 throw new IOException("Stream is closed");
318 }
319 final InputStream localIn = inputStream;
320 if (localIn == null) {
321 throw new IOException("Stream is closed");
322 }
323
324 int required;
325 if (pos < count) {
326 /* There are bytes available in the buffer. */
327 final int copylength = count - pos >= length ? length : count - pos;
328 System.arraycopy(localBuf, pos, dest, offset, copylength);
329 pos += copylength;
330 if (copylength == length || localIn.available() == 0) {
331 return copylength;
332 }
333 offset += copylength;
334 required = length - copylength;
335 } else {
336 required = length;
337 }
338
339 while (true) {
340 final int read;
341 /*
342 * If we're not marked and the required size is greater than the buffer, simply read the bytes directly bypassing the buffer.
343 */
344 if (markPos == IOUtils.EOF && required >= localBuf.length) {
345 read = localIn.read(dest, offset, required);
346 if (read == IOUtils.EOF) {
347 return required == length ? IOUtils.EOF : length - required;
348 }
349 } else {
350 if (fillBuffer(localIn, localBuf) == IOUtils.EOF) {
351 return required == length ? IOUtils.EOF : length - required;
352 }
353 // localBuf may have been invalidated by fillBuffer()
354 if (localBuf != buffer) {
355 localBuf = buffer;
356 if (localBuf == null) {
357 throw new IOException("Stream is closed");
358 }
359 }
360
361 read = count - pos >= required ? required : count - pos;
362 System.arraycopy(localBuf, pos, dest, offset, read);
363 pos += read;
364 }
365 required -= read;
366 if (required == 0) {
367 return length;
368 }
369 if (localIn.available() == 0) {
370 return length - required;
371 }
372 offset += read;
373 }
374 }
375
376 /**
377 * Resets this stream to the last marked location.
378 *
379 * @throws IOException if this stream is closed, no mark has been set or the mark is no longer valid because more than {@code readLimit} bytes have been
380 * read since setting the mark.
381 * @see #mark(int)
382 */
383 @Override
384 public void reset() throws IOException {
385 if (buffer == null) {
386 throw new IOException("Stream is closed");
387 }
388 if (IOUtils.EOF == markPos) {
389 throw new IOException("Mark has been invalidated");
390 }
391 pos = markPos;
392 }
393
394 /**
395 * Skips {@code amount} number of bytes in this stream. Subsequent {@code read()}'s will not return these bytes unless {@code reset()} is used.
396 *
397 * @param amount the number of bytes to skip. {@code skip} does nothing and returns 0 if {@code amount} is less than zero.
398 * @return the number of bytes actually skipped.
399 * @throws IOException if this stream is closed or another IOException occurs.
400 */
401 @Override
402 public long skip(final long amount) throws IOException {
403 // Use local refs since buf and in may be invalidated by an
404 // unsynchronized close()
405 final byte[] localBuf = buffer;
406 final InputStream localIn = inputStream;
407 if (localBuf == null) {
408 throw new IOException("Stream is closed");
409 }
410 if (amount < 1) {
411 return 0;
412 }
413 if (localIn == null) {
414 throw new IOException("Stream is closed");
415 }
416
417 if (count - pos >= amount) {
418 // (int count - int pos) here is always an int so amount is also in the int range if the above test is true.
419 // We can safely cast to int and avoid static analysis warnings.
420 pos += (int) amount;
421 return amount;
422 }
423 int read = count - pos;
424 pos = count;
425
426 if (markPos != IOUtils.EOF && amount <= markLimit) {
427 if (fillBuffer(localIn, localBuf) == IOUtils.EOF) {
428 return read;
429 }
430 if (count - pos >= amount - read) {
431 // (int count - int pos) here is always an int so (amount - read) is also in the int range if the above test is true.
432 // We can safely cast to int and avoid static analysis warnings.
433 pos += (int) amount - read;
434 return amount;
435 }
436 // Couldn't get all the bytes, skip what we read
437 read += count - pos;
438 pos = count;
439 return read;
440 }
441 return read + localIn.skip(amount - read);
442 }
443 }