ObservableInputStream.java

  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one or more
  3.  * contributor license agreements.  See the NOTICE file distributed with
  4.  * this work for additional information regarding copyright ownership.
  5.  * The ASF licenses this file to You under the Apache License, Version 2.0
  6.  * (the "License"); you may not use this file except in compliance with
  7.  * the License.  You may obtain a copy of the License at
  8.  *
  9.  *      http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package org.apache.commons.io.input;

  18. import static org.apache.commons.io.IOUtils.EOF;

  19. import java.io.IOException;
  20. import java.io.InputStream;
  21. import java.util.ArrayList;
  22. import java.util.Arrays;
  23. import java.util.List;

  24. import org.apache.commons.io.IOUtils;
  25. import org.apache.commons.io.function.IOConsumer;

  26. /**
  27.  * The {@link ObservableInputStream} allows, that an InputStream may be consumed by other receivers, apart from the
  28.  * thread, which is reading it. The other consumers are implemented as instances of {@link Observer}.
  29.  * <p>
  30.  * A typical application may be the generation of a {@link java.security.MessageDigest} on the fly.
  31.  * </p>
  32.  * <p>
  33.  * <em>Note</em>: The {@link ObservableInputStream} is <em>not</em> thread safe, as instances of InputStream usually
  34.  * aren't. If you must access the stream from multiple threads, then synchronization, locking, or a similar means must
  35.  * be used.
  36.  * </p>
  37.  *
  38.  * @see MessageDigestInputStream
  39.  */
  40. public class ObservableInputStream extends ProxyInputStream {

  41.     /**
  42.      * For subclassing builders from {@link BoundedInputStream} subclassses.
  43.      *
  44.      * @param <T> The subclass.
  45.      * @since 2.18.0
  46.      */
  47.     public abstract static class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<ObservableInputStream, T> {

  48.         private List<Observer> observers;

  49.         /**
  50.          * Constructs a new instance for subclasses.
  51.          */
  52.         public AbstractBuilder() {
  53.             // empty
  54.         }

  55.         /**
  56.          * Sets the list of observer callbacks.
  57.          *
  58.          * @param observers The list of observer callbacks.
  59.          */
  60.         public void setObservers(final List<Observer> observers) {
  61.             this.observers = observers;
  62.         }

  63.     }

  64.     /**
  65.      * Builds instances of {@link ObservableInputStream}.
  66.      *
  67.      * @since 2.18.0
  68.      */
  69.     public static class Builder extends AbstractBuilder<Builder> {

  70.         /**
  71.          * Constructs a new builder of {@link ObservableInputStream}.
  72.          */
  73.         public Builder() {
  74.             // empty
  75.         }

  76.         @Override
  77.         public ObservableInputStream get() throws IOException {
  78.             return new ObservableInputStream(this);
  79.         }

  80.     }

  81.     /**
  82.      * Abstracts observer callback for {@link ObservableInputStream}s.
  83.      */
  84.     public abstract static class Observer {

  85.         /**
  86.          * Constructs a new instance for subclasses.
  87.          */
  88.         public Observer() {
  89.             // empty
  90.         }

  91.         /**
  92.          * Called to indicate that the {@link ObservableInputStream} has been closed.
  93.          *
  94.          * @throws IOException if an I/O error occurs.
  95.          */
  96.         @SuppressWarnings("unused") // Possibly thrown from subclasses.
  97.         public void closed() throws IOException {
  98.             // noop
  99.         }

  100.         /**
  101.          * Called to indicate that {@link InputStream#read(byte[])}, or {@link InputStream#read(byte[], int, int)} have
  102.          * been called, and are about to invoke data.
  103.          *
  104.          * @param buffer The byte array, which has been passed to the read call, and where data has been stored.
  105.          * @param offset The offset within the byte array, where data has been stored.
  106.          * @param length The number of bytes, which have been stored in the byte array.
  107.          * @throws IOException if an I/O error occurs.
  108.          */
  109.         @SuppressWarnings("unused") // Possibly thrown from subclasses.
  110.         public void data(final byte[] buffer, final int offset, final int length) throws IOException {
  111.             // noop
  112.         }

  113.         /**
  114.          * Called to indicate, that {@link InputStream#read()} has been invoked on the {@link ObservableInputStream},
  115.          * and will return a value.
  116.          *
  117.          * @param value The value, which is being returned. This will never be -1 (EOF), because, in that case,
  118.          *        {@link #finished()} will be invoked instead.
  119.          * @throws IOException if an I/O error occurs.
  120.          */
  121.         @SuppressWarnings("unused") // Possibly thrown from subclasses.
  122.         public void data(final int value) throws IOException {
  123.             // noop
  124.         }

  125.         /**
  126.          * Called to indicate that an error occurred on the underlying stream.
  127.          *
  128.          * @param exception the exception to throw
  129.          * @throws IOException if an I/O error occurs.
  130.          */
  131.         public void error(final IOException exception) throws IOException {
  132.             throw exception;
  133.         }

  134.         /**
  135.          * Called to indicate that EOF has been seen on the underlying stream. This method may be called multiple times,
  136.          * if the reader keeps invoking either of the read methods, and they will consequently keep returning EOF.
  137.          *
  138.          * @throws IOException if an I/O error occurs.
  139.          */
  140.         @SuppressWarnings("unused") // Possibly thrown from subclasses.
  141.         public void finished() throws IOException {
  142.             // noop
  143.         }
  144.     }

  145.     private final List<Observer> observers;

  146.     ObservableInputStream(final AbstractBuilder builder) throws IOException {
  147.         super(builder);
  148.         this.observers = builder.observers;
  149.     }

  150.     /**
  151.      * Constructs a new ObservableInputStream for the given InputStream.
  152.      *
  153.      * @param inputStream the input stream to observe.
  154.      */
  155.     public ObservableInputStream(final InputStream inputStream) {
  156.         this(inputStream, new ArrayList<>());
  157.     }

  158.     /**
  159.      * Constructs a new ObservableInputStream for the given InputStream.
  160.      *
  161.      * @param inputStream the input stream to observe.
  162.      * @param observers List of observer callbacks.
  163.      */
  164.     private ObservableInputStream(final InputStream inputStream, final List<Observer> observers) {
  165.         super(inputStream);
  166.         this.observers = observers;
  167.     }

  168.     /**
  169.      * Constructs a new ObservableInputStream for the given InputStream.
  170.      *
  171.      * @param inputStream the input stream to observe.
  172.      * @param observers List of observer callbacks.
  173.      * @since 2.9.0
  174.      */
  175.     public ObservableInputStream(final InputStream inputStream, final Observer... observers) {
  176.         this(inputStream, Arrays.asList(observers));
  177.     }

  178.     /**
  179.      * Adds an Observer.
  180.      *
  181.      * @param observer the observer to add.
  182.      */
  183.     public void add(final Observer observer) {
  184.         observers.add(observer);
  185.     }

  186.     @Override
  187.     public void close() throws IOException {
  188.         IOException ioe = null;
  189.         try {
  190.             super.close();
  191.         } catch (final IOException e) {
  192.             ioe = e;
  193.         }
  194.         if (ioe == null) {
  195.             noteClosed();
  196.         } else {
  197.             noteError(ioe);
  198.         }
  199.     }

  200.     /**
  201.      * Reads all data from the underlying {@link InputStream}, while notifying the observers.
  202.      *
  203.      * @throws IOException The underlying {@link InputStream}, or either of the observers has thrown an exception.
  204.      */
  205.     public void consume() throws IOException {
  206.         IOUtils.consume(this);
  207.     }

  208.     private void forEachObserver(final IOConsumer<Observer> action) throws IOException {
  209.         IOConsumer.forAll(action, observers);
  210.     }

  211.     /**
  212.      * Gets a copy of currently registered observers.
  213.      *
  214.      * @return a copy of the list of currently registered observers.
  215.      * @since 2.9.0
  216.      */
  217.     public List<Observer> getObservers() {
  218.         return new ArrayList<>(observers);
  219.     }

  220.     /**
  221.      * Notifies the observers by invoking {@link Observer#finished()}.
  222.      *
  223.      * @throws IOException Some observer has thrown an exception, which is being passed down.
  224.      */
  225.     protected void noteClosed() throws IOException {
  226.         forEachObserver(Observer::closed);
  227.     }

  228.     /**
  229.      * Notifies the observers by invoking {@link Observer#data(int)} with the given arguments.
  230.      *
  231.      * @param value Passed to the observers.
  232.      * @throws IOException Some observer has thrown an exception, which is being passed down.
  233.      */
  234.     protected void noteDataByte(final int value) throws IOException {
  235.         forEachObserver(observer -> observer.data(value));
  236.     }

  237.     /**
  238.      * Notifies the observers by invoking {@link Observer#data(byte[],int,int)} with the given arguments.
  239.      *
  240.      * @param buffer Passed to the observers.
  241.      * @param offset Passed to the observers.
  242.      * @param length Passed to the observers.
  243.      * @throws IOException Some observer has thrown an exception, which is being passed down.
  244.      */
  245.     protected void noteDataBytes(final byte[] buffer, final int offset, final int length) throws IOException {
  246.         forEachObserver(observer -> observer.data(buffer, offset, length));
  247.     }

  248.     /**
  249.      * Notifies the observers by invoking {@link Observer#error(IOException)} with the given argument.
  250.      *
  251.      * @param exception Passed to the observers.
  252.      * @throws IOException Some observer has thrown an exception, which is being passed down. This may be the same
  253.      *         exception, which has been passed as an argument.
  254.      */
  255.     protected void noteError(final IOException exception) throws IOException {
  256.         forEachObserver(observer -> observer.error(exception));
  257.     }

  258.     /**
  259.      * Notifies the observers by invoking {@link Observer#finished()}.
  260.      *
  261.      * @throws IOException Some observer has thrown an exception, which is being passed down.
  262.      */
  263.     protected void noteFinished() throws IOException {
  264.         forEachObserver(Observer::finished);
  265.     }

  266.     private void notify(final byte[] buffer, final int offset, final int result, final IOException ioe) throws IOException {
  267.         if (ioe != null) {
  268.             noteError(ioe);
  269.             throw ioe;
  270.         }
  271.         if (result == EOF) {
  272.             noteFinished();
  273.         } else if (result > 0) {
  274.             noteDataBytes(buffer, offset, result);
  275.         }
  276.     }

  277.     @Override
  278.     public int read() throws IOException {
  279.         int result = 0;
  280.         IOException ioe = null;
  281.         try {
  282.             result = super.read();
  283.         } catch (final IOException ex) {
  284.             ioe = ex;
  285.         }
  286.         if (ioe != null) {
  287.             noteError(ioe);
  288.             throw ioe;
  289.         }
  290.         if (result == EOF) {
  291.             noteFinished();
  292.         } else {
  293.             noteDataByte(result);
  294.         }
  295.         return result;
  296.     }

  297.     @Override
  298.     public int read(final byte[] buffer) throws IOException {
  299.         int result = 0;
  300.         IOException ioe = null;
  301.         try {
  302.             result = super.read(buffer);
  303.         } catch (final IOException ex) {
  304.             ioe = ex;
  305.         }
  306.         notify(buffer, 0, result, ioe);
  307.         return result;
  308.     }

  309.     @Override
  310.     public int read(final byte[] buffer, final int offset, final int length) throws IOException {
  311.         int result = 0;
  312.         IOException ioe = null;
  313.         try {
  314.             result = super.read(buffer, offset, length);
  315.         } catch (final IOException ex) {
  316.             ioe = ex;
  317.         }
  318.         notify(buffer, offset, result, ioe);
  319.         return result;
  320.     }

  321.     /**
  322.      * Removes an Observer.
  323.      *
  324.      * @param observer the observer to remove
  325.      */
  326.     public void remove(final Observer observer) {
  327.         observers.remove(observer);
  328.     }

  329.     /**
  330.      * Removes all Observers.
  331.      */
  332.     public void removeAllObservers() {
  333.         observers.clear();
  334.     }

  335. }