/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.commons.io.input;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.io.IOUtils;
import org.apache.commons.io.build.AbstractStreamBuilder;

/**
 * An unsynchronized version of {@link BufferedInputStream}, not thread-safe.
 * <p>
 * Wraps an existing {@link InputStream} and <em>buffers</em> the input. Expensive interaction with the underlying input stream is minimized, since most
 * (smaller) requests can be satisfied by accessing the buffer alone. The drawback is that some extra space is required to hold the buffer and that copying
 * takes place when filling that buffer, but this is usually outweighed by the performance benefits.
 * </p>
 * <p>
 * To build an instance, use {@link Builder}.
 * </p>
 * <p>
 * A typical application pattern for the class looks like this:
 * </p>
 *
 * <pre>
 * UnsynchronizedBufferedInputStream s = new UnsynchronizedBufferedInputStream.Builder()
 *   .setInputStream(new FileInputStream("file.java"))
 *   .setBufferSize(8192)
 *   .get();
 * </pre>
 * <p>
 * Provenance: Apache Harmony and modified.
 * </p>
 *
 * @see Builder
 * @see BufferedInputStream
 * @since 2.12.0
 */
//@NotThreadSafe
public final class UnsynchronizedBufferedInputStream extends UnsynchronizedFilterInputStream {

    // @formatter:off
    /**
     * Builds a new {@link UnsynchronizedBufferedInputStream}.
     *
     * <p>
     * Using File IO:
     * </p>
     * <pre>{@code
     * UnsynchronizedBufferedInputStream s = new UnsynchronizedBufferedInputStream.Builder()
     *   .setFile(file)
     *   .setBufferSize(8192)
     *   .get();}
     * </pre>
     * <p>
     * Using NIO Path:
     * </p>
     * <pre>{@code
     * UnsynchronizedBufferedInputStream s = new UnsynchronizedBufferedInputStream.Builder()
     *   .setPath(path)
     *   .setBufferSize(8192)
     *   .get();}
     * </pre>
     *
     * @see #get()
     */
    // @formatter:on
    public static class Builder extends AbstractStreamBuilder<UnsynchronizedBufferedInputStream, Builder> {

        /**
         * Constructs a builder of {@link UnsynchronizedBufferedInputStream}.
         */
        public Builder() {
            // empty
        }

        /**
         * Builds a new {@link UnsynchronizedBufferedInputStream}.
         * <p>
         * You must set an aspect that supports {@link #getInputStream()} on this builder, otherwise, this method throws an exception.
         * </p>
         * <p>
         * This builder uses the following aspects:
         * </p>
         * <ul>
         * <li>{@link #getInputStream()}</li>
         * <li>{@link #getBufferSize()}</li>
         * </ul>
         *
         * @return a new instance.
         * @throws IllegalStateException         if the {@code origin} is {@code null}.
         * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
         * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
         * @see #getInputStream()
         * @see #getBufferSize()
         * @see #getUnchecked()
         */
        @Override
        public UnsynchronizedBufferedInputStream get() throws IOException {
            return new UnsynchronizedBufferedInputStream(this);
        }

    }

    /**
     * The buffer containing the current bytes read from the target InputStream. Set to {@code null} by {@link #close()}.
     */
    protected volatile byte[] buffer;

    /**
     * The total number of bytes inside the byte array {@code buffer}.
     */
    protected int count;

    /**
     * The current limit, which when passed, invalidates the current mark.
     */
    protected int markLimit;

    /**
     * The currently marked position. -1 indicates no mark has been set or the mark has been invalidated.
     */
    protected int markPos = IOUtils.EOF;

    /**
     * The current position within the byte array {@code buffer}.
     */
    protected int pos;

    /**
     * Constructs a new instance reading from the input stream supplied by the given builder. The internal buffer is allocated with the builder's buffer
     * size and all reads are filtered through this stream.
     *
     * @param builder A builder providing the input stream and buffer size.
     * @throws IOException              if an I/O error occurs.
     * @throws IllegalArgumentException if the builder's buffer size is {@code <= 0}.
     */
    @SuppressWarnings("resource")
    private UnsynchronizedBufferedInputStream(final Builder builder) throws IOException {
        super(builder.getInputStream());
        final int bufferSize = builder.getBufferSize();
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("Size must be > 0");
        }
        buffer = new byte[bufferSize];
    }

    /**
     * Returns the number of bytes that are available before this stream will block. This method returns the number of bytes available in the buffer plus those
     * available in the source stream. If that sum would exceed {@link Integer#MAX_VALUE}, {@link Integer#MAX_VALUE} is returned.
     *
     * @return the number of bytes available before blocking.
     * @throws IOException if this stream is closed.
     */
    @Override
    public int available() throws IOException {
        final InputStream localIn = inputStream; // 'in' could be invalidated by close()
        if (buffer == null || localIn == null) {
            throw new IOException("Stream is closed");
        }
        final int buffered = count - pos;
        final int inSource = localIn.available();
        // Saturate rather than let the int addition wrap around to a negative value.
        return inSource > Integer.MAX_VALUE - buffered ? Integer.MAX_VALUE : buffered + inSource;
    }

    /**
     * Closes this stream. The source stream is closed and any resources associated with it are released.
     *
     * @throws IOException if an error occurs while closing this stream.
     */
    @Override
    public void close() throws IOException {
        buffer = null;
        final InputStream localIn = inputStream;
        inputStream = null;
        if (localIn != null) {
            localIn.close();
        }
    }

    /**
     * Refills the internal buffer from the source stream.
     * <p>
     * When no mark is set, or the mark limit has been exceeded, the buffer is overwritten from its start and the mark is cleared. Otherwise the bytes from
     * the mark onwards are kept (shifting them to the front of the buffer, or growing the buffer up to {@code markLimit} when the mark is already at index 0)
     * and new bytes are appended after them.
     * </p>
     *
     * @param localIn  the source stream to read from.
     * @param localBuf the caller's reference to the current buffer; the {@code buffer} field may be replaced by a larger array.
     * @return the value returned by the underlying read: the number of bytes read, or -1 at end of stream.
     * @throws IOException if the underlying read fails.
     */
    private int fillBuffer(final InputStream localIn, byte[] localBuf) throws IOException {
        if (markPos == IOUtils.EOF || pos - markPos >= markLimit) {
            /* Mark position not set or exceeded readLimit */
            final int result = localIn.read(localBuf);
            if (result > 0) {
                markPos = IOUtils.EOF;
                pos = 0;
                count = result;
            }
            return result;
        }
        if (markPos == 0 && markLimit > localBuf.length) {
            /* Increase buffer size to accommodate the readLimit */
            int newLength = localBuf.length * 2;
            // A negative value means the doubling overflowed int; treat it like overshooting the limit.
            if (newLength < 0 || newLength > markLimit) {
                newLength = markLimit;
            }
            final byte[] newbuf = new byte[newLength];
            System.arraycopy(localBuf, 0, newbuf, 0, localBuf.length);
            // Reassign buffer, which will invalidate any local references
            // FIXME: what if buffer was null?
            localBuf = buffer = newbuf;
        } else if (markPos > 0) {
            // Slide the marked region to the front of the buffer to make room behind it.
            System.arraycopy(localBuf, markPos, localBuf, 0, localBuf.length - markPos);
        }
        // Set the new position and mark position
        pos -= markPos;
        count = markPos = 0;
        final int bytesread = localIn.read(localBuf, pos, localBuf.length - pos);
        count = bytesread <= 0 ? pos : pos + bytesread;
        return bytesread;
    }

    /**
     * Gets the current internal buffer.
     *
     * @return the internal buffer, or {@code null} once this stream has been closed.
     */
    byte[] getBuffer() {
        return buffer;
    }

    /**
     * Sets a mark position in this stream. The parameter {@code readLimit} indicates how many bytes can be read before a mark is invalidated. Calling
     * {@code reset()} will reposition the stream back to the marked position if {@code readLimit} has not been surpassed. The underlying buffer may be
     * increased in size to allow {@code readLimit} number of bytes to be supported.
     *
     * @param readLimit the number of bytes that can be read before the mark is invalidated.
     * @see #reset()
     */
    @Override
    public void mark(final int readLimit) {
        markLimit = readLimit;
        markPos = pos;
    }

    /**
     * Indicates whether {@code BufferedInputStream} supports the {@code mark()} and {@code reset()} methods.
     *
     * @return {@code true} for BufferedInputStreams.
     * @see #mark(int)
     * @see #reset()
     */
    @Override
    public boolean markSupported() {
        return true;
    }

    /**
     * Reads a single byte from this stream and returns it as an integer in the range from 0 to 255. Returns -1 if the end of the source stream has been
     * reached. If the internal buffer does not contain any available bytes then it is filled from the source stream and the first byte is returned.
     *
     * @return the byte read or -1 if the end of the source stream has been reached.
     * @throws IOException if this stream is closed or another IOException occurs.
     */
    @Override
    public int read() throws IOException {
        // Use local refs since buf and in may be invalidated by an
        // unsynchronized close()
        byte[] localBuf = buffer;
        final InputStream localIn = inputStream;
        if (localBuf == null || localIn == null) {
            throw new IOException("Stream is closed");
        }

        /* No buffered bytes left: refill, and report EOF if the source is exhausted. */
        if (pos >= count && fillBuffer(localIn, localBuf) == IOUtils.EOF) {
            return IOUtils.EOF;
        }
        // localBuf may have been invalidated by fillBuffer()
        if (localBuf != buffer) {
            localBuf = buffer;
            if (localBuf == null) {
                throw new IOException("Stream is closed");
            }
        }

        /* The refill may have produced no bytes without signalling EOF; only return a byte if one is buffered. */
        if (count - pos > 0) {
            return localBuf[pos++] & 0xFF;
        }
        return IOUtils.EOF;
    }

    /**
     * Reads at most {@code length} bytes from this stream and stores them in byte array {@code dest} starting at offset {@code offset}. Returns the number of
     * bytes actually read or -1 if no bytes were read and the end of the stream was encountered. If all the buffered bytes have been used, a mark has not been
     * set and the requested number of bytes is larger than the receiver's buffer size, this implementation bypasses the buffer and simply places the results
     * directly into {@code dest}.
     *
     * @param dest   the byte array in which to store the bytes read.
     * @param offset the initial position in {@code dest} to store the bytes read from this stream.
     * @param length the maximum number of bytes to store in {@code dest}.
     * @return the number of bytes actually read or -1 if end of stream.
     * @throws IndexOutOfBoundsException if {@code offset < 0} or {@code length < 0}, or if {@code offset + length} is greater than the size of {@code dest}.
     * @throws IOException               if the stream is already closed or another IOException occurs.
     */
    @Override
    public int read(final byte[] dest, int offset, final int length) throws IOException {
        // Use local ref since buf may be invalidated by an unsynchronized
        // close()
        byte[] localBuf = buffer;
        if (localBuf == null) {
            throw new IOException("Stream is closed");
        }
        // avoid int overflow
        if (offset > dest.length - length || offset < 0 || length < 0) {
            throw new IndexOutOfBoundsException();
        }
        if (length == 0) {
            return 0;
        }
        final InputStream localIn = inputStream;
        if (localIn == null) {
            throw new IOException("Stream is closed");
        }

        int required;
        if (pos < count) {
            /* There are bytes available in the buffer. */
            final int copylength = Math.min(count - pos, length);
            System.arraycopy(localBuf, pos, dest, offset, copylength);
            pos += copylength;
            // Stop here if the request is satisfied or the source reports nothing more available right now.
            if (copylength == length || localIn.available() == 0) {
                return copylength;
            }
            offset += copylength;
            required = length - copylength;
        } else {
            required = length;
        }

        while (true) {
            final int read;
            /*
             * If we're not marked and the required size is greater than the buffer, simply read the bytes directly bypassing the buffer.
             */
            if (markPos == IOUtils.EOF && required >= localBuf.length) {
                read = localIn.read(dest, offset, required);
                if (read == IOUtils.EOF) {
                    return required == length ? IOUtils.EOF : length - required;
                }
            } else {
                if (fillBuffer(localIn, localBuf) == IOUtils.EOF) {
                    return required == length ? IOUtils.EOF : length - required;
                }
                // localBuf may have been invalidated by fillBuffer()
                if (localBuf != buffer) {
                    localBuf = buffer;
                    if (localBuf == null) {
                        throw new IOException("Stream is closed");
                    }
                }

                read = Math.min(count - pos, required);
                System.arraycopy(localBuf, pos, dest, offset, read);
                pos += read;
            }
            required -= read;
            if (required == 0) {
                return length;
            }
            if (localIn.available() == 0) {
                return length - required;
            }
            offset += read;
        }
    }

    /**
     * Resets this stream to the last marked location.
     *
     * @throws IOException if this stream is closed, no mark has been set or the mark is no longer valid because more than {@code readLimit} bytes have been
     *                     read since setting the mark.
     * @see #mark(int)
     */
    @Override
    public void reset() throws IOException {
        if (buffer == null) {
            throw new IOException("Stream is closed");
        }
        if (IOUtils.EOF == markPos) {
            throw new IOException("Mark has been invalidated");
        }
        pos = markPos;
    }

    /**
     * Skips {@code amount} number of bytes in this stream. Subsequent {@code read()}'s will not return these bytes unless {@code reset()} is used.
     * <p>
     * If a mark is set and {@code amount} exceeds the mark's read limit, the bytes are skipped directly on the source stream; when any bytes are skipped that
     * way the mark is invalidated, because the skipped bytes were never buffered and could not be replayed by {@code reset()}.
     * </p>
     *
     * @param amount the number of bytes to skip. {@code skip} does nothing and returns 0 if {@code amount} is less than one.
     * @return the number of bytes actually skipped.
     * @throws IOException if this stream is closed or another IOException occurs.
     */
    @Override
    public long skip(final long amount) throws IOException {
        // Use local refs since buf and in may be invalidated by an
        // unsynchronized close()
        final byte[] localBuf = buffer;
        final InputStream localIn = inputStream;
        if (localBuf == null) {
            throw new IOException("Stream is closed");
        }
        if (amount < 1) {
            return 0;
        }
        if (localIn == null) {
            throw new IOException("Stream is closed");
        }

        if (count - pos >= amount) {
            // (int count - int pos) here is always an int so amount is also in the int range if the above test is true.
            // We can safely cast to int and avoid static analysis warnings.
            pos += (int) amount;
            return amount;
        }
        // Consume everything that is currently buffered.
        int read = count - pos;
        pos = count;

        if (markPos != IOUtils.EOF && amount <= markLimit) {
            // The skip stays within the mark limit: pull the bytes through the buffer so reset() can replay them.
            if (fillBuffer(localIn, localBuf) == IOUtils.EOF) {
                return read;
            }
            if (count - pos >= amount - read) {
                // (int count - int pos) here is always an int so (amount - read) is also in the int range if the above test is true.
                // We can safely cast to int and avoid static analysis warnings.
                pos += (int) (amount - read);
                return amount;
            }
            // Couldn't get all the bytes, skip what we read
            read += count - pos;
            pos = count;
            return read;
        }
        // No mark, or the request exceeds the mark limit: skip directly on the source.
        final long skipped = localIn.skip(amount - read);
        if (skipped > 0) {
            // Bytes bypassed the buffer, so the marked region is no longer contiguous with what follows; drop the mark
            // so that reset() fails instead of silently replaying a stream with a gap in it.
            markPos = IOUtils.EOF;
        }
        return read + skipped;
    }
}