View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.io.input;
18  
19  import static org.apache.commons.io.IOUtils.EOF;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.List;
26  
27  import org.apache.commons.io.IOUtils;
28  import org.apache.commons.io.function.IOConsumer;
29  
30  /**
31   * The {@link ObservableInputStream} allows, that an InputStream may be consumed by other receivers, apart from the
32   * thread, which is reading it. The other consumers are implemented as instances of {@link Observer}.
33   * <p>
34   * A typical application may be the generation of a {@link java.security.MessageDigest} on the fly.
35   * </p>
36   * <p>
37   * <em>Note</em>: The {@link ObservableInputStream} is <em>not</em> thread safe, as instances of InputStream usually
38   * aren't. If you must access the stream from multiple threads, then synchronization, locking, or a similar means must
39   * be used.
40   * </p>
41   *
42   * @see MessageDigestInputStream
43   */
44  public class ObservableInputStream extends ProxyInputStream {
45  
46      /**
47       * For subclassing builders from {@link BoundedInputStream} subclassses.
48       *
49       * @param <T> The subclass.
50       * @since 2.18.0
51       */
52      public abstract static class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<ObservableInputStream, T> {
53  
54          private List<Observer> observers;
55  
56          /**
57           * Constructs a new instance for subclasses.
58           */
59          public AbstractBuilder() {
60              // empty
61          }
62  
63          /**
64           * Sets the list of observer callbacks.
65           *
66           * @param observers The list of observer callbacks.
67           */
68          public void setObservers(final List<Observer> observers) {
69              this.observers = observers;
70          }
71  
72      }
73  
74      /**
75       * Builds instances of {@link ObservableInputStream}.
76       *
77       * @since 2.18.0
78       */
79      public static class Builder extends AbstractBuilder<Builder> {
80  
81          /**
82           * Constructs a new builder of {@link ObservableInputStream}.
83           */
84          public Builder() {
85              // empty
86          }
87  
88          @Override
89          public ObservableInputStream get() throws IOException {
90              return new ObservableInputStream(this);
91          }
92  
93      }
94  
95      /**
96       * Abstracts observer callback for {@link ObservableInputStream}s.
97       */
98      public abstract static class Observer {
99  
100         /**
101          * Constructs a new instance for subclasses.
102          */
103         public Observer() {
104             // empty
105         }
106 
107         /**
108          * Called to indicate that the {@link ObservableInputStream} has been closed.
109          *
110          * @throws IOException if an I/O error occurs.
111          */
112         @SuppressWarnings("unused") // Possibly thrown from subclasses.
113         public void closed() throws IOException {
114             // noop
115         }
116 
117         /**
118          * Called to indicate that {@link InputStream#read(byte[])}, or {@link InputStream#read(byte[], int, int)} have
119          * been called, and are about to invoke data.
120          *
121          * @param buffer The byte array, which has been passed to the read call, and where data has been stored.
122          * @param offset The offset within the byte array, where data has been stored.
123          * @param length The number of bytes, which have been stored in the byte array.
124          * @throws IOException if an I/O error occurs.
125          */
126         @SuppressWarnings("unused") // Possibly thrown from subclasses.
127         public void data(final byte[] buffer, final int offset, final int length) throws IOException {
128             // noop
129         }
130 
131         /**
132          * Called to indicate, that {@link InputStream#read()} has been invoked on the {@link ObservableInputStream},
133          * and will return a value.
134          *
135          * @param value The value, which is being returned. This will never be -1 (EOF), because, in that case,
136          *        {@link #finished()} will be invoked instead.
137          * @throws IOException if an I/O error occurs.
138          */
139         @SuppressWarnings("unused") // Possibly thrown from subclasses.
140         public void data(final int value) throws IOException {
141             // noop
142         }
143 
144         /**
145          * Called to indicate that an error occurred on the underlying stream.
146          *
147          * @param exception the exception to throw
148          * @throws IOException if an I/O error occurs.
149          */
150         public void error(final IOException exception) throws IOException {
151             throw exception;
152         }
153 
154         /**
155          * Called to indicate that EOF has been seen on the underlying stream. This method may be called multiple times,
156          * if the reader keeps invoking either of the read methods, and they will consequently keep returning EOF.
157          *
158          * @throws IOException if an I/O error occurs.
159          */
160         @SuppressWarnings("unused") // Possibly thrown from subclasses.
161         public void finished() throws IOException {
162             // noop
163         }
164     }
165 
166     private final List<Observer> observers;
167 
168     ObservableInputStream(final AbstractBuilder builder) throws IOException {
169         super(builder);
170         this.observers = builder.observers;
171     }
172 
173     /**
174      * Constructs a new ObservableInputStream for the given InputStream.
175      *
176      * @param inputStream the input stream to observe.
177      */
178     public ObservableInputStream(final InputStream inputStream) {
179         this(inputStream, new ArrayList<>());
180     }
181 
182     /**
183      * Constructs a new ObservableInputStream for the given InputStream.
184      *
185      * @param inputStream the input stream to observe.
186      * @param observers List of observer callbacks.
187      */
188     private ObservableInputStream(final InputStream inputStream, final List<Observer> observers) {
189         super(inputStream);
190         this.observers = observers;
191     }
192 
193     /**
194      * Constructs a new ObservableInputStream for the given InputStream.
195      *
196      * @param inputStream the input stream to observe.
197      * @param observers List of observer callbacks.
198      * @since 2.9.0
199      */
200     public ObservableInputStream(final InputStream inputStream, final Observer... observers) {
201         this(inputStream, Arrays.asList(observers));
202     }
203 
204     /**
205      * Adds an Observer.
206      *
207      * @param observer the observer to add.
208      */
209     public void add(final Observer observer) {
210         observers.add(observer);
211     }
212 
213     @Override
214     public void close() throws IOException {
215         IOException ioe = null;
216         try {
217             super.close();
218         } catch (final IOException e) {
219             ioe = e;
220         }
221         if (ioe == null) {
222             noteClosed();
223         } else {
224             noteError(ioe);
225         }
226     }
227 
228     /**
229      * Reads all data from the underlying {@link InputStream}, while notifying the observers.
230      *
231      * @throws IOException The underlying {@link InputStream}, or either of the observers has thrown an exception.
232      */
233     public void consume() throws IOException {
234         IOUtils.consume(this);
235     }
236 
237     private void forEachObserver(final IOConsumer<Observer> action) throws IOException {
238         IOConsumer.forAll(action, observers);
239     }
240 
241     /**
242      * Gets a copy of currently registered observers.
243      *
244      * @return a copy of the list of currently registered observers.
245      * @since 2.9.0
246      */
247     public List<Observer> getObservers() {
248         return new ArrayList<>(observers);
249     }
250 
251     /**
252      * Notifies the observers by invoking {@link Observer#finished()}.
253      *
254      * @throws IOException Some observer has thrown an exception, which is being passed down.
255      */
256     protected void noteClosed() throws IOException {
257         forEachObserver(Observer::closed);
258     }
259 
260     /**
261      * Notifies the observers by invoking {@link Observer#data(int)} with the given arguments.
262      *
263      * @param value Passed to the observers.
264      * @throws IOException Some observer has thrown an exception, which is being passed down.
265      */
266     protected void noteDataByte(final int value) throws IOException {
267         forEachObserver(observer -> observer.data(value));
268     }
269 
270     /**
271      * Notifies the observers by invoking {@link Observer#data(byte[],int,int)} with the given arguments.
272      *
273      * @param buffer Passed to the observers.
274      * @param offset Passed to the observers.
275      * @param length Passed to the observers.
276      * @throws IOException Some observer has thrown an exception, which is being passed down.
277      */
278     protected void noteDataBytes(final byte[] buffer, final int offset, final int length) throws IOException {
279         forEachObserver(observer -> observer.data(buffer, offset, length));
280     }
281 
282     /**
283      * Notifies the observers by invoking {@link Observer#error(IOException)} with the given argument.
284      *
285      * @param exception Passed to the observers.
286      * @throws IOException Some observer has thrown an exception, which is being passed down. This may be the same
287      *         exception, which has been passed as an argument.
288      */
289     protected void noteError(final IOException exception) throws IOException {
290         forEachObserver(observer -> observer.error(exception));
291     }
292 
293     /**
294      * Notifies the observers by invoking {@link Observer#finished()}.
295      *
296      * @throws IOException Some observer has thrown an exception, which is being passed down.
297      */
298     protected void noteFinished() throws IOException {
299         forEachObserver(Observer::finished);
300     }
301 
302     private void notify(final byte[] buffer, final int offset, final int result, final IOException ioe) throws IOException {
303         if (ioe != null) {
304             noteError(ioe);
305             throw ioe;
306         }
307         if (result == EOF) {
308             noteFinished();
309         } else if (result > 0) {
310             noteDataBytes(buffer, offset, result);
311         }
312     }
313 
314     @Override
315     public int read() throws IOException {
316         int result = 0;
317         IOException ioe = null;
318         try {
319             result = super.read();
320         } catch (final IOException ex) {
321             ioe = ex;
322         }
323         if (ioe != null) {
324             noteError(ioe);
325             throw ioe;
326         }
327         if (result == EOF) {
328             noteFinished();
329         } else {
330             noteDataByte(result);
331         }
332         return result;
333     }
334 
335     @Override
336     public int read(final byte[] buffer) throws IOException {
337         int result = 0;
338         IOException ioe = null;
339         try {
340             result = super.read(buffer);
341         } catch (final IOException ex) {
342             ioe = ex;
343         }
344         notify(buffer, 0, result, ioe);
345         return result;
346     }
347 
348     @Override
349     public int read(final byte[] buffer, final int offset, final int length) throws IOException {
350         int result = 0;
351         IOException ioe = null;
352         try {
353             result = super.read(buffer, offset, length);
354         } catch (final IOException ex) {
355             ioe = ex;
356         }
357         notify(buffer, offset, result, ioe);
358         return result;
359     }
360 
361     /**
362      * Removes an Observer.
363      *
364      * @param observer the observer to remove
365      */
366     public void remove(final Observer observer) {
367         observers.remove(observer);
368     }
369 
370     /**
371      * Removes all Observers.
372      */
373     public void removeAllObservers() {
374         observers.clear();
375     }
376 
377 }