BufferedFileChannelInputStream.java

  1. /*
  2.  * Licensed under the Apache License, Version 2.0 (the "License");
  3.  * you may not use this file except in compliance with the License.
  4.  * You may obtain a copy of the License at
  5.  *
  6.  *     http://www.apache.org/licenses/LICENSE-2.0
  7.  *
  8.  * Unless required by applicable law or agreed to in writing, software
  9.  * distributed under the License is distributed on an "AS IS" BASIS,
  10.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11.  * See the License for the specific language governing permissions and
  12.  * limitations under the License.
  13.  */
  14. package org.apache.commons.io.input;

  15. import static org.apache.commons.io.IOUtils.EOF;

  16. import java.io.BufferedInputStream;
  17. import java.io.File;
  18. import java.io.IOException;
  19. import java.io.InputStream;
  20. import java.nio.ByteBuffer;
  21. import java.nio.channels.FileChannel;
  22. import java.nio.file.Path;
  23. import java.nio.file.StandardOpenOption;
  24. import java.util.Objects;

  25. import org.apache.commons.io.IOUtils;
  26. import org.apache.commons.io.build.AbstractStreamBuilder;

  27. /**
  28.  * {@link InputStream} implementation which uses direct buffer to read a file to avoid extra copy of data between Java and native memory which happens when
  29.  * using {@link BufferedInputStream}. Unfortunately, this is not something already available in JDK, {@code sun.nio.ch.ChannelInputStream} supports
  30.  * reading a file using NIO, but does not support buffering.
  31.  * <p>
  32.  * To build an instance, use {@link Builder}.
  33.  * </p>
  34.  * <p>
  35.  * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19 where it was called {@code NioBufferedFileInputStream}.
  36.  * </p>
  37.  *
  38.  * @see Builder
  39.  * @since 2.9.0
  40.  */
  41. public final class BufferedFileChannelInputStream extends InputStream {

  42.     // @formatter:off
  43.     /**
  44.      * Builds a new {@link BufferedFileChannelInputStream}.
  45.      *
  46.      * <p>
  47.      * Using File IO:
  48.      * </p>
  49.      * <pre>{@code
  50.      * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
  51.      *   .setFile(file)
  52.      *   .setBufferSize(4096)
  53.      *   .get();}
  54.      * </pre>
  55.      * <p>
  56.      * Using NIO Path:
  57.      * </p>
  58.      * <pre>{@code
  59.      * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
  60.      *   .setPath(path)
  61.      *   .setBufferSize(4096)
  62.      *   .get();}
  63.      * </pre>
  64.      *
  65.      * @see #get()
  66.      * @since 2.12.0
  67.      */
  68.     // @formatter:on
  69.     public static class Builder extends AbstractStreamBuilder<BufferedFileChannelInputStream, Builder> {

  70.         private FileChannel fileChannel;

  71.         /**
  72.          * Constructs a new builder of {@link BufferedFileChannelInputStream}.
  73.          */
  74.         public Builder() {
  75.             // empty
  76.         }

  77.         /**
  78.          * Builds a new {@link BufferedFileChannelInputStream}.
  79.          * <p>
  80.          * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
  81.          * </p>
  82.          * <p>
  83.          * This builder uses the following aspects:
  84.          * </p>
  85.          * <ul>
  86.          * <li>{@link FileChannel} takes precedence is set. </li>
  87.          * <li>{@link #getPath()} if the file channel is not set.</li>
  88.          * <li>{@link #getBufferSize()}</li>
  89.          * </ul>
  90.          *
  91.          * @return a new instance.
  92.          * @throws IllegalStateException         if the {@code origin} is {@code null}.
  93.          * @throws UnsupportedOperationException if the origin cannot be converted to a {@link Path}.
  94.          * @throws IOException                   if an I/O error occurs converting to an {@link Path} using {@link #getPath()}.
  95.          * @see #getPath()
  96.          * @see #getBufferSize()
  97.          * @see #getUnchecked()
  98.          */
  99.         @Override
  100.         public BufferedFileChannelInputStream get() throws IOException {
  101.             return fileChannel != null ? new BufferedFileChannelInputStream(fileChannel, getBufferSize())
  102.                     : new BufferedFileChannelInputStream(getPath(), getBufferSize());
  103.         }

  104.         /**
  105.          * Sets the file channel.
  106.          * <p>
  107.          * This setting takes precedence over all others.
  108.          * </p>
  109.          *
  110.          * @param fileChannel the file channel.
  111.          * @return this instance.
  112.          * @since 2.18.0
  113.          */
  114.         public Builder setFileChannel(final FileChannel fileChannel) {
  115.             this.fileChannel = fileChannel;
  116.             return this;
  117.         }

  118.     }

  119.     /**
  120.      * Constructs a new {@link Builder}.
  121.      *
  122.      * @return a new {@link Builder}.
  123.      * @since 2.12.0
  124.      */
  125.     public static Builder builder() {
  126.         return new Builder();
  127.     }

  128.     private final ByteBuffer byteBuffer;

  129.     private final FileChannel fileChannel;

  130.     /**
  131.      * Constructs a new instance for the given File.
  132.      *
  133.      * @param file The file to stream.
  134.      * @throws IOException If an I/O error occurs
  135.      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
  136.      */
  137.     @Deprecated
  138.     public BufferedFileChannelInputStream(final File file) throws IOException {
  139.         this(file, IOUtils.DEFAULT_BUFFER_SIZE);
  140.     }

  141.     /**
  142.      * Constructs a new instance for the given File and buffer size.
  143.      *
  144.      * @param file       The file to stream.
  145.      * @param bufferSize buffer size.
  146.      * @throws IOException If an I/O error occurs
  147.      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
  148.      */
  149.     @Deprecated
  150.     public BufferedFileChannelInputStream(final File file, final int bufferSize) throws IOException {
  151.         this(file.toPath(), bufferSize);
  152.     }

  153.     private BufferedFileChannelInputStream(final FileChannel fileChannel, final int bufferSize) {
  154.         this.fileChannel = Objects.requireNonNull(fileChannel, "path");
  155.         byteBuffer = ByteBuffer.allocateDirect(bufferSize);
  156.         byteBuffer.flip();
  157.     }

  158.     /**
  159.      * Constructs a new instance for the given Path.
  160.      *
  161.      * @param path The path to stream.
  162.      * @throws IOException If an I/O error occurs
  163.      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
  164.      */
  165.     @Deprecated
  166.     public BufferedFileChannelInputStream(final Path path) throws IOException {
  167.         this(path, IOUtils.DEFAULT_BUFFER_SIZE);
  168.     }

  169.     /**
  170.      * Constructs a new instance for the given Path and buffer size.
  171.      *
  172.      * @param path       The path to stream.
  173.      * @param bufferSize buffer size.
  174.      * @throws IOException If an I/O error occurs
  175.      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
  176.      */
  177.     @SuppressWarnings("resource")
  178.     @Deprecated
  179.     public BufferedFileChannelInputStream(final Path path, final int bufferSize) throws IOException {
  180.         this(FileChannel.open(path, StandardOpenOption.READ), bufferSize);
  181.     }

  182.     @Override
  183.     public synchronized int available() throws IOException {
  184.         if (!fileChannel.isOpen()) {
  185.             return 0;
  186.         }
  187.         if (!refill()) {
  188.             return 0;
  189.         }
  190.         return byteBuffer.remaining();
  191.     }

  192.     /**
  193.      * Attempts to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun API that will cause errors if one attempts to read from the
  194.      * disposed buffer. However, neither the bytes allocated to direct buffers nor file descriptors opened for memory-mapped buffers put pressure on the garbage
  195.      * collector. Waiting for garbage collection may lead to the depletion of off-heap memory or huge numbers of open files. There's unfortunately no standard
  196.      * API to manually dispose of these kinds of buffers.
  197.      *
  198.      * @param buffer the buffer to clean.
  199.      */
  200.     private void clean(final ByteBuffer buffer) {
  201.         if (buffer.isDirect()) {
  202.             cleanDirectBuffer(buffer);
  203.         }
  204.     }

  205.     /**
  206.      * In Java 8, the type of {@code sun.nio.ch.DirectBuffer.cleaner()} was {@code sun.misc.Cleaner}, and it was possible to access the method
  207.      * {@code sun.misc.Cleaner.clean()} to invoke it. The type changed to {@code jdk.internal.ref.Cleaner} in later JDKs, and the {@code clean()} method is not
  208.      * accessible even with reflection. However {@code sun.misc.Unsafe} added an {@code invokeCleaner()} method in JDK 9+ and this is still accessible with
  209.      * reflection.
  210.      *
  211.      * @param buffer the buffer to clean. must be a DirectBuffer.
  212.      */
  213.     private void cleanDirectBuffer(final ByteBuffer buffer) {
  214.         if (ByteBufferCleaner.isSupported()) {
  215.             ByteBufferCleaner.clean(buffer);
  216.         }
  217.     }

  218.     @Override
  219.     public synchronized void close() throws IOException {
  220.         try {
  221.             fileChannel.close();
  222.         } finally {
  223.             clean(byteBuffer);
  224.         }
  225.     }

  226.     @Override
  227.     public synchronized int read() throws IOException {
  228.         if (!refill()) {
  229.             return EOF;
  230.         }
  231.         return byteBuffer.get() & 0xFF;
  232.     }

  233.     @Override
  234.     public synchronized int read(final byte[] b, final int offset, int len) throws IOException {
  235.         if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
  236.             throw new IndexOutOfBoundsException();
  237.         }
  238.         if (!refill()) {
  239.             return EOF;
  240.         }
  241.         len = Math.min(len, byteBuffer.remaining());
  242.         byteBuffer.get(b, offset, len);
  243.         return len;
  244.     }

  245.     /**
  246.      * Checks whether data is left to be read from the input stream.
  247.      *
  248.      * @return true if data is left, false otherwise
  249.      * @throws IOException if an I/O error occurs.
  250.      */
  251.     private boolean refill() throws IOException {
  252.         Input.checkOpen(fileChannel.isOpen());
  253.         if (!byteBuffer.hasRemaining()) {
  254.             byteBuffer.clear();
  255.             int nRead = 0;
  256.             while (nRead == 0) {
  257.                 nRead = fileChannel.read(byteBuffer);
  258.             }
  259.             byteBuffer.flip();
  260.             return nRead >= 0;
  261.         }
  262.         return true;
  263.     }

  264.     @Override
  265.     public synchronized long skip(final long n) throws IOException {
  266.         if (n <= 0L) {
  267.             return 0L;
  268.         }
  269.         if (byteBuffer.remaining() >= n) {
  270.             // The buffered content is enough to skip
  271.             byteBuffer.position(byteBuffer.position() + (int) n);
  272.             return n;
  273.         }
  274.         final long skippedFromBuffer = byteBuffer.remaining();
  275.         final long toSkipFromFileChannel = n - skippedFromBuffer;
  276.         // Discard everything we have read in the buffer.
  277.         byteBuffer.position(0);
  278.         byteBuffer.flip();
  279.         return skippedFromBuffer + skipFromFileChannel(toSkipFromFileChannel);
  280.     }

  281.     private long skipFromFileChannel(final long n) throws IOException {
  282.         final long currentFilePosition = fileChannel.position();
  283.         final long size = fileChannel.size();
  284.         if (n > size - currentFilePosition) {
  285.             fileChannel.position(size);
  286.             return size - currentFilePosition;
  287.         }
  288.         fileChannel.position(currentFilePosition + n);
  289.         return n;
  290.     }

  291. }