001/*
002 * Licensed under the Apache License, Version 2.0 (the "License");
003 * you may not use this file except in compliance with the License.
004 * You may obtain a copy of the License at
005 *
006 *     http://www.apache.org/licenses/LICENSE-2.0
007 *
008 * Unless required by applicable law or agreed to in writing, software
009 * distributed under the License is distributed on an "AS IS" BASIS,
010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011 * See the License for the specific language governing permissions and
012 * limitations under the License.
013 */
014package org.apache.commons.io.input;
015
016import static org.apache.commons.io.IOUtils.EOF;
017
018import java.io.File;
019import java.io.IOException;
020import java.io.InputStream;
021import java.nio.ByteBuffer;
022import java.nio.channels.FileChannel;
023import java.nio.file.Path;
024import java.nio.file.StandardOpenOption;
025import java.util.Objects;
026
027import org.apache.commons.io.IOUtils;
028import org.apache.commons.io.build.AbstractStreamBuilder;
029
030/**
031 * {@link InputStream} implementation which uses direct buffer to read a file to avoid extra copy of data between Java and native memory which happens when
032 * using {@link java.io.BufferedInputStream}. Unfortunately, this is not something already available in JDK, {@code sun.nio.ch.ChannelInputStream} supports
033 * reading a file using NIO, but does not support buffering.
034 * <p>
035 * To build an instance, use {@link Builder}.
036 * </p>
037 * <p>
038 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19 where it was called {@code NioBufferedFileInputStream}.
039 * </p>
040 *
041 * @see Builder
042 * @since 2.9.0
043 */
044public final class BufferedFileChannelInputStream extends InputStream {
045
046    // @formatter:off
047    /**
048     * Builds a new {@link BufferedFileChannelInputStream}.
049     *
050     * <p>
051     * Using File IO:
052     * </p>
053     * <pre>{@code
054     * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
055     *   .setFile(file)
056     *   .setBufferSize(4096)
057     *   .get();}
058     * </pre>
059     * <p>
060     * Using NIO Path:
061     * </p>
062     * <pre>{@code
063     * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
064     *   .setPath(path)
065     *   .setBufferSize(4096)
066     *   .get();}
067     * </pre>
068     *
069     * @see #get()
070     * @since 2.12.0
071     */
072    // @formatter:on
073    public static class Builder extends AbstractStreamBuilder<BufferedFileChannelInputStream, Builder> {
074
075        /**
076         * Builds a new {@link BufferedFileChannelInputStream}.
077         * <p>
078         * You must set input that supports {@link #getInputStream()}, otherwise, this method throws an exception.
079         * </p>
080         * <p>
081         * This builder use the following aspects:
082         * </p>
083         * <ul>
084         * <li>{@link #getInputStream()}</li>
085         * <li>{@link #getBufferSize()}</li>
086         * </ul>
087         *
088         * @return a new instance.
089         * @throws IllegalStateException         if the {@code origin} is {@code null}.
090         * @throws UnsupportedOperationException if the origin cannot be converted to a {@link Path}.
091         * @throws IOException If an I/O error occurs
092         * @see #getPath()
093         * @see #getBufferSize()
094         */
095        @Override
096        public BufferedFileChannelInputStream get() throws IOException {
097            return new BufferedFileChannelInputStream(getPath(), getBufferSize());
098        }
099
100    }
101
102    /**
103     * Constructs a new {@link Builder}.
104     *
105     * @return a new {@link Builder}.
106     * @since 2.12.0
107     */
108    public static Builder builder() {
109        return new Builder();
110    }
111
112    private final ByteBuffer byteBuffer;
113
114    private final FileChannel fileChannel;
115
116    /**
117     * Constructs a new instance for the given File.
118     *
119     * @param file The file to stream.
120     * @throws IOException If an I/O error occurs
121     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
122     */
123    @Deprecated
124    public BufferedFileChannelInputStream(final File file) throws IOException {
125        this(file, IOUtils.DEFAULT_BUFFER_SIZE);
126    }
127
128    /**
129     * Constructs a new instance for the given File and buffer size.
130     *
131     * @param file       The file to stream.
132     * @param bufferSize buffer size.
133     * @throws IOException If an I/O error occurs
134     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
135     */
136    @Deprecated
137    public BufferedFileChannelInputStream(final File file, final int bufferSize) throws IOException {
138        this(file.toPath(), bufferSize);
139    }
140
141    /**
142     * Constructs a new instance for the given Path.
143     *
144     * @param path The path to stream.
145     * @throws IOException If an I/O error occurs
146     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
147     */
148    @Deprecated
149    public BufferedFileChannelInputStream(final Path path) throws IOException {
150        this(path, IOUtils.DEFAULT_BUFFER_SIZE);
151    }
152
153    /**
154     * Constructs a new instance for the given Path and buffer size.
155     *
156     * @param path       The path to stream.
157     * @param bufferSize buffer size.
158     * @throws IOException If an I/O error occurs
159     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
160     */
161    @Deprecated
162    public BufferedFileChannelInputStream(final Path path, final int bufferSize) throws IOException {
163        Objects.requireNonNull(path, "path");
164        fileChannel = FileChannel.open(path, StandardOpenOption.READ);
165        byteBuffer = ByteBuffer.allocateDirect(bufferSize);
166        byteBuffer.flip();
167    }
168
169    @Override
170    public synchronized int available() throws IOException {
171        return byteBuffer.remaining();
172    }
173
174    /**
175     * Attempts to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun API that will cause errors if one attempts to read from the
176     * disposed buffer. However, neither the bytes allocated to direct buffers nor file descriptors opened for memory-mapped buffers put pressure on the garbage
177     * collector. Waiting for garbage collection may lead to the depletion of off-heap memory or huge numbers of open files. There's unfortunately no standard
178     * API to manually dispose of these kinds of buffers.
179     *
180     * @param buffer the buffer to clean.
181     */
182    private void clean(final ByteBuffer buffer) {
183        if (buffer.isDirect()) {
184            cleanDirectBuffer(buffer);
185        }
186    }
187
188    /**
189     * In Java 8, the type of {@code sun.nio.ch.DirectBuffer.cleaner()} was {@code sun.misc.Cleaner}, and it was possible to access the method
190     * {@code sun.misc.Cleaner.clean()} to invoke it. The type changed to {@code jdk.internal.ref.Cleaner} in later JDKs, and the {@code clean()} method is not
191     * accessible even with reflection. However {@code sun.misc.Unsafe} added an {@code invokeCleaner()} method in JDK 9+ and this is still accessible with
192     * reflection.
193     *
194     * @param buffer the buffer to clean. must be a DirectBuffer.
195     */
196    private void cleanDirectBuffer(final ByteBuffer buffer) {
197        if (ByteBufferCleaner.isSupported()) {
198            ByteBufferCleaner.clean(buffer);
199        }
200    }
201
202    @Override
203    public synchronized void close() throws IOException {
204        try {
205            fileChannel.close();
206        } finally {
207            clean(byteBuffer);
208        }
209    }
210
211    @Override
212    public synchronized int read() throws IOException {
213        if (!refill()) {
214            return EOF;
215        }
216        return byteBuffer.get() & 0xFF;
217    }
218
219    @Override
220    public synchronized int read(final byte[] b, final int offset, int len) throws IOException {
221        if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
222            throw new IndexOutOfBoundsException();
223        }
224        if (!refill()) {
225            return EOF;
226        }
227        len = Math.min(len, byteBuffer.remaining());
228        byteBuffer.get(b, offset, len);
229        return len;
230    }
231
232    /**
233     * Checks whether data is left to be read from the input stream.
234     *
235     * @return true if data is left, false otherwise
236     * @throws IOException if an I/O error occurs.
237     */
238    private boolean refill() throws IOException {
239        if (!byteBuffer.hasRemaining()) {
240            byteBuffer.clear();
241            int nRead = 0;
242            while (nRead == 0) {
243                nRead = fileChannel.read(byteBuffer);
244            }
245            byteBuffer.flip();
246            return nRead >= 0;
247        }
248        return true;
249    }
250
251    @Override
252    public synchronized long skip(final long n) throws IOException {
253        if (n <= 0L) {
254            return 0L;
255        }
256        if (byteBuffer.remaining() >= n) {
257            // The buffered content is enough to skip
258            byteBuffer.position(byteBuffer.position() + (int) n);
259            return n;
260        }
261        final long skippedFromBuffer = byteBuffer.remaining();
262        final long toSkipFromFileChannel = n - skippedFromBuffer;
263        // Discard everything we have read in the buffer.
264        byteBuffer.position(0);
265        byteBuffer.flip();
266        return skippedFromBuffer + skipFromFileChannel(toSkipFromFileChannel);
267    }
268
269    private long skipFromFileChannel(final long n) throws IOException {
270        final long currentFilePosition = fileChannel.position();
271        final long size = fileChannel.size();
272        if (n > size - currentFilePosition) {
273            fileChannel.position(size);
274            return size - currentFilePosition;
275        }
276        fileChannel.position(currentFilePosition + n);
277        return n;
278    }
279
280}