001/*
002 * Licensed under the Apache License, Version 2.0 (the "License");
003 * you may not use this file except in compliance with the License.
004 * You may obtain a copy of the License at
005 *
006 *     https://www.apache.org/licenses/LICENSE-2.0
007 *
008 * Unless required by applicable law or agreed to in writing, software
009 * distributed under the License is distributed on an "AS IS" BASIS,
010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011 * See the License for the specific language governing permissions and
012 * limitations under the License.
013 */
014package org.apache.commons.io.input;
015
016import static org.apache.commons.io.IOUtils.EOF;
017
018import java.io.BufferedInputStream;
019import java.io.File;
020import java.io.IOException;
021import java.io.InputStream;
022import java.nio.ByteBuffer;
023import java.nio.channels.FileChannel;
024import java.nio.file.Path;
025import java.nio.file.StandardOpenOption;
026
027import org.apache.commons.io.IOUtils;
028import org.apache.commons.io.build.AbstractStreamBuilder;
029
030/**
031 * {@link InputStream} implementation which uses direct buffer to read a file to avoid extra copy of data between Java and native memory which happens when
032 * using {@link BufferedInputStream}. Unfortunately, this is not something already available in JDK, {@code sun.nio.ch.ChannelInputStream} supports
033 * reading a file using NIO, but does not support buffering.
034 * <p>
035 * To build an instance, use {@link Builder}.
036 * </p>
037 * <p>
038 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19 where it was called {@code NioBufferedFileInputStream}.
039 * </p>
040 *
041 * @see Builder
042 * @since 2.9.0
043 */
044public final class BufferedFileChannelInputStream extends InputStream {
045
046    // @formatter:off
047    /**
048     * Builds a new {@link BufferedFileChannelInputStream}.
049     *
050     * <p>
051     * Using File IO:
052     * </p>
053     * <pre>{@code
054     * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
055     *   .setFile(file)
056     *   .setBufferSize(4096)
057     *   .get();}
058     * </pre>
059     * <p>
060     * Using NIO Path:
061     * </p>
062     * <pre>{@code
063     * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
064     *   .setPath(path)
065     *   .setBufferSize(4096)
066     *   .get();}
067     * </pre>
068     *
069     * @see #get()
070     * @since 2.12.0
071     */
072    // @formatter:on
073    public static class Builder extends AbstractStreamBuilder<BufferedFileChannelInputStream, Builder> {
074
075        private FileChannel fileChannel;
076
077        /**
078         * Constructs a new builder of {@link BufferedFileChannelInputStream}.
079         */
080        public Builder() {
081            // empty
082        }
083
084        /**
085         * Builds a new {@link BufferedFileChannelInputStream}.
086         * <p>
087         * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
088         * </p>
089         * <p>
090         * This builder uses the following aspects:
091         * </p>
092         * <ul>
093         * <li>{@link FileChannel} takes precedence is set. </li>
094         * <li>{@link #getPath()} if the file channel is not set.</li>
095         * <li>{@link #getBufferSize()}</li>
096         * </ul>
097         *
098         * @return a new instance.
099         * @throws IllegalStateException         if the {@code origin} is {@code null}.
100         * @throws UnsupportedOperationException if the origin cannot be converted to a {@link Path}.
101         * @throws IOException                   if an I/O error occurs converting to an {@link Path} using {@link #getPath()}.
102         * @see #getPath()
103         * @see #getBufferSize()
104         * @see #getUnchecked()
105         */
106        @Override
107        public BufferedFileChannelInputStream get() throws IOException {
108            return new BufferedFileChannelInputStream(this);
109        }
110
111        /**
112         * Sets the file channel.
113         * <p>
114         * This setting takes precedence over all others.
115         * </p>
116         *
117         * @param fileChannel the file channel.
118         * @return this instance.
119         * @since 2.18.0
120         */
121        public Builder setFileChannel(final FileChannel fileChannel) {
122            this.fileChannel = fileChannel;
123            return this;
124        }
125
126    }
127
128    /**
129     * Constructs a new {@link Builder}.
130     *
131     * @return a new {@link Builder}.
132     * @since 2.12.0
133     */
134    public static Builder builder() {
135        return new Builder();
136    }
137
138    private final ByteBuffer byteBuffer;
139
140    private final FileChannel fileChannel;
141
142    @SuppressWarnings("resource")
143    private BufferedFileChannelInputStream(final Builder builder) throws IOException {
144        this.fileChannel = builder.fileChannel != null ? builder.fileChannel : FileChannel.open(builder.getPath(), StandardOpenOption.READ);
145        this.byteBuffer = ByteBuffer.allocateDirect(builder.getBufferSize());
146        this.byteBuffer.flip();
147    }
148
149    /**
150     * Constructs a new instance for the given File.
151     *
152     * @param file The file to stream.
153     * @throws IOException If an I/O error occurs
154     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
155     */
156    @Deprecated
157    public BufferedFileChannelInputStream(final File file) throws IOException {
158        this(file, IOUtils.DEFAULT_BUFFER_SIZE);
159    }
160
161    /**
162     * Constructs a new instance for the given File and buffer size.
163     *
164     * @param file       The file to stream.
165     * @param bufferSize buffer size.
166     * @throws IOException If an I/O error occurs
167     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
168     */
169    @Deprecated
170    public BufferedFileChannelInputStream(final File file, final int bufferSize) throws IOException {
171        this(file.toPath(), bufferSize);
172    }
173
174    /**
175     * Constructs a new instance for the given Path.
176     *
177     * @param path The path to stream.
178     * @throws IOException If an I/O error occurs
179     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
180     */
181    @Deprecated
182    public BufferedFileChannelInputStream(final Path path) throws IOException {
183        this(path, IOUtils.DEFAULT_BUFFER_SIZE);
184    }
185
186    /**
187     * Constructs a new instance for the given Path and buffer size.
188     *
189     * @param path       The path to stream.
190     * @param bufferSize buffer size.
191     * @throws IOException If an I/O error occurs
192     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
193     */
194    @Deprecated
195    public BufferedFileChannelInputStream(final Path path, final int bufferSize) throws IOException {
196        this(builder().setPath(path).setBufferSize(bufferSize));
197    }
198
199    @Override
200    public synchronized int available() throws IOException {
201        if (!fileChannel.isOpen()) {
202            return 0;
203        }
204        if (!refill()) {
205            return 0;
206        }
207        return byteBuffer.remaining();
208    }
209
210    /**
211     * Attempts to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun API that will cause errors if one attempts to read from the
212     * disposed buffer. However, neither the bytes allocated to direct buffers nor file descriptors opened for memory-mapped buffers put pressure on the garbage
213     * collector. Waiting for garbage collection may lead to the depletion of off-heap memory or huge numbers of open files. There's unfortunately no standard
214     * API to manually dispose of these kinds of buffers.
215     *
216     * @param buffer the buffer to clean.
217     */
218    private void clean(final ByteBuffer buffer) {
219        if (buffer.isDirect()) {
220            cleanDirectBuffer(buffer);
221        }
222    }
223
224    /**
225     * In Java 8, the type of {@code sun.nio.ch.DirectBuffer.cleaner()} was {@code sun.misc.Cleaner}, and it was possible to access the method
226     * {@code sun.misc.Cleaner.clean()} to invoke it. The type changed to {@code jdk.internal.ref.Cleaner} in later JDKs, and the {@code clean()} method is not
227     * accessible even with reflection. However {@code sun.misc.Unsafe} added an {@code invokeCleaner()} method in JDK 9+ and this is still accessible with
228     * reflection.
229     *
230     * @param buffer the buffer to clean. must be a DirectBuffer.
231     */
232    private void cleanDirectBuffer(final ByteBuffer buffer) {
233        if (ByteBufferCleaner.isSupported()) {
234            ByteBufferCleaner.clean(buffer);
235        }
236    }
237
238    @Override
239    public synchronized void close() throws IOException {
240        try {
241            fileChannel.close();
242        } finally {
243            clean(byteBuffer);
244        }
245    }
246
247    @Override
248    public synchronized int read() throws IOException {
249        if (!refill()) {
250            return EOF;
251        }
252        return byteBuffer.get() & 0xFF;
253    }
254
255    @Override
256    public synchronized int read(final byte[] b, final int offset, int len) throws IOException {
257        if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
258            throw new IndexOutOfBoundsException();
259        }
260        if (!refill()) {
261            return EOF;
262        }
263        len = Math.min(len, byteBuffer.remaining());
264        byteBuffer.get(b, offset, len);
265        return len;
266    }
267
268    /**
269     * Checks whether data is left to be read from the input stream.
270     *
271     * @return true if data is left, false otherwise
272     * @throws IOException if an I/O error occurs.
273     */
274    private boolean refill() throws IOException {
275        Input.checkOpen(fileChannel.isOpen());
276        if (!byteBuffer.hasRemaining()) {
277            byteBuffer.clear();
278            int nRead = 0;
279            while (nRead == 0) {
280                nRead = fileChannel.read(byteBuffer);
281            }
282            byteBuffer.flip();
283            return nRead >= 0;
284        }
285        return true;
286    }
287
288    @Override
289    public synchronized long skip(final long n) throws IOException {
290        if (n <= 0L) {
291            return 0L;
292        }
293        if (byteBuffer.remaining() >= n) {
294            // The buffered content is enough to skip
295            byteBuffer.position(byteBuffer.position() + (int) n);
296            return n;
297        }
298        final long skippedFromBuffer = byteBuffer.remaining();
299        final long toSkipFromFileChannel = n - skippedFromBuffer;
300        // Discard everything we have read in the buffer.
301        byteBuffer.position(0);
302        byteBuffer.flip();
303        return skippedFromBuffer + skipFromFileChannel(toSkipFromFileChannel);
304    }
305
306    private long skipFromFileChannel(final long n) throws IOException {
307        final long currentFilePosition = fileChannel.position();
308        final long size = fileChannel.size();
309        if (n > size - currentFilePosition) {
310            fileChannel.position(size);
311            return size - currentFilePosition;
312        }
313        fileChannel.position(currentFilePosition + n);
314        return n;
315    }
316
317}