ProxyInputStream.java

  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one or more
  3.  * contributor license agreements.  See the NOTICE file distributed with
  4.  * this work for additional information regarding copyright ownership.
  5.  * The ASF licenses this file to You under the Apache License, Version 2.0
  6.  * (the "License"); you may not use this file except in compliance with
  7.  * the License.  You may obtain a copy of the License at
  8.  *
  9.  *      http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package org.apache.commons.io.input;

  18. import static org.apache.commons.io.IOUtils.EOF;

  19. import java.io.FilterInputStream;
  20. import java.io.IOException;
  21. import java.io.InputStream;

  22. import org.apache.commons.io.IOUtils;
  23. import org.apache.commons.io.build.AbstractStreamBuilder;
  24. import org.apache.commons.io.function.Erase;
  25. import org.apache.commons.io.function.IOConsumer;
  26. import org.apache.commons.io.function.IOIntConsumer;

  27. /**
  28.  * A proxy stream which acts as a {@link FilterInputStream}, by passing all method calls on to the proxied stream, not changing which methods are called.
  29.  * <p>
  30.  * It is an alternative base class to {@link FilterInputStream} to increase reusability, because {@link FilterInputStream} changes the methods being called,
  31.  * such as read(byte[]) to read(byte[], int, int).
  32.  * </p>
  33.  * <p>
  34.  * In addition, this class allows you to:
  35.  * </p>
  36.  * <ul>
  37.  * <li>notify a subclass that <em>n</em> bytes are about to be read through {@link #beforeRead(int)}</li>
  38.  * <li>notify a subclass that <em>n</em> bytes were read through {@link #afterRead(int)}</li>
  39.  * <li>notify a subclass that an exception was caught through {@link #handleIOException(IOException)}</li>
  40.  * <li>{@link #unwrap()} itself</li>
  41.  * </ul>
  42.  */
  43. public abstract class ProxyInputStream extends FilterInputStream {

  44.     /**
  45.      * Abstracts builder properties for subclasses.
  46.      *
  47.      * @param <T> The InputStream type.
  48.      * @param <B> The builder type.
  49.      * @since 2.18.0
  50.      */
  51.     protected abstract static class AbstractBuilder<T, B extends AbstractStreamBuilder<T, B>> extends AbstractStreamBuilder<T, B> {

  52.         private IOIntConsumer afterRead;

  53.         /**
  54.          * Constructs a builder of {@code T}.
  55.          */
  56.         protected AbstractBuilder() {
  57.             // empty
  58.         }

  59.         /**
  60.          * Gets the {@link ProxyInputStream#afterRead(int)} consumer.
  61.          *
  62.          * @return the {@link ProxyInputStream#afterRead(int)} consumer.
  63.          */
  64.         public IOIntConsumer getAfterRead() {
  65.             return afterRead;
  66.         }

  67.         /**
  68.          * Sets the {@link ProxyInputStream#afterRead(int)} behavior, null resets to a NOOP.
  69.          * <p>
  70.          * Setting this value causes the {@link ProxyInputStream#afterRead(int) afterRead} method to delegate to the given consumer.
  71.          * </p>
  72.          * <p>
  73.          * If a subclass overrides {@link ProxyInputStream#afterRead(int) afterRead} and does not call {@code super.afterRead(int)}, then the given consumer is
  74.          * not called.
  75.          * </p>
  76.          * <p>
  77.          * This does <em>not</em> override a {@code ProxyInputStream} subclass' implementation of the {@link ProxyInputStream#afterRead(int)} method, it can
  78.          * supplement it.
  79.          * </p>
  80.          *
  81.          * @param afterRead the {@link ProxyInputStream#afterRead(int)} behavior.
  82.          * @return this instance.
  83.          */
  84.         public B setAfterRead(final IOIntConsumer afterRead) {
  85.             this.afterRead = afterRead;
  86.             return asThis();
  87.         }

  88.     }

  89.     /**
  90.      * Tracks whether {@link #close()} has been called or not.
  91.      */
  92.     private boolean closed;

  93.     /**
  94.      * Handles exceptions.
  95.      */
  96.     private final IOConsumer<IOException> exceptionHandler;

  97.     private final IOIntConsumer afterRead;

  98.     /**
  99.      * Constructs a new ProxyInputStream.
  100.      *
  101.      * @param builder  How to build an instance.
  102.      * @throws IOException if an I/O error occurs.
  103.      * @since 2.18.0
  104.      */
  105.     @SuppressWarnings("resource")
  106.     protected ProxyInputStream(final AbstractBuilder<?, ?> builder) throws IOException {
  107.         // the delegate is stored in a protected superclass instance variable named 'in'.
  108.         this(builder.getInputStream(), builder);
  109.     }

  110.     /**
  111.      * Constructs a new ProxyInputStream.
  112.      *
  113.      * @param proxy  the InputStream to proxy.
  114.      */
  115.     public ProxyInputStream(final InputStream proxy) {
  116.         // the delegate is stored in a protected superclass variable named 'in'.
  117.         super(proxy);
  118.         this.exceptionHandler = Erase::rethrow;
  119.         this.afterRead = IOIntConsumer.NOOP;
  120.     }

  121.     /**
  122.      * Constructs a new ProxyInputStream.
  123.      *
  124.      * @param proxy  the InputStream to proxy.
  125.      * @param builder  How to build an instance.
  126.      * @since 2.18.0
  127.      */
  128.     protected ProxyInputStream(final InputStream proxy, final AbstractBuilder<?, ?> builder) {
  129.         // the delegate is stored in a protected superclass instance variable named 'in'.
  130.         super(proxy);
  131.         this.exceptionHandler = Erase::rethrow;
  132.         this.afterRead = builder.getAfterRead() != null ? builder.getAfterRead() : IOIntConsumer.NOOP;
  133.     }

  134.     /**
  135.      * Called by the {@code read} methods after the proxied call has returned successfully. The argument is the number of bytes returned to the caller or
  136.      * {@link IOUtils#EOF EOF} if the end of stream was reached.
  137.      * <p>
  138.      * The default delegates to the consumer given to {@link AbstractBuilder#setAfterRead(IOIntConsumer)}.
  139.      * </p>
  140.      * <p>
  141.      * Alternatively, a subclasses can override this method to add post-processing functionality without having to override all the read methods.
  142.      * </p>
  143.      * <p>
  144.      * Note this method is <em>not</em> called from {@link #skip(long)} or {@link #reset()}. You need to explicitly override those methods if you want to add
  145.      * post-processing steps also to them.
  146.      * </p>
  147.      *
  148.      * @param n number of bytes read, or {@link IOUtils#EOF EOF} if the end of stream was reached.
  149.      * @throws IOException Thrown by a subclass or the consumer given to {@link AbstractBuilder#setAfterRead(IOIntConsumer)}.
  150.      * @since 2.0
  151.      */
  152.     protected void afterRead(final int n) throws IOException {
  153.         afterRead.accept(n);
  154.     }

  155.     /**
  156.      * Invokes the delegate's {@link InputStream#available()} method.
  157.      *
  158.      * @return the number of available bytes, 0 if the stream is closed.
  159.      * @throws IOException if an I/O error occurs.
  160.      */
  161.     @Override
  162.     public int available() throws IOException {
  163.         if (in != null && !isClosed()) {
  164.             try {
  165.                 return in.available();
  166.             } catch (final IOException e) {
  167.                 handleIOException(e);
  168.             }
  169.         }
  170.         return 0;
  171.     }

  172.     /**
  173.      * Invoked by the {@code read} methods before the call is proxied. The number
  174.      * of bytes that the caller wanted to read (1 for the {@link #read()}
  175.      * method, buffer length for {@link #read(byte[])}, etc.) is given as
  176.      * an argument.
  177.      * <p>
  178.      * Subclasses can override this method to add common pre-processing
  179.      * functionality without having to override all the read methods.
  180.      * The default implementation does nothing.
  181.      * </p>
  182.      * <p>
  183.      * Note this method is <em>not</em> called from {@link #skip(long)} or
  184.      * {@link #reset()}. You need to explicitly override those methods if
  185.      * you want to add pre-processing steps also to them.
  186.      * </p>
  187.      *
  188.      * @param n number of bytes that the caller asked to be read.
  189.      * @throws IOException if the pre-processing fails in a subclass.
  190.      * @since 2.0
  191.      */
  192.     @SuppressWarnings("unused") // Possibly thrown from subclasses.
  193.     protected void beforeRead(final int n) throws IOException {
  194.         // no-op default
  195.     }

  196.     /**
  197.      * Checks if this instance is closed and throws an IOException if so.
  198.      *
  199.      * @throws IOException if this instance is closed.
  200.      */
  201.     void checkOpen() throws IOException {
  202.         Input.checkOpen(!isClosed());
  203.     }

  204.     /**
  205.      * Invokes the delegate's {@link InputStream#close()} method.
  206.      *
  207.      * @throws IOException if an I/O error occurs.
  208.      */
  209.     @Override
  210.     public void close() throws IOException {
  211.         IOUtils.close(in, this::handleIOException);
  212.         closed = true;
  213.     }

  214.     /**
  215.      * Handles any IOExceptions thrown; by default, throws the given exception.
  216.      * <p>
  217.      * This method provides a point to implement custom exception
  218.      * handling. The default behavior is to re-throw the exception.
  219.      * </p>
  220.      *
  221.      * @param e The IOException thrown.
  222.      * @throws IOException if an I/O error occurs.
  223.      * @since 2.0
  224.      */
  225.     protected void handleIOException(final IOException e) throws IOException {
  226.         exceptionHandler.accept(e);
  227.     }

  228.     /**
  229.      * Tests whether this instance is closed.
  230.      *
  231.      * @return whether this instance is closed.
  232.      */
  233.     boolean isClosed() {
  234.         return closed;
  235.     }

  236.     /**
  237.      * Invokes the delegate's {@link InputStream#mark(int)} method.
  238.      *
  239.      * @param readLimit read ahead limit.
  240.      */
  241.     @Override
  242.     public synchronized void mark(final int readLimit) {
  243.         if (in != null) {
  244.             in.mark(readLimit);
  245.         }
  246.     }

  247.     /**
  248.      * Invokes the delegate's {@link InputStream#markSupported()} method.
  249.      *
  250.      * @return {@code true} if this stream instance supports the mark and reset methods; {@code false} otherwise.
  251.      * @see #mark(int)
  252.      * @see #reset()
  253.      */
  254.     @Override
  255.     public boolean markSupported() {
  256.         return in != null && in.markSupported();
  257.     }

  258.     /**
  259.      * Invokes the delegate's {@link InputStream#read()} method unless the stream is closed.
  260.      *
  261.      * @return the byte read or {@link IOUtils#EOF EOF} if we reached the end of stream.
  262.      * @throws IOException if an I/O error occurs.
  263.      */
  264.     @Override
  265.     public int read() throws IOException {
  266.         try {
  267.             beforeRead(1);
  268.             final int b = in.read();
  269.             afterRead(b != EOF ? 1 : EOF);
  270.             return b;
  271.         } catch (final IOException e) {
  272.             handleIOException(e);
  273.             return EOF;
  274.         }
  275.     }

  276.     /**
  277.      * Invokes the delegate's {@link InputStream#read(byte[])} method.
  278.      *
  279.      * @param b the buffer to read the bytes into.
  280.      * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream.
  281.      * @throws IOException
  282.      *                     <ul>
  283.      *                     <li>If the first byte cannot be read for any reason other than the end of the file,
  284.      *                     <li>if the input stream has been closed, or</li>
  285.      *                     <li>if some other I/O error occurs.</li>
  286.      *                     </ul>
  287.      */
  288.     @Override
  289.     public int read(final byte[] b) throws IOException {
  290.         try {
  291.             beforeRead(IOUtils.length(b));
  292.             final int n = in.read(b);
  293.             afterRead(n);
  294.             return n;
  295.         } catch (final IOException e) {
  296.             handleIOException(e);
  297.             return EOF;
  298.         }
  299.     }

  300.     /**
  301.      * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method.
  302.      *
  303.      * @param b   the buffer to read the bytes into.
  304.      * @param off The start offset.
  305.      * @param len The number of bytes to read.
  306.      * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream.
  307.      * @throws IOException
  308.      *                     <ul>
  309.      *                     <li>If the first byte cannot be read for any reason other than the end of the file,
  310.      *                     <li>if the input stream has been closed, or</li>
  311.      *                     <li>if some other I/O error occurs.</li>
  312.      *                     </ul>
  313.      */
  314.     @Override
  315.     public int read(final byte[] b, final int off, final int len) throws IOException {
  316.         try {
  317.             beforeRead(len);
  318.             final int n = in.read(b, off, len);
  319.             afterRead(n);
  320.             return n;
  321.         } catch (final IOException e) {
  322.             handleIOException(e);
  323.             return EOF;
  324.         }
  325.     }

  326.     /**
  327.      * Invokes the delegate's {@link InputStream#reset()} method.
  328.      *
  329.      * @throws IOException if this stream has not been marked or if the mark has been invalidated.
  330.      */
  331.     @Override
  332.     public synchronized void reset() throws IOException {
  333.         try {
  334.             in.reset();
  335.         } catch (final IOException e) {
  336.             handleIOException(e);
  337.         }
  338.     }

  339.     /**
  340.      * Sets the underlying input stream.
  341.      *
  342.      * @param in The input stream to set in {@link java.io.FilterInputStream#in}.
  343.      * @return this instance.
  344.      * @since 2.19.0
  345.      */
  346.     public ProxyInputStream setReference(final InputStream in) {
  347.         this.in = in;
  348.         return this;
  349.     }

  350.     /**
  351.      * Invokes the delegate's {@link InputStream#skip(long)} method.
  352.      *
  353.      * @param n the number of bytes to skip.
  354.      * @return the actual number of bytes skipped.
  355.      * @throws IOException if the stream does not support seek, or if some other I/O error occurs.
  356.      */
  357.     @Override
  358.     public long skip(final long n) throws IOException {
  359.         try {
  360.             return in.skip(n);
  361.         } catch (final IOException e) {
  362.             handleIOException(e);
  363.             return 0;
  364.         }
  365.     }

  366.     /**
  367.      * Unwraps this instance by returning the underlying {@link InputStream}.
  368.      * <p>
  369.      * Use with caution; useful to query the underlying {@link InputStream}.
  370.      * </p>
  371.      *
  372.      * @return the underlying {@link InputStream}.
  373.      * @since 2.16.0
  374.      */
  375.     public InputStream unwrap() {
  376.         return in;
  377.     }

  378. }