ObservableInputStream.java

  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one or more
  3.  * contributor license agreements.  See the NOTICE file distributed with
  4.  * this work for additional information regarding copyright ownership.
  5.  * The ASF licenses this file to You under the Apache License, Version 2.0
  6.  * (the "License"); you may not use this file except in compliance with
  7.  * the License.  You may obtain a copy of the License at
  8.  *
  9.  *      http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package org.apache.commons.io.input;

  18. import static org.apache.commons.io.IOUtils.EOF;

  19. import java.io.IOException;
  20. import java.io.InputStream;
  21. import java.util.ArrayList;
  22. import java.util.Arrays;
  23. import java.util.List;

  24. import org.apache.commons.io.IOUtils;
  25. import org.apache.commons.io.function.IOConsumer;

  26. /**
  27.  * The {@link ObservableInputStream} allows, that an InputStream may be consumed by other receivers, apart from the
  28.  * thread, which is reading it. The other consumers are implemented as instances of {@link Observer}.
  29.  * <p>
  30.  * A typical application may be the generation of a {@link java.security.MessageDigest} on the fly.
  31.  * </p>
  32.  * <p>
  33.  * <em>Note</em>: The {@link ObservableInputStream} is <em>not</em> thread safe, as instances of InputStream usually
  34.  * aren't. If you must access the stream from multiple threads, then synchronization, locking, or a similar means must
  35.  * be used.
  36.  * </p>
  37.  *
  38.  * @see MessageDigestInputStream
  39.  */
  40. public class ObservableInputStream extends ProxyInputStream {

  41.     /**
  42.      * For subclassing builders from {@link BoundedInputStream} subclassses.
  43.      *
  44.      * @param <T> The subclass.
  45.      * @since 2.18.0
  46.      */
  47.     public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<ObservableInputStream, T> {

  48.         private List<Observer> observers;

  49.         /**
  50.          * Sets the list of observer callbacks.
  51.          *
  52.          * @param observers The list of observer callbacks.
  53.          */
  54.         public void setObservers(final List<Observer> observers) {
  55.             this.observers = observers;
  56.         }

  57.     }


  58.     /**
  59.      * Builds instances of {@link ObservableInputStream}.
  60.      *
  61.      * @since 2.18.0
  62.      */
  63.     public static class Builder extends AbstractBuilder<Builder> {

  64.         @Override
  65.         public ObservableInputStream get() throws IOException {
  66.             return new ObservableInputStream(this);
  67.         }

  68.     }

  69.     /**
  70.      * Abstracts observer callback for {@link ObservableInputStream}s.
  71.      */
  72.     public static abstract class Observer {

  73.         /**
  74.          * Called to indicate that the {@link ObservableInputStream} has been closed.
  75.          *
  76.          * @throws IOException if an I/O error occurs.
  77.          */
  78.         @SuppressWarnings("unused") // Possibly thrown from subclasses.
  79.         public void closed() throws IOException {
  80.             // noop
  81.         }

  82.         /**
  83.          * Called to indicate that {@link InputStream#read(byte[])}, or {@link InputStream#read(byte[], int, int)} have
  84.          * been called, and are about to invoke data.
  85.          *
  86.          * @param buffer The byte array, which has been passed to the read call, and where data has been stored.
  87.          * @param offset The offset within the byte array, where data has been stored.
  88.          * @param length The number of bytes, which have been stored in the byte array.
  89.          * @throws IOException if an I/O error occurs.
  90.          */
  91.         @SuppressWarnings("unused") // Possibly thrown from subclasses.
  92.         public void data(final byte[] buffer, final int offset, final int length) throws IOException {
  93.             // noop
  94.         }

  95.         /**
  96.          * Called to indicate, that {@link InputStream#read()} has been invoked on the {@link ObservableInputStream},
  97.          * and will return a value.
  98.          *
  99.          * @param value The value, which is being returned. This will never be -1 (EOF), because, in that case,
  100.          *        {@link #finished()} will be invoked instead.
  101.          * @throws IOException if an I/O error occurs.
  102.          */
  103.         @SuppressWarnings("unused") // Possibly thrown from subclasses.
  104.         public void data(final int value) throws IOException {
  105.             // noop
  106.         }

  107.         /**
  108.          * Called to indicate that an error occurred on the underlying stream.
  109.          *
  110.          * @param exception the exception to throw
  111.          * @throws IOException if an I/O error occurs.
  112.          */
  113.         public void error(final IOException exception) throws IOException {
  114.             throw exception;
  115.         }

  116.         /**
  117.          * Called to indicate that EOF has been seen on the underlying stream. This method may be called multiple times,
  118.          * if the reader keeps invoking either of the read methods, and they will consequently keep returning EOF.
  119.          *
  120.          * @throws IOException if an I/O error occurs.
  121.          */
  122.         @SuppressWarnings("unused") // Possibly thrown from subclasses.
  123.         public void finished() throws IOException {
  124.             // noop
  125.         }
  126.     }

  127.     private final List<Observer> observers;

  128.     ObservableInputStream(final AbstractBuilder builder) throws IOException {
  129.         super(builder);
  130.         this.observers = builder.observers;
  131.     }

  132.     /**
  133.      * Constructs a new ObservableInputStream for the given InputStream.
  134.      *
  135.      * @param inputStream the input stream to observe.
  136.      */
  137.     public ObservableInputStream(final InputStream inputStream) {
  138.         this(inputStream, new ArrayList<>());
  139.     }

  140.     /**
  141.      * Constructs a new ObservableInputStream for the given InputStream.
  142.      *
  143.      * @param inputStream the input stream to observe.
  144.      * @param observers List of observer callbacks.
  145.      */
  146.     private ObservableInputStream(final InputStream inputStream, final List<Observer> observers) {
  147.         super(inputStream);
  148.         this.observers = observers;
  149.     }

  150.     /**
  151.      * Constructs a new ObservableInputStream for the given InputStream.
  152.      *
  153.      * @param inputStream the input stream to observe.
  154.      * @param observers List of observer callbacks.
  155.      * @since 2.9.0
  156.      */
  157.     public ObservableInputStream(final InputStream inputStream, final Observer... observers) {
  158.         this(inputStream, Arrays.asList(observers));
  159.     }

  160.     /**
  161.      * Adds an Observer.
  162.      *
  163.      * @param observer the observer to add.
  164.      */
  165.     public void add(final Observer observer) {
  166.         observers.add(observer);
  167.     }

  168.     @Override
  169.     public void close() throws IOException {
  170.         IOException ioe = null;
  171.         try {
  172.             super.close();
  173.         } catch (final IOException e) {
  174.             ioe = e;
  175.         }
  176.         if (ioe == null) {
  177.             noteClosed();
  178.         } else {
  179.             noteError(ioe);
  180.         }
  181.     }

  182.     /**
  183.      * Reads all data from the underlying {@link InputStream}, while notifying the observers.
  184.      *
  185.      * @throws IOException The underlying {@link InputStream}, or either of the observers has thrown an exception.
  186.      */
  187.     public void consume() throws IOException {
  188.         IOUtils.consume(this);
  189.     }

  190.     private void forEachObserver(final IOConsumer<Observer> action) throws IOException {
  191.         IOConsumer.forAll(action, observers);
  192.     }

  193.     /**
  194.      * Gets a copy of currently registered observers.
  195.      *
  196.      * @return a copy of the list of currently registered observers.
  197.      * @since 2.9.0
  198.      */
  199.     public List<Observer> getObservers() {
  200.         return new ArrayList<>(observers);
  201.     }

  202.     /**
  203.      * Notifies the observers by invoking {@link Observer#finished()}.
  204.      *
  205.      * @throws IOException Some observer has thrown an exception, which is being passed down.
  206.      */
  207.     protected void noteClosed() throws IOException {
  208.         forEachObserver(Observer::closed);
  209.     }

  210.     /**
  211.      * Notifies the observers by invoking {@link Observer#data(int)} with the given arguments.
  212.      *
  213.      * @param value Passed to the observers.
  214.      * @throws IOException Some observer has thrown an exception, which is being passed down.
  215.      */
  216.     protected void noteDataByte(final int value) throws IOException {
  217.         forEachObserver(observer -> observer.data(value));
  218.     }

  219.     /**
  220.      * Notifies the observers by invoking {@link Observer#data(byte[],int,int)} with the given arguments.
  221.      *
  222.      * @param buffer Passed to the observers.
  223.      * @param offset Passed to the observers.
  224.      * @param length Passed to the observers.
  225.      * @throws IOException Some observer has thrown an exception, which is being passed down.
  226.      */
  227.     protected void noteDataBytes(final byte[] buffer, final int offset, final int length) throws IOException {
  228.         forEachObserver(observer -> observer.data(buffer, offset, length));
  229.     }

  230.     /**
  231.      * Notifies the observers by invoking {@link Observer#error(IOException)} with the given argument.
  232.      *
  233.      * @param exception Passed to the observers.
  234.      * @throws IOException Some observer has thrown an exception, which is being passed down. This may be the same
  235.      *         exception, which has been passed as an argument.
  236.      */
  237.     protected void noteError(final IOException exception) throws IOException {
  238.         forEachObserver(observer -> observer.error(exception));
  239.     }

  240.     /**
  241.      * Notifies the observers by invoking {@link Observer#finished()}.
  242.      *
  243.      * @throws IOException Some observer has thrown an exception, which is being passed down.
  244.      */
  245.     protected void noteFinished() throws IOException {
  246.         forEachObserver(Observer::finished);
  247.     }

  248.     private void notify(final byte[] buffer, final int offset, final int result, final IOException ioe) throws IOException {
  249.         if (ioe != null) {
  250.             noteError(ioe);
  251.             throw ioe;
  252.         }
  253.         if (result == EOF) {
  254.             noteFinished();
  255.         } else if (result > 0) {
  256.             noteDataBytes(buffer, offset, result);
  257.         }
  258.     }

  259.     @Override
  260.     public int read() throws IOException {
  261.         int result = 0;
  262.         IOException ioe = null;
  263.         try {
  264.             result = super.read();
  265.         } catch (final IOException ex) {
  266.             ioe = ex;
  267.         }
  268.         if (ioe != null) {
  269.             noteError(ioe);
  270.             throw ioe;
  271.         }
  272.         if (result == EOF) {
  273.             noteFinished();
  274.         } else {
  275.             noteDataByte(result);
  276.         }
  277.         return result;
  278.     }

  279.     @Override
  280.     public int read(final byte[] buffer) throws IOException {
  281.         int result = 0;
  282.         IOException ioe = null;
  283.         try {
  284.             result = super.read(buffer);
  285.         } catch (final IOException ex) {
  286.             ioe = ex;
  287.         }
  288.         notify(buffer, 0, result, ioe);
  289.         return result;
  290.     }

  291.     @Override
  292.     public int read(final byte[] buffer, final int offset, final int length) throws IOException {
  293.         int result = 0;
  294.         IOException ioe = null;
  295.         try {
  296.             result = super.read(buffer, offset, length);
  297.         } catch (final IOException ex) {
  298.             ioe = ex;
  299.         }
  300.         notify(buffer, offset, result, ioe);
  301.         return result;
  302.     }

  303.     /**
  304.      * Removes an Observer.
  305.      *
  306.      * @param observer the observer to remove
  307.      */
  308.     public void remove(final Observer observer) {
  309.         observers.remove(observer);
  310.     }

  311.     /**
  312.      * Removes all Observers.
  313.      */
  314.     public void removeAllObservers() {
  315.         observers.clear();
  316.     }

  317. }