001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import static org.apache.commons.io.IOUtils.EOF;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.List;
026
027import org.apache.commons.io.IOUtils;
028import org.apache.commons.io.function.IOConsumer;
029
030/**
031 * The {@link ObservableInputStream} allows, that an InputStream may be consumed by other receivers, apart from the
032 * thread, which is reading it. The other consumers are implemented as instances of {@link Observer}.
033 * <p>
034 * A typical application may be the generation of a {@link java.security.MessageDigest} on the fly.
035 * </p>
036 * <p>
037 * <em>Note</em>: The {@link ObservableInputStream} is <em>not</em> thread safe, as instances of InputStream usually
038 * aren't. If you must access the stream from multiple threads, then synchronization, locking, or a similar means must
039 * be used.
040 * </p>
041 *
042 * @see MessageDigestInputStream
043 */
044public class ObservableInputStream extends ProxyInputStream {
045
046    /**
047     * For subclassing builders from {@link BoundedInputStream} subclassses.
048     *
049     * @param <T> The subclass.
050     * @since 2.18.0
051     */
052    public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<ObservableInputStream, T> {
053
054        private List<Observer> observers;
055
056        /**
057         * Sets the list of observer callbacks.
058         *
059         * @param observers The list of observer callbacks.
060         */
061        public void setObservers(final List<Observer> observers) {
062            this.observers = observers;
063        }
064
065    }
066
067
068    /**
069     * Builds instances of {@link ObservableInputStream}.
070     *
071     * @since 2.18.0
072     */
073    public static class Builder extends AbstractBuilder<Builder> {
074
075        @Override
076        public ObservableInputStream get() throws IOException {
077            return new ObservableInputStream(this);
078        }
079
080    }
081
082    /**
083     * Abstracts observer callback for {@link ObservableInputStream}s.
084     */
085    public static abstract class Observer {
086
087        /**
088         * Called to indicate that the {@link ObservableInputStream} has been closed.
089         *
090         * @throws IOException if an I/O error occurs.
091         */
092        @SuppressWarnings("unused") // Possibly thrown from subclasses.
093        public void closed() throws IOException {
094            // noop
095        }
096
097        /**
098         * Called to indicate that {@link InputStream#read(byte[])}, or {@link InputStream#read(byte[], int, int)} have
099         * been called, and are about to invoke data.
100         *
101         * @param buffer The byte array, which has been passed to the read call, and where data has been stored.
102         * @param offset The offset within the byte array, where data has been stored.
103         * @param length The number of bytes, which have been stored in the byte array.
104         * @throws IOException if an I/O error occurs.
105         */
106        @SuppressWarnings("unused") // Possibly thrown from subclasses.
107        public void data(final byte[] buffer, final int offset, final int length) throws IOException {
108            // noop
109        }
110
111        /**
112         * Called to indicate, that {@link InputStream#read()} has been invoked on the {@link ObservableInputStream},
113         * and will return a value.
114         *
115         * @param value The value, which is being returned. This will never be -1 (EOF), because, in that case,
116         *        {@link #finished()} will be invoked instead.
117         * @throws IOException if an I/O error occurs.
118         */
119        @SuppressWarnings("unused") // Possibly thrown from subclasses.
120        public void data(final int value) throws IOException {
121            // noop
122        }
123
124        /**
125         * Called to indicate that an error occurred on the underlying stream.
126         *
127         * @param exception the exception to throw
128         * @throws IOException if an I/O error occurs.
129         */
130        public void error(final IOException exception) throws IOException {
131            throw exception;
132        }
133
134        /**
135         * Called to indicate that EOF has been seen on the underlying stream. This method may be called multiple times,
136         * if the reader keeps invoking either of the read methods, and they will consequently keep returning EOF.
137         *
138         * @throws IOException if an I/O error occurs.
139         */
140        @SuppressWarnings("unused") // Possibly thrown from subclasses.
141        public void finished() throws IOException {
142            // noop
143        }
144    }
145
146    private final List<Observer> observers;
147
148    ObservableInputStream(final AbstractBuilder builder) throws IOException {
149        super(builder);
150        this.observers = builder.observers;
151    }
152
153    /**
154     * Constructs a new ObservableInputStream for the given InputStream.
155     *
156     * @param inputStream the input stream to observe.
157     */
158    public ObservableInputStream(final InputStream inputStream) {
159        this(inputStream, new ArrayList<>());
160    }
161
162    /**
163     * Constructs a new ObservableInputStream for the given InputStream.
164     *
165     * @param inputStream the input stream to observe.
166     * @param observers List of observer callbacks.
167     */
168    private ObservableInputStream(final InputStream inputStream, final List<Observer> observers) {
169        super(inputStream);
170        this.observers = observers;
171    }
172
173    /**
174     * Constructs a new ObservableInputStream for the given InputStream.
175     *
176     * @param inputStream the input stream to observe.
177     * @param observers List of observer callbacks.
178     * @since 2.9.0
179     */
180    public ObservableInputStream(final InputStream inputStream, final Observer... observers) {
181        this(inputStream, Arrays.asList(observers));
182    }
183
184    /**
185     * Adds an Observer.
186     *
187     * @param observer the observer to add.
188     */
189    public void add(final Observer observer) {
190        observers.add(observer);
191    }
192
193    @Override
194    public void close() throws IOException {
195        IOException ioe = null;
196        try {
197            super.close();
198        } catch (final IOException e) {
199            ioe = e;
200        }
201        if (ioe == null) {
202            noteClosed();
203        } else {
204            noteError(ioe);
205        }
206    }
207
208    /**
209     * Reads all data from the underlying {@link InputStream}, while notifying the observers.
210     *
211     * @throws IOException The underlying {@link InputStream}, or either of the observers has thrown an exception.
212     */
213    public void consume() throws IOException {
214        IOUtils.consume(this);
215    }
216
217    private void forEachObserver(final IOConsumer<Observer> action) throws IOException {
218        IOConsumer.forAll(action, observers);
219    }
220
221    /**
222     * Gets a copy of currently registered observers.
223     *
224     * @return a copy of the list of currently registered observers.
225     * @since 2.9.0
226     */
227    public List<Observer> getObservers() {
228        return new ArrayList<>(observers);
229    }
230
231    /**
232     * Notifies the observers by invoking {@link Observer#finished()}.
233     *
234     * @throws IOException Some observer has thrown an exception, which is being passed down.
235     */
236    protected void noteClosed() throws IOException {
237        forEachObserver(Observer::closed);
238    }
239
240    /**
241     * Notifies the observers by invoking {@link Observer#data(int)} with the given arguments.
242     *
243     * @param value Passed to the observers.
244     * @throws IOException Some observer has thrown an exception, which is being passed down.
245     */
246    protected void noteDataByte(final int value) throws IOException {
247        forEachObserver(observer -> observer.data(value));
248    }
249
250    /**
251     * Notifies the observers by invoking {@link Observer#data(byte[],int,int)} with the given arguments.
252     *
253     * @param buffer Passed to the observers.
254     * @param offset Passed to the observers.
255     * @param length Passed to the observers.
256     * @throws IOException Some observer has thrown an exception, which is being passed down.
257     */
258    protected void noteDataBytes(final byte[] buffer, final int offset, final int length) throws IOException {
259        forEachObserver(observer -> observer.data(buffer, offset, length));
260    }
261
262    /**
263     * Notifies the observers by invoking {@link Observer#error(IOException)} with the given argument.
264     *
265     * @param exception Passed to the observers.
266     * @throws IOException Some observer has thrown an exception, which is being passed down. This may be the same
267     *         exception, which has been passed as an argument.
268     */
269    protected void noteError(final IOException exception) throws IOException {
270        forEachObserver(observer -> observer.error(exception));
271    }
272
273    /**
274     * Notifies the observers by invoking {@link Observer#finished()}.
275     *
276     * @throws IOException Some observer has thrown an exception, which is being passed down.
277     */
278    protected void noteFinished() throws IOException {
279        forEachObserver(Observer::finished);
280    }
281
282    private void notify(final byte[] buffer, final int offset, final int result, final IOException ioe) throws IOException {
283        if (ioe != null) {
284            noteError(ioe);
285            throw ioe;
286        }
287        if (result == EOF) {
288            noteFinished();
289        } else if (result > 0) {
290            noteDataBytes(buffer, offset, result);
291        }
292    }
293
294    @Override
295    public int read() throws IOException {
296        int result = 0;
297        IOException ioe = null;
298        try {
299            result = super.read();
300        } catch (final IOException ex) {
301            ioe = ex;
302        }
303        if (ioe != null) {
304            noteError(ioe);
305            throw ioe;
306        }
307        if (result == EOF) {
308            noteFinished();
309        } else {
310            noteDataByte(result);
311        }
312        return result;
313    }
314
315    @Override
316    public int read(final byte[] buffer) throws IOException {
317        int result = 0;
318        IOException ioe = null;
319        try {
320            result = super.read(buffer);
321        } catch (final IOException ex) {
322            ioe = ex;
323        }
324        notify(buffer, 0, result, ioe);
325        return result;
326    }
327
328    @Override
329    public int read(final byte[] buffer, final int offset, final int length) throws IOException {
330        int result = 0;
331        IOException ioe = null;
332        try {
333            result = super.read(buffer, offset, length);
334        } catch (final IOException ex) {
335            ioe = ex;
336        }
337        notify(buffer, offset, result, ioe);
338        return result;
339    }
340
341    /**
342     * Removes an Observer.
343     *
344     * @param observer the observer to remove
345     */
346    public void remove(final Observer observer) {
347        observers.remove(observer);
348    }
349
350    /**
351     * Removes all Observers.
352     */
353    public void removeAllObservers() {
354        observers.clear();
355    }
356
357}