View Javadoc
1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  package org.apache.commons.io.input;
15  
16  import static org.apache.commons.io.IOUtils.EOF;
17  
18  import java.io.BufferedInputStream;
19  import java.io.File;
20  import java.io.IOException;
21  import java.io.InputStream;
22  import java.nio.ByteBuffer;
23  import java.nio.channels.FileChannel;
24  import java.nio.file.Path;
25  import java.nio.file.StandardOpenOption;
26  import java.util.Objects;
27  
28  import org.apache.commons.io.IOUtils;
29  import org.apache.commons.io.build.AbstractStreamBuilder;
30  
31  /**
32   * {@link InputStream} implementation which uses direct buffer to read a file to avoid extra copy of data between Java and native memory which happens when
33   * using {@link BufferedInputStream}. Unfortunately, this is not something already available in JDK, {@code sun.nio.ch.ChannelInputStream} supports
34   * reading a file using NIO, but does not support buffering.
35   * <p>
36   * To build an instance, use {@link Builder}.
37   * </p>
38   * <p>
39   * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19 where it was called {@code NioBufferedFileInputStream}.
40   * </p>
41   *
42   * @see Builder
43   * @since 2.9.0
44   */
45  public final class BufferedFileChannelInputStream extends InputStream {
46  
47      // @formatter:off
48      /**
49       * Builds a new {@link BufferedFileChannelInputStream}.
50       *
51       * <p>
52       * Using File IO:
53       * </p>
54       * <pre>{@code
55       * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
56       *   .setFile(file)
57       *   .setBufferSize(4096)
58       *   .get();}
59       * </pre>
60       * <p>
61       * Using NIO Path:
62       * </p>
63       * <pre>{@code
64       * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
65       *   .setPath(path)
66       *   .setBufferSize(4096)
67       *   .get();}
68       * </pre>
69       *
70       * @see #get()
71       * @since 2.12.0
72       */
73      // @formatter:on
74      public static class Builder extends AbstractStreamBuilder<BufferedFileChannelInputStream, Builder> {
75  
76          private FileChannel fileChannel;
77  
78          /**
79           * Constructs a new builder of {@link BufferedFileChannelInputStream}.
80           */
81          public Builder() {
82              // empty
83          }
84  
85          /**
86           * Builds a new {@link BufferedFileChannelInputStream}.
87           * <p>
88           * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
89           * </p>
90           * <p>
91           * This builder uses the following aspects:
92           * </p>
93           * <ul>
94           * <li>{@link FileChannel} takes precedence is set. </li>
95           * <li>{@link #getPath()} if the file channel is not set.</li>
96           * <li>{@link #getBufferSize()}</li>
97           * </ul>
98           *
99           * @return a new instance.
100          * @throws IllegalStateException         if the {@code origin} is {@code null}.
101          * @throws UnsupportedOperationException if the origin cannot be converted to a {@link Path}.
102          * @throws IOException                   if an I/O error occurs converting to an {@link Path} using {@link #getPath()}.
103          * @see #getPath()
104          * @see #getBufferSize()
105          * @see #getUnchecked()
106          */
107         @Override
108         public BufferedFileChannelInputStream get() throws IOException {
109             return fileChannel != null ? new BufferedFileChannelInputStream(fileChannel, getBufferSize())
110                     : new BufferedFileChannelInputStream(getPath(), getBufferSize());
111         }
112 
113         /**
114          * Sets the file channel.
115          * <p>
116          * This setting takes precedence over all others.
117          * </p>
118          *
119          * @param fileChannel the file channel.
120          * @return this instance.
121          * @since 2.18.0
122          */
123         public Builder setFileChannel(final FileChannel fileChannel) {
124             this.fileChannel = fileChannel;
125             return this;
126         }
127 
128     }
129 
130     /**
131      * Constructs a new {@link Builder}.
132      *
133      * @return a new {@link Builder}.
134      * @since 2.12.0
135      */
136     public static Builder builder() {
137         return new Builder();
138     }
139 
140     private final ByteBuffer byteBuffer;
141 
142     private final FileChannel fileChannel;
143 
144     /**
145      * Constructs a new instance for the given File.
146      *
147      * @param file The file to stream.
148      * @throws IOException If an I/O error occurs
149      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
150      */
151     @Deprecated
152     public BufferedFileChannelInputStream(final File file) throws IOException {
153         this(file, IOUtils.DEFAULT_BUFFER_SIZE);
154     }
155 
156     /**
157      * Constructs a new instance for the given File and buffer size.
158      *
159      * @param file       The file to stream.
160      * @param bufferSize buffer size.
161      * @throws IOException If an I/O error occurs
162      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
163      */
164     @Deprecated
165     public BufferedFileChannelInputStream(final File file, final int bufferSize) throws IOException {
166         this(file.toPath(), bufferSize);
167     }
168 
169     private BufferedFileChannelInputStream(final FileChannel fileChannel, final int bufferSize) {
170         this.fileChannel = Objects.requireNonNull(fileChannel, "path");
171         byteBuffer = ByteBuffer.allocateDirect(bufferSize);
172         byteBuffer.flip();
173     }
174 
175     /**
176      * Constructs a new instance for the given Path.
177      *
178      * @param path The path to stream.
179      * @throws IOException If an I/O error occurs
180      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
181      */
182     @Deprecated
183     public BufferedFileChannelInputStream(final Path path) throws IOException {
184         this(path, IOUtils.DEFAULT_BUFFER_SIZE);
185     }
186 
187     /**
188      * Constructs a new instance for the given Path and buffer size.
189      *
190      * @param path       The path to stream.
191      * @param bufferSize buffer size.
192      * @throws IOException If an I/O error occurs
193      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
194      */
195     @SuppressWarnings("resource")
196     @Deprecated
197     public BufferedFileChannelInputStream(final Path path, final int bufferSize) throws IOException {
198         this(FileChannel.open(path, StandardOpenOption.READ), bufferSize);
199     }
200 
201     @Override
202     public synchronized int available() throws IOException {
203         if (!fileChannel.isOpen()) {
204             return 0;
205         }
206         if (!refill()) {
207             return 0;
208         }
209         return byteBuffer.remaining();
210     }
211 
212     /**
213      * Attempts to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun API that will cause errors if one attempts to read from the
214      * disposed buffer. However, neither the bytes allocated to direct buffers nor file descriptors opened for memory-mapped buffers put pressure on the garbage
215      * collector. Waiting for garbage collection may lead to the depletion of off-heap memory or huge numbers of open files. There's unfortunately no standard
216      * API to manually dispose of these kinds of buffers.
217      *
218      * @param buffer the buffer to clean.
219      */
220     private void clean(final ByteBuffer buffer) {
221         if (buffer.isDirect()) {
222             cleanDirectBuffer(buffer);
223         }
224     }
225 
226     /**
227      * In Java 8, the type of {@code sun.nio.ch.DirectBuffer.cleaner()} was {@code sun.misc.Cleaner}, and it was possible to access the method
228      * {@code sun.misc.Cleaner.clean()} to invoke it. The type changed to {@code jdk.internal.ref.Cleaner} in later JDKs, and the {@code clean()} method is not
229      * accessible even with reflection. However {@code sun.misc.Unsafe} added an {@code invokeCleaner()} method in JDK 9+ and this is still accessible with
230      * reflection.
231      *
232      * @param buffer the buffer to clean. must be a DirectBuffer.
233      */
234     private void cleanDirectBuffer(final ByteBuffer buffer) {
235         if (ByteBufferCleaner.isSupported()) {
236             ByteBufferCleaner.clean(buffer);
237         }
238     }
239 
240     @Override
241     public synchronized void close() throws IOException {
242         try {
243             fileChannel.close();
244         } finally {
245             clean(byteBuffer);
246         }
247     }
248 
249     @Override
250     public synchronized int read() throws IOException {
251         if (!refill()) {
252             return EOF;
253         }
254         return byteBuffer.get() & 0xFF;
255     }
256 
257     @Override
258     public synchronized int read(final byte[] b, final int offset, int len) throws IOException {
259         if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
260             throw new IndexOutOfBoundsException();
261         }
262         if (!refill()) {
263             return EOF;
264         }
265         len = Math.min(len, byteBuffer.remaining());
266         byteBuffer.get(b, offset, len);
267         return len;
268     }
269 
270     /**
271      * Checks whether data is left to be read from the input stream.
272      *
273      * @return true if data is left, false otherwise
274      * @throws IOException if an I/O error occurs.
275      */
276     private boolean refill() throws IOException {
277         Input.checkOpen(fileChannel.isOpen());
278         if (!byteBuffer.hasRemaining()) {
279             byteBuffer.clear();
280             int nRead = 0;
281             while (nRead == 0) {
282                 nRead = fileChannel.read(byteBuffer);
283             }
284             byteBuffer.flip();
285             return nRead >= 0;
286         }
287         return true;
288     }
289 
290     @Override
291     public synchronized long skip(final long n) throws IOException {
292         if (n <= 0L) {
293             return 0L;
294         }
295         if (byteBuffer.remaining() >= n) {
296             // The buffered content is enough to skip
297             byteBuffer.position(byteBuffer.position() + (int) n);
298             return n;
299         }
300         final long skippedFromBuffer = byteBuffer.remaining();
301         final long toSkipFromFileChannel = n - skippedFromBuffer;
302         // Discard everything we have read in the buffer.
303         byteBuffer.position(0);
304         byteBuffer.flip();
305         return skippedFromBuffer + skipFromFileChannel(toSkipFromFileChannel);
306     }
307 
308     private long skipFromFileChannel(final long n) throws IOException {
309         final long currentFilePosition = fileChannel.position();
310         final long size = fileChannel.size();
311         if (n > size - currentFilePosition) {
312             fileChannel.position(size);
313             return size - currentFilePosition;
314         }
315         fileChannel.position(currentFilePosition + n);
316         return n;
317     }
318 
319 }