001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.util.ArrayList;
022import java.util.List;
023
024import org.apache.commons.io.IOUtils;
025
026
027/**
028 * The {@link ObservableInputStream} allows, that an InputStream may be consumed
029 * by other receivers, apart from the thread, which is reading it.
030 * The other consumers are implemented as instances of {@link Observer}. A
031 * typical application may be the generation of a {@link java.security.MessageDigest} on the
032 * fly.
033 * {@code Note}: The {@link ObservableInputStream} is <em>not</em> thread safe,
034 * as instances of InputStream usually aren't.
035 * If you must access the stream from multiple threads, then synchronization, locking,
036 * or a similar means must be used.
037 * @see MessageDigestCalculatingInputStream
038 */
039public class ObservableInputStream extends ProxyInputStream {
040
041    /**
042     * Abstracts observer callback for {@code ObservableInputStream}s.
043     */
044    public static abstract class Observer {
045
046        /** Called to indicate, that {@link InputStream#read()} has been invoked
047         * on the {@link ObservableInputStream}, and will return a value.
048         * @param pByte The value, which is being returned. This will never be -1 (EOF),
049         *    because, in that case, {@link #finished()} will be invoked instead.
050         * @throws IOException if an i/o-error occurs
051         */
052        public void data(final int pByte) throws IOException {
053            // noop
054        }
055
056        /** Called to indicate, that {@link InputStream#read(byte[])}, or
057         * {@link InputStream#read(byte[], int, int)} have been called, and are about to
058         * invoke data.
059         * @param pBuffer The byte array, which has been passed to the read call, and where
060         *   data has been stored.
061         * @param pOffset The offset within the byte array, where data has been stored.
062         * @param pLength The number of bytes, which have been stored in the byte array.
063         * @throws IOException if an i/o-error occurs
064         */
065        public void data(final byte[] pBuffer, final int pOffset, final int pLength) throws IOException {
066            // noop
067        }
068
069        /** Called to indicate, that EOF has been seen on the underlying stream.
070         * This method may be called multiple times, if the reader keeps invoking
071         * either of the read methods, and they will consequently keep returning
072         * EOF.
073         * @throws IOException if an i/o-error occurs
074         */
075        public void finished() throws IOException {
076            // noop
077        }
078
079        /** Called to indicate, that the {@link ObservableInputStream} has been closed.
080         * @throws IOException if an i/o-error occurs
081         */
082        public void closed() throws IOException {
083            // noop
084        }
085
086        /**
087         * Called to indicate, that an error occurred on the underlying stream.
088         * @param pException the exception to throw
089         * @throws IOException if an i/o-error occurs
090         */
091        public void error(final IOException pException) throws IOException { throw pException; }
092    }
093
094    private final List<Observer> observers = new ArrayList<>();
095
096    /**
097     * Creates a new ObservableInputStream for the given InputStream.
098     * @param pProxy the input stream to proxy
099     */
100    public ObservableInputStream(final InputStream pProxy) {
101        super(pProxy);
102    }
103
104    /**
105     * Adds an Observer.
106     * @param pObserver the observer to add
107     */
108    public void add(final Observer pObserver) {
109        observers.add(pObserver);
110    }
111
112    /**
113     * Removes an Observer.
114     * @param pObserver the observer to remove
115     */
116    public void remove(final Observer pObserver) {
117        observers.remove(pObserver);
118    }
119
120    /**
121     * Removes all Observers.
122     */
123    public void removeAllObservers() {
124        observers.clear();
125    }
126
127    @Override
128    public int read() throws IOException {
129        int result = 0;
130        IOException ioe = null;
131        try {
132            result = super.read();
133        } catch (final IOException pException) {
134            ioe = pException;
135        }
136        if (ioe != null) {
137            noteError(ioe);
138        } else if (result == -1) {
139            noteFinished();
140        } else {
141            noteDataByte(result);
142        }
143        return result;
144    }
145
146    @Override
147    public int read(final byte[] pBuffer) throws IOException {
148        int result = 0;
149        IOException ioe = null;
150        try {
151            result = super.read(pBuffer);
152        } catch (final IOException pException) {
153            ioe = pException;
154        }
155        if (ioe != null) {
156            noteError(ioe);
157        } else if (result == -1) {
158            noteFinished();
159        } else if (result > 0) {
160            noteDataBytes(pBuffer, 0, result);
161        }
162        return result;
163    }
164
165    @Override
166    public int read(final byte[] pBuffer, final int pOffset, final int pLength) throws IOException {
167        int result = 0;
168        IOException ioe = null;
169        try {
170            result = super.read(pBuffer, pOffset, pLength);
171        } catch (final IOException pException) {
172            ioe = pException;
173        }
174        if (ioe != null) {
175            noteError(ioe);
176        } else if (result == -1) {
177            noteFinished();
178        } else if (result > 0) {
179            noteDataBytes(pBuffer, pOffset, result);
180        }
181        return result;
182    }
183
184    /** Notifies the observers by invoking {@link Observer#data(byte[],int,int)}
185     * with the given arguments.
186     * @param pBuffer Passed to the observers.
187     * @param pOffset Passed to the observers.
188     * @param pLength Passed to the observers.
189     * @throws IOException Some observer has thrown an exception, which is being
190     *   passed down.
191     */
192    protected void noteDataBytes(final byte[] pBuffer, final int pOffset, final int pLength) throws IOException {
193        for (final Observer observer : getObservers()) {
194            observer.data(pBuffer, pOffset, pLength);
195        }
196    }
197
198    /** Notifies the observers by invoking {@link Observer#finished()}.
199     * @throws IOException Some observer has thrown an exception, which is being
200     *   passed down.
201     */
202    protected void noteFinished() throws IOException {
203        for (final Observer observer : getObservers()) {
204            observer.finished();
205        }
206    }
207
208    /** Notifies the observers by invoking {@link Observer#data(int)}
209     * with the given arguments.
210     * @param pDataByte Passed to the observers.
211     * @throws IOException Some observer has thrown an exception, which is being
212     *   passed down.
213     */
214    protected void noteDataByte(final int pDataByte) throws IOException {
215        for (final Observer observer : getObservers()) {
216            observer.data(pDataByte);
217        }
218    }
219
220    /** Notifies the observers by invoking {@link Observer#error(IOException)}
221     * with the given argument.
222     * @param pException Passed to the observers.
223     * @throws IOException Some observer has thrown an exception, which is being
224     *   passed down. This may be the same exception, which has been passed as an
225     *   argument.
226     */
227    protected void noteError(final IOException pException) throws IOException {
228        for (final Observer observer : getObservers()) {
229            observer.error(pException);
230        }
231    }
232
233    /** Notifies the observers by invoking {@link Observer#finished()}.
234     * @throws IOException Some observer has thrown an exception, which is being
235     *   passed down.
236     */
237    protected void noteClosed() throws IOException {
238        for (final Observer observer : getObservers()) {
239            observer.closed();
240        }
241    }
242
243    /** Gets all currently registered observers.
244     * @return a list of the currently registered observers
245     */
246    protected List<Observer> getObservers() {
247        return observers;
248    }
249
250    @Override
251    public void close() throws IOException {
252        IOException ioe = null;
253        try {
254            super.close();
255        } catch (final IOException e) {
256            ioe = e;
257        }
258        if (ioe == null) {
259            noteClosed();
260        } else {
261            noteError(ioe);
262        }
263    }
264
265    /** Reads all data from the underlying {@link InputStream}, while notifying the
266     * observers.
267     * @throws IOException The underlying {@link InputStream}, or either of the
268     *   observers has thrown an exception.
269     */
270    public void consume() throws IOException {
271        final byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
272        for (;;) {
273            final int res = read(buffer);
274            if (res == -1) {
275                return;
276            }
277        }
278    }
279
280}