View Javadoc
1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  package org.apache.commons.io.input;
15  
16  import static org.apache.commons.io.IOUtils.EOF;
17  
18  import java.io.File;
19  import java.io.IOException;
20  import java.io.InputStream;
21  import java.nio.ByteBuffer;
22  import java.nio.channels.FileChannel;
23  import java.nio.file.Path;
24  import java.nio.file.StandardOpenOption;
25  import java.util.Objects;
26  
27  import org.apache.commons.io.IOUtils;
28  import org.apache.commons.io.build.AbstractOrigin;
29  import org.apache.commons.io.build.AbstractStreamBuilder;
30  
31  /**
32   * {@link InputStream} implementation which uses direct buffer to read a file to avoid extra copy of data between Java and native memory which happens when
33   * using {@link java.io.BufferedInputStream}. Unfortunately, this is not something already available in JDK, {@code sun.nio.ch.ChannelInputStream} supports
34   * reading a file using NIO, but does not support buffering.
35   * <p>
36   * To build an instance, see {@link Builder}.
37   * </p>
38   * <p>
39   * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19 where it was called {@code NioBufferedFileInputStream}.
40   * </p>
41   *
42   * @since 2.9.0
43   */
44  public final class BufferedFileChannelInputStream extends InputStream {
45  
46      /**
47       * Builds a new {@link BufferedFileChannelInputStream} instance.
48       * <p>
49       * Using File IO:
50       * </p>
51       *
52       * <pre>{@code
53       * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
54       *   .setFile(file)
55       *   .setBufferSize(4096)
56       *   .get();}
57       * </pre>
58       * <p>
59       * Using NIO Path:
60       * </p>
61       *
62       * <pre>{@code
63       * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
64       *   .setPath(path)
65       *   .setBufferSize(4096)
66       *   .get();}
67       * </pre>
68       *
69       * @since 2.12.0
70       */
71      public static class Builder extends AbstractStreamBuilder<BufferedFileChannelInputStream, Builder> {
72  
73          /**
74           * Constructs a new instance.
75           * <p>
76           * This builder use the aspects Path and buffer size.
77           * </p>
78           * <p>
79           * You must provide an origin that can be converted to a Path by this builder, otherwise, this call will throw an
80           * {@link UnsupportedOperationException}.
81           * </p>
82           *
83           * @return a new instance.
84           * @throws UnsupportedOperationException if the origin cannot provide a Path.
85           * @see AbstractOrigin#getPath()
86           */
87          @Override
88          public BufferedFileChannelInputStream get() throws IOException {
89              return new BufferedFileChannelInputStream(getPath(), getBufferSize());
90          }
91  
92      }
93  
94      /**
95       * Constructs a new {@link Builder}.
96       *
97       * @return a new {@link Builder}.
98       * @since 2.12.0
99       */
100     public static Builder builder() {
101         return new Builder();
102     }
103 
104     private final ByteBuffer byteBuffer;
105 
106     private final FileChannel fileChannel;
107 
108     /**
109      * Constructs a new instance for the given File.
110      *
111      * @param file The file to stream.
112      * @throws IOException If an I/O error occurs
113      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
114      */
115     @Deprecated
116     public BufferedFileChannelInputStream(final File file) throws IOException {
117         this(file, IOUtils.DEFAULT_BUFFER_SIZE);
118     }
119 
120     /**
121      * Constructs a new instance for the given File and buffer size.
122      *
123      * @param file       The file to stream.
124      * @param bufferSize buffer size.
125      * @throws IOException If an I/O error occurs
126      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
127      */
128     @Deprecated
129     public BufferedFileChannelInputStream(final File file, final int bufferSize) throws IOException {
130         this(file.toPath(), bufferSize);
131     }
132 
133     /**
134      * Constructs a new instance for the given Path.
135      *
136      * @param path The path to stream.
137      * @throws IOException If an I/O error occurs
138      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
139      */
140     @Deprecated
141     public BufferedFileChannelInputStream(final Path path) throws IOException {
142         this(path, IOUtils.DEFAULT_BUFFER_SIZE);
143     }
144 
145     /**
146      * Constructs a new instance for the given Path and buffer size.
147      *
148      * @param path       The path to stream.
149      * @param bufferSize buffer size.
150      * @throws IOException If an I/O error occurs
151      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
152      */
153     @Deprecated
154     public BufferedFileChannelInputStream(final Path path, final int bufferSize) throws IOException {
155         Objects.requireNonNull(path, "path");
156         fileChannel = FileChannel.open(path, StandardOpenOption.READ);
157         byteBuffer = ByteBuffer.allocateDirect(bufferSize);
158         byteBuffer.flip();
159     }
160 
161     @Override
162     public synchronized int available() throws IOException {
163         return byteBuffer.remaining();
164     }
165 
166     /**
167      * Attempts to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun API that will cause errors if one attempts to read from the
168      * disposed buffer. However, neither the bytes allocated to direct buffers nor file descriptors opened for memory-mapped buffers put pressure on the garbage
169      * collector. Waiting for garbage collection may lead to the depletion of off-heap memory or huge numbers of open files. There's unfortunately no standard
170      * API to manually dispose of these kinds of buffers.
171      *
172      * @param buffer the buffer to clean.
173      */
174     private void clean(final ByteBuffer buffer) {
175         if (buffer.isDirect()) {
176             cleanDirectBuffer(buffer);
177         }
178     }
179 
180     /**
181      * In Java 8, the type of {@code sun.nio.ch.DirectBuffer.cleaner()} was {@code sun.misc.Cleaner}, and it was possible to access the method
182      * {@code sun.misc.Cleaner.clean()} to invoke it. The type changed to {@code jdk.internal.ref.Cleaner} in later JDKs, and the {@code clean()} method is not
183      * accessible even with reflection. However {@code sun.misc.Unsafe} added an {@code invokeCleaner()} method in JDK 9+ and this is still accessible with
184      * reflection.
185      *
186      * @param buffer the buffer to clean. must be a DirectBuffer.
187      */
188     private void cleanDirectBuffer(final ByteBuffer buffer) {
189         if (ByteBufferCleaner.isSupported()) {
190             ByteBufferCleaner.clean(buffer);
191         }
192     }
193 
194     @Override
195     public synchronized void close() throws IOException {
196         try {
197             fileChannel.close();
198         } finally {
199             clean(byteBuffer);
200         }
201     }
202 
203     @Override
204     public synchronized int read() throws IOException {
205         if (!refill()) {
206             return EOF;
207         }
208         return byteBuffer.get() & 0xFF;
209     }
210 
211     @Override
212     public synchronized int read(final byte[] b, final int offset, int len) throws IOException {
213         if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
214             throw new IndexOutOfBoundsException();
215         }
216         if (!refill()) {
217             return EOF;
218         }
219         len = Math.min(len, byteBuffer.remaining());
220         byteBuffer.get(b, offset, len);
221         return len;
222     }
223 
224     /**
225      * Checks whether data is left to be read from the input stream.
226      *
227      * @return true if data is left, false otherwise
228      * @throws IOException if an I/O error occurs.
229      */
230     private boolean refill() throws IOException {
231         if (!byteBuffer.hasRemaining()) {
232             byteBuffer.clear();
233             int nRead = 0;
234             while (nRead == 0) {
235                 nRead = fileChannel.read(byteBuffer);
236             }
237             byteBuffer.flip();
238             return nRead >= 0;
239         }
240         return true;
241     }
242 
243     @Override
244     public synchronized long skip(final long n) throws IOException {
245         if (n <= 0L) {
246             return 0L;
247         }
248         if (byteBuffer.remaining() >= n) {
249             // The buffered content is enough to skip
250             byteBuffer.position(byteBuffer.position() + (int) n);
251             return n;
252         }
253         final long skippedFromBuffer = byteBuffer.remaining();
254         final long toSkipFromFileChannel = n - skippedFromBuffer;
255         // Discard everything we have read in the buffer.
256         byteBuffer.position(0);
257         byteBuffer.flip();
258         return skippedFromBuffer + skipFromFileChannel(toSkipFromFileChannel);
259     }
260 
261     private long skipFromFileChannel(final long n) throws IOException {
262         final long currentFilePosition = fileChannel.position();
263         final long size = fileChannel.size();
264         if (n > size - currentFilePosition) {
265             fileChannel.position(size);
266             return size - currentFilePosition;
267         }
268         fileChannel.position(currentFilePosition + n);
269         return n;
270     }
271 
272 }