View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.io.input;
18  
19  import static org.apache.commons.io.IOUtils.EOF;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.List;
26  
27  import org.apache.commons.io.IOUtils;
28  import org.apache.commons.io.function.IOConsumer;
29  
30  /**
31   * The {@link ObservableInputStream} allows, that an InputStream may be consumed by other receivers, apart from the
32   * thread, which is reading it. The other consumers are implemented as instances of {@link Observer}.
33   * <p>
34   * A typical application may be the generation of a {@link java.security.MessageDigest} on the fly.
35   * </p>
36   * <p>
37   * <em>Note</em>: The {@link ObservableInputStream} is <em>not</em> thread safe, as instances of InputStream usually
38   * aren't. If you must access the stream from multiple threads, then synchronization, locking, or a similar means must
39   * be used.
40   * </p>
41   *
42   * @see MessageDigestInputStream
43   */
44  public class ObservableInputStream extends ProxyInputStream {
45  
46      /**
47       * Abstracts observer callback for {@link ObservableInputStream}s.
48       */
49      public static abstract class Observer {
50  
51          /**
52           * Called to indicate that the {@link ObservableInputStream} has been closed.
53           *
54           * @throws IOException if an I/O error occurs.
55           */
56          @SuppressWarnings("unused") // Possibly thrown from subclasses.
57          public void closed() throws IOException {
58              // noop
59          }
60  
61          /**
62           * Called to indicate that {@link InputStream#read(byte[])}, or {@link InputStream#read(byte[], int, int)} have
63           * been called, and are about to invoke data.
64           *
65           * @param buffer The byte array, which has been passed to the read call, and where data has been stored.
66           * @param offset The offset within the byte array, where data has been stored.
67           * @param length The number of bytes, which have been stored in the byte array.
68           * @throws IOException if an I/O error occurs.
69           */
70          @SuppressWarnings("unused") // Possibly thrown from subclasses.
71          public void data(final byte[] buffer, final int offset, final int length) throws IOException {
72              // noop
73          }
74  
75          /**
76           * Called to indicate, that {@link InputStream#read()} has been invoked on the {@link ObservableInputStream},
77           * and will return a value.
78           *
79           * @param value The value, which is being returned. This will never be -1 (EOF), because, in that case,
80           *        {@link #finished()} will be invoked instead.
81           * @throws IOException if an I/O error occurs.
82           */
83          @SuppressWarnings("unused") // Possibly thrown from subclasses.
84          public void data(final int value) throws IOException {
85              // noop
86          }
87  
88          /**
89           * Called to indicate that an error occurred on the underlying stream.
90           *
91           * @param exception the exception to throw
92           * @throws IOException if an I/O error occurs.
93           */
94          public void error(final IOException exception) throws IOException {
95              throw exception;
96          }
97  
98          /**
99           * Called to indicate that EOF has been seen on the underlying stream. This method may be called multiple times,
100          * if the reader keeps invoking either of the read methods, and they will consequently keep returning EOF.
101          *
102          * @throws IOException if an I/O error occurs.
103          */
104         @SuppressWarnings("unused") // Possibly thrown from subclasses.
105         public void finished() throws IOException {
106             // noop
107         }
108     }
109 
110     private final List<Observer> observers;
111 
112     /**
113      * Constructs a new ObservableInputStream for the given InputStream.
114      *
115      * @param inputStream the input stream to observe.
116      */
117     public ObservableInputStream(final InputStream inputStream) {
118         this(inputStream, new ArrayList<>());
119     }
120 
121     /**
122      * Constructs a new ObservableInputStream for the given InputStream.
123      *
124      * @param inputStream the input stream to observe.
125      * @param observers List of observer callbacks.
126      */
127     private ObservableInputStream(final InputStream inputStream, final List<Observer> observers) {
128         super(inputStream);
129         this.observers = observers;
130     }
131 
132     /**
133      * Constructs a new ObservableInputStream for the given InputStream.
134      *
135      * @param inputStream the input stream to observe.
136      * @param observers List of observer callbacks.
137      * @since 2.9.0
138      */
139     public ObservableInputStream(final InputStream inputStream, final Observer... observers) {
140         this(inputStream, Arrays.asList(observers));
141     }
142 
143     /**
144      * Adds an Observer.
145      *
146      * @param observer the observer to add.
147      */
148     public void add(final Observer observer) {
149         observers.add(observer);
150     }
151 
152     @Override
153     public void close() throws IOException {
154         IOException ioe = null;
155         try {
156             super.close();
157         } catch (final IOException e) {
158             ioe = e;
159         }
160         if (ioe == null) {
161             noteClosed();
162         } else {
163             noteError(ioe);
164         }
165     }
166 
167     /**
168      * Reads all data from the underlying {@link InputStream}, while notifying the observers.
169      *
170      * @throws IOException The underlying {@link InputStream}, or either of the observers has thrown an exception.
171      */
172     public void consume() throws IOException {
173         IOUtils.consume(this);
174     }
175 
176     private void forEachObserver(final IOConsumer<Observer> action) throws IOException {
177         IOConsumer.forAll(action, observers);
178     }
179 
180     /**
181      * Gets a copy of currently registered observers.
182      *
183      * @return a copy of the list of currently registered observers.
184      * @since 2.9.0
185      */
186     public List<Observer> getObservers() {
187         return new ArrayList<>(observers);
188     }
189 
190     /**
191      * Notifies the observers by invoking {@link Observer#finished()}.
192      *
193      * @throws IOException Some observer has thrown an exception, which is being passed down.
194      */
195     protected void noteClosed() throws IOException {
196         forEachObserver(Observer::closed);
197     }
198 
199     /**
200      * Notifies the observers by invoking {@link Observer#data(int)} with the given arguments.
201      *
202      * @param value Passed to the observers.
203      * @throws IOException Some observer has thrown an exception, which is being passed down.
204      */
205     protected void noteDataByte(final int value) throws IOException {
206         forEachObserver(observer -> observer.data(value));
207     }
208 
209     /**
210      * Notifies the observers by invoking {@link Observer#data(byte[],int,int)} with the given arguments.
211      *
212      * @param buffer Passed to the observers.
213      * @param offset Passed to the observers.
214      * @param length Passed to the observers.
215      * @throws IOException Some observer has thrown an exception, which is being passed down.
216      */
217     protected void noteDataBytes(final byte[] buffer, final int offset, final int length) throws IOException {
218         forEachObserver(observer -> observer.data(buffer, offset, length));
219     }
220 
221     /**
222      * Notifies the observers by invoking {@link Observer#error(IOException)} with the given argument.
223      *
224      * @param exception Passed to the observers.
225      * @throws IOException Some observer has thrown an exception, which is being passed down. This may be the same
226      *         exception, which has been passed as an argument.
227      */
228     protected void noteError(final IOException exception) throws IOException {
229         forEachObserver(observer -> observer.error(exception));
230     }
231 
232     /**
233      * Notifies the observers by invoking {@link Observer#finished()}.
234      *
235      * @throws IOException Some observer has thrown an exception, which is being passed down.
236      */
237     protected void noteFinished() throws IOException {
238         forEachObserver(Observer::finished);
239     }
240 
241     private void notify(final byte[] buffer, final int offset, final int result, final IOException ioe) throws IOException {
242         if (ioe != null) {
243             noteError(ioe);
244             throw ioe;
245         }
246         if (result == EOF) {
247             noteFinished();
248         } else if (result > 0) {
249             noteDataBytes(buffer, offset, result);
250         }
251     }
252 
253     @Override
254     public int read() throws IOException {
255         int result = 0;
256         IOException ioe = null;
257         try {
258             result = super.read();
259         } catch (final IOException ex) {
260             ioe = ex;
261         }
262         if (ioe != null) {
263             noteError(ioe);
264             throw ioe;
265         }
266         if (result == EOF) {
267             noteFinished();
268         } else {
269             noteDataByte(result);
270         }
271         return result;
272     }
273 
274     @Override
275     public int read(final byte[] buffer) throws IOException {
276         int result = 0;
277         IOException ioe = null;
278         try {
279             result = super.read(buffer);
280         } catch (final IOException ex) {
281             ioe = ex;
282         }
283         notify(buffer, 0, result, ioe);
284         return result;
285     }
286 
287     @Override
288     public int read(final byte[] buffer, final int offset, final int length) throws IOException {
289         int result = 0;
290         IOException ioe = null;
291         try {
292             result = super.read(buffer, offset, length);
293         } catch (final IOException ex) {
294             ioe = ex;
295         }
296         notify(buffer, offset, result, ioe);
297         return result;
298     }
299 
300     /**
301      * Removes an Observer.
302      *
303      * @param observer the observer to remove
304      */
305     public void remove(final Observer observer) {
306         observers.remove(observer);
307     }
308 
309     /**
310      * Removes all Observers.
311      */
312     public void removeAllObservers() {
313         observers.clear();
314     }
315 
316 }