View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.io.input;
18  
19  import static org.apache.commons.io.IOUtils.EOF;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.List;
26  
27  import org.apache.commons.io.IOUtils;
28  
29  /**
30   * The {@link ObservableInputStream} allows, that an InputStream may be consumed by other receivers, apart from the
31   * thread, which is reading it. The other consumers are implemented as instances of {@link Observer}.
32   * <p>
33   * A typical application may be the generation of a {@link java.security.MessageDigest} on the fly.
34   * </p>
35   * <p>
36   * <em>Note</em>: The {@link ObservableInputStream} is <em>not</em> thread safe, as instances of InputStream usually
37   * aren't. If you must access the stream from multiple threads, then synchronization, locking, or a similar means must
38   * be used.
39   * </p>
40   *
41   * @see MessageDigestCalculatingInputStream
42   */
43  public class ObservableInputStream extends ProxyInputStream {
44  
45      /**
46       * Abstracts observer callback for {@code ObservableInputStream}s.
47       */
48      public static abstract class Observer {
49  
50          /**
51           * Called to indicate that the {@link ObservableInputStream} has been closed.
52           *
53           * @throws IOException if an I/O error occurs.
54           */
55          @SuppressWarnings("unused") // Possibly thrown from subclasses.
56          public void closed() throws IOException {
57              // noop
58          }
59  
60          /**
61           * Called to indicate that {@link InputStream#read(byte[])}, or {@link InputStream#read(byte[], int, int)} have
62           * been called, and are about to invoke data.
63           *
64           * @param buffer The byte array, which has been passed to the read call, and where data has been stored.
65           * @param offset The offset within the byte array, where data has been stored.
66           * @param length The number of bytes, which have been stored in the byte array.
67           * @throws IOException if an I/O error occurs.
68           */
69          @SuppressWarnings("unused") // Possibly thrown from subclasses.
70          public void data(final byte[] buffer, final int offset, final int length) throws IOException {
71              // noop
72          }
73  
74          /**
75           * Called to indicate, that {@link InputStream#read()} has been invoked on the {@link ObservableInputStream},
76           * and will return a value.
77           *
78           * @param value The value, which is being returned. This will never be -1 (EOF), because, in that case,
79           *        {@link #finished()} will be invoked instead.
80           * @throws IOException if an I/O error occurs.
81           */
82          @SuppressWarnings("unused") // Possibly thrown from subclasses.
83          public void data(final int value) throws IOException {
84              // noop
85          }
86  
87          /**
88           * Called to indicate that an error occurred on the underlying stream.
89           *
90           * @param exception the exception to throw
91           * @throws IOException if an I/O error occurs.
92           */
93          public void error(final IOException exception) throws IOException {
94              throw exception;
95          }
96  
97          /**
98           * Called to indicate that EOF has been seen on the underlying stream. This method may be called multiple times,
99           * if the reader keeps invoking either of the read methods, and they will consequently keep returning EOF.
100          *
101          * @throws IOException if an I/O error occurs.
102          */
103         @SuppressWarnings("unused") // Possibly thrown from subclasses.
104         public void finished() throws IOException {
105             // noop
106         }
107     }
108 
109     private final List<Observer> observers;
110 
111     /**
112      * Creates a new ObservableInputStream for the given InputStream.
113      *
114      * @param inputStream the input stream to observe.
115      */
116     public ObservableInputStream(final InputStream inputStream) {
117         this(inputStream, new ArrayList<>());
118     }
119 
120     /**
121      * Creates a new ObservableInputStream for the given InputStream.
122      *
123      * @param inputStream the input stream to observe.
124      * @param observers List of observer callbacks.
125      */
126     private ObservableInputStream(final InputStream inputStream, final List<Observer> observers) {
127         super(inputStream);
128         this.observers = observers;
129     }
130 
131     /**
132      * Creates a new ObservableInputStream for the given InputStream.
133      *
134      * @param inputStream the input stream to observe.
135      * @param observers List of observer callbacks.
136      * @since 2.9.0
137      */
138     public ObservableInputStream(final InputStream inputStream, final Observer... observers) {
139         this(inputStream, Arrays.asList(observers));
140     }
141 
142     /**
143      * Adds an Observer.
144      *
145      * @param observer the observer to add.
146      */
147     public void add(final Observer observer) {
148         observers.add(observer);
149     }
150 
151     @Override
152     public void close() throws IOException {
153         IOException ioe = null;
154         try {
155             super.close();
156         } catch (final IOException e) {
157             ioe = e;
158         }
159         if (ioe == null) {
160             noteClosed();
161         } else {
162             noteError(ioe);
163         }
164     }
165 
166     /**
167      * Reads all data from the underlying {@link InputStream}, while notifying the observers.
168      *
169      * @throws IOException The underlying {@link InputStream}, or either of the observers has thrown an exception.
170      */
171     public void consume() throws IOException {
172         final byte[] buffer = IOUtils.byteArray();
173         while (read(buffer) != EOF) {
174             // empty
175         }
176     }
177 
178     /**
179      * Gets all currently registered observers.
180      *
181      * @return a list of the currently registered observers.
182      * @since 2.9.0
183      */
184     public List<Observer> getObservers() {
185         return observers;
186     }
187 
188     /**
189      * Notifies the observers by invoking {@link Observer#finished()}.
190      *
191      * @throws IOException Some observer has thrown an exception, which is being passed down.
192      */
193     protected void noteClosed() throws IOException {
194         for (final Observer observer : getObservers()) {
195             observer.closed();
196         }
197     }
198 
199     /**
200      * Notifies the observers by invoking {@link Observer#data(int)} with the given arguments.
201      *
202      * @param value Passed to the observers.
203      * @throws IOException Some observer has thrown an exception, which is being passed down.
204      */
205     protected void noteDataByte(final int value) throws IOException {
206         for (final Observer observer : getObservers()) {
207             observer.data(value);
208         }
209     }
210 
211     /**
212      * Notifies the observers by invoking {@link Observer#data(byte[],int,int)} with the given arguments.
213      *
214      * @param buffer Passed to the observers.
215      * @param offset Passed to the observers.
216      * @param length Passed to the observers.
217      * @throws IOException Some observer has thrown an exception, which is being passed down.
218      */
219     protected void noteDataBytes(final byte[] buffer, final int offset, final int length) throws IOException {
220         for (final Observer observer : getObservers()) {
221             observer.data(buffer, offset, length);
222         }
223     }
224 
225     /**
226      * Notifies the observers by invoking {@link Observer#error(IOException)} with the given argument.
227      *
228      * @param exception Passed to the observers.
229      * @throws IOException Some observer has thrown an exception, which is being passed down. This may be the same
230      *         exception, which has been passed as an argument.
231      */
232     protected void noteError(final IOException exception) throws IOException {
233         for (final Observer observer : getObservers()) {
234             observer.error(exception);
235         }
236     }
237 
238     /**
239      * Notifies the observers by invoking {@link Observer#finished()}.
240      *
241      * @throws IOException Some observer has thrown an exception, which is being passed down.
242      */
243     protected void noteFinished() throws IOException {
244         for (final Observer observer : getObservers()) {
245             observer.finished();
246         }
247     }
248 
249     private void notify(final byte[] buffer, final int offset, final int result, final IOException ioe) throws IOException {
250         if (ioe != null) {
251             noteError(ioe);
252             throw ioe;
253         }
254         if (result == EOF) {
255             noteFinished();
256         } else if (result > 0) {
257             noteDataBytes(buffer, offset, result);
258         }
259     }
260 
261     @Override
262     public int read() throws IOException {
263         int result = 0;
264         IOException ioe = null;
265         try {
266             result = super.read();
267         } catch (final IOException ex) {
268             ioe = ex;
269         }
270         if (ioe != null) {
271             noteError(ioe);
272             throw ioe;
273         }
274         if (result == EOF) {
275             noteFinished();
276         } else {
277             noteDataByte(result);
278         }
279         return result;
280     }
281 
282     @Override
283     public int read(final byte[] buffer) throws IOException {
284         int result = 0;
285         IOException ioe = null;
286         try {
287             result = super.read(buffer);
288         } catch (final IOException ex) {
289             ioe = ex;
290         }
291         notify(buffer, 0, result, ioe);
292         return result;
293     }
294 
295     @Override
296     public int read(final byte[] buffer, final int offset, final int length) throws IOException {
297         int result = 0;
298         IOException ioe = null;
299         try {
300             result = super.read(buffer, offset, length);
301         } catch (final IOException ex) {
302             ioe = ex;
303         }
304         notify(buffer, offset, result, ioe);
305         return result;
306     }
307 
308     /**
309      * Removes an Observer.
310      *
311      * @param observer the observer to remove
312      */
313     public void remove(final Observer observer) {
314         observers.remove(observer);
315     }
316 
317     /**
318      * Removes all Observers.
319      */
320     public void removeAllObservers() {
321         observers.clear();
322     }
323 
324 }