001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import static org.apache.commons.io.IOUtils.EOF;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.List;
026
027import org.apache.commons.io.IOUtils;
028import org.apache.commons.io.function.IOConsumer;
029
030/**
031 * The {@link ObservableInputStream} allows, that an InputStream may be consumed by other receivers, apart from the
032 * thread, which is reading it. The other consumers are implemented as instances of {@link Observer}.
033 * <p>
034 * A typical application may be the generation of a {@link java.security.MessageDigest} on the fly.
035 * </p>
036 * <p>
037 * <em>Note</em>: The {@link ObservableInputStream} is <em>not</em> thread safe, as instances of InputStream usually
038 * aren't. If you must access the stream from multiple threads, then synchronization, locking, or a similar means must
039 * be used.
040 * </p>
041 *
042 * @see MessageDigestInputStream
043 */
044public class ObservableInputStream extends ProxyInputStream {
045
046    /**
047     * Abstracts observer callback for {@link ObservableInputStream}s.
048     */
049    public static abstract class Observer {
050
051        /**
052         * Called to indicate that the {@link ObservableInputStream} has been closed.
053         *
054         * @throws IOException if an I/O error occurs.
055         */
056        @SuppressWarnings("unused") // Possibly thrown from subclasses.
057        public void closed() throws IOException {
058            // noop
059        }
060
061        /**
062         * Called to indicate that {@link InputStream#read(byte[])}, or {@link InputStream#read(byte[], int, int)} have
063         * been called, and are about to invoke data.
064         *
065         * @param buffer The byte array, which has been passed to the read call, and where data has been stored.
066         * @param offset The offset within the byte array, where data has been stored.
067         * @param length The number of bytes, which have been stored in the byte array.
068         * @throws IOException if an I/O error occurs.
069         */
070        @SuppressWarnings("unused") // Possibly thrown from subclasses.
071        public void data(final byte[] buffer, final int offset, final int length) throws IOException {
072            // noop
073        }
074
075        /**
076         * Called to indicate, that {@link InputStream#read()} has been invoked on the {@link ObservableInputStream},
077         * and will return a value.
078         *
079         * @param value The value, which is being returned. This will never be -1 (EOF), because, in that case,
080         *        {@link #finished()} will be invoked instead.
081         * @throws IOException if an I/O error occurs.
082         */
083        @SuppressWarnings("unused") // Possibly thrown from subclasses.
084        public void data(final int value) throws IOException {
085            // noop
086        }
087
088        /**
089         * Called to indicate that an error occurred on the underlying stream.
090         *
091         * @param exception the exception to throw
092         * @throws IOException if an I/O error occurs.
093         */
094        public void error(final IOException exception) throws IOException {
095            throw exception;
096        }
097
098        /**
099         * Called to indicate that EOF has been seen on the underlying stream. This method may be called multiple times,
100         * if the reader keeps invoking either of the read methods, and they will consequently keep returning EOF.
101         *
102         * @throws IOException if an I/O error occurs.
103         */
104        @SuppressWarnings("unused") // Possibly thrown from subclasses.
105        public void finished() throws IOException {
106            // noop
107        }
108    }
109
110    private final List<Observer> observers;
111
112    /**
113     * Constructs a new ObservableInputStream for the given InputStream.
114     *
115     * @param inputStream the input stream to observe.
116     */
117    public ObservableInputStream(final InputStream inputStream) {
118        this(inputStream, new ArrayList<>());
119    }
120
121    /**
122     * Constructs a new ObservableInputStream for the given InputStream.
123     *
124     * @param inputStream the input stream to observe.
125     * @param observers List of observer callbacks.
126     */
127    private ObservableInputStream(final InputStream inputStream, final List<Observer> observers) {
128        super(inputStream);
129        this.observers = observers;
130    }
131
132    /**
133     * Constructs a new ObservableInputStream for the given InputStream.
134     *
135     * @param inputStream the input stream to observe.
136     * @param observers List of observer callbacks.
137     * @since 2.9.0
138     */
139    public ObservableInputStream(final InputStream inputStream, final Observer... observers) {
140        this(inputStream, Arrays.asList(observers));
141    }
142
143    /**
144     * Adds an Observer.
145     *
146     * @param observer the observer to add.
147     */
148    public void add(final Observer observer) {
149        observers.add(observer);
150    }
151
152    @Override
153    public void close() throws IOException {
154        IOException ioe = null;
155        try {
156            super.close();
157        } catch (final IOException e) {
158            ioe = e;
159        }
160        if (ioe == null) {
161            noteClosed();
162        } else {
163            noteError(ioe);
164        }
165    }
166
167    /**
168     * Reads all data from the underlying {@link InputStream}, while notifying the observers.
169     *
170     * @throws IOException The underlying {@link InputStream}, or either of the observers has thrown an exception.
171     */
172    public void consume() throws IOException {
173        IOUtils.consume(this);
174    }
175
176    private void forEachObserver(final IOConsumer<Observer> action) throws IOException {
177        IOConsumer.forAll(action, observers);
178    }
179
180    /**
181     * Gets a copy of currently registered observers.
182     *
183     * @return a copy of the list of currently registered observers.
184     * @since 2.9.0
185     */
186    public List<Observer> getObservers() {
187        return new ArrayList<>(observers);
188    }
189
190    /**
191     * Notifies the observers by invoking {@link Observer#finished()}.
192     *
193     * @throws IOException Some observer has thrown an exception, which is being passed down.
194     */
195    protected void noteClosed() throws IOException {
196        forEachObserver(Observer::closed);
197    }
198
199    /**
200     * Notifies the observers by invoking {@link Observer#data(int)} with the given arguments.
201     *
202     * @param value Passed to the observers.
203     * @throws IOException Some observer has thrown an exception, which is being passed down.
204     */
205    protected void noteDataByte(final int value) throws IOException {
206        forEachObserver(observer -> observer.data(value));
207    }
208
209    /**
210     * Notifies the observers by invoking {@link Observer#data(byte[],int,int)} with the given arguments.
211     *
212     * @param buffer Passed to the observers.
213     * @param offset Passed to the observers.
214     * @param length Passed to the observers.
215     * @throws IOException Some observer has thrown an exception, which is being passed down.
216     */
217    protected void noteDataBytes(final byte[] buffer, final int offset, final int length) throws IOException {
218        forEachObserver(observer -> observer.data(buffer, offset, length));
219    }
220
221    /**
222     * Notifies the observers by invoking {@link Observer#error(IOException)} with the given argument.
223     *
224     * @param exception Passed to the observers.
225     * @throws IOException Some observer has thrown an exception, which is being passed down. This may be the same
226     *         exception, which has been passed as an argument.
227     */
228    protected void noteError(final IOException exception) throws IOException {
229        forEachObserver(observer -> observer.error(exception));
230    }
231
232    /**
233     * Notifies the observers by invoking {@link Observer#finished()}.
234     *
235     * @throws IOException Some observer has thrown an exception, which is being passed down.
236     */
237    protected void noteFinished() throws IOException {
238        forEachObserver(Observer::finished);
239    }
240
241    private void notify(final byte[] buffer, final int offset, final int result, final IOException ioe) throws IOException {
242        if (ioe != null) {
243            noteError(ioe);
244            throw ioe;
245        }
246        if (result == EOF) {
247            noteFinished();
248        } else if (result > 0) {
249            noteDataBytes(buffer, offset, result);
250        }
251    }
252
253    @Override
254    public int read() throws IOException {
255        int result = 0;
256        IOException ioe = null;
257        try {
258            result = super.read();
259        } catch (final IOException ex) {
260            ioe = ex;
261        }
262        if (ioe != null) {
263            noteError(ioe);
264            throw ioe;
265        }
266        if (result == EOF) {
267            noteFinished();
268        } else {
269            noteDataByte(result);
270        }
271        return result;
272    }
273
274    @Override
275    public int read(final byte[] buffer) throws IOException {
276        int result = 0;
277        IOException ioe = null;
278        try {
279            result = super.read(buffer);
280        } catch (final IOException ex) {
281            ioe = ex;
282        }
283        notify(buffer, 0, result, ioe);
284        return result;
285    }
286
287    @Override
288    public int read(final byte[] buffer, final int offset, final int length) throws IOException {
289        int result = 0;
290        IOException ioe = null;
291        try {
292            result = super.read(buffer, offset, length);
293        } catch (final IOException ex) {
294            ioe = ex;
295        }
296        notify(buffer, offset, result, ioe);
297        return result;
298    }
299
300    /**
301     * Removes an Observer.
302     *
303     * @param observer the observer to remove
304     */
305    public void remove(final Observer observer) {
306        observers.remove(observer);
307    }
308
309    /**
310     * Removes all Observers.
311     */
312    public void removeAllObservers() {
313        observers.clear();
314    }
315
316}