1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.commons.io.input;
18
19 import static org.apache.commons.io.IOUtils.EOF;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.List;
26
27 import org.apache.commons.io.IOUtils;
28 import org.apache.commons.io.function.IOConsumer;
29
30 /**
31 * The {@link ObservableInputStream} allows, that an InputStream may be consumed by other receivers, apart from the
32 * thread, which is reading it. The other consumers are implemented as instances of {@link Observer}.
33 * <p>
34 * A typical application may be the generation of a {@link java.security.MessageDigest} on the fly.
35 * </p>
36 * <p>
37 * <em>Note</em>: The {@link ObservableInputStream} is <em>not</em> thread safe, as instances of InputStream usually
38 * aren't. If you must access the stream from multiple threads, then synchronization, locking, or a similar means must
39 * be used.
40 * </p>
41 *
42 * @see MessageDigestInputStream
43 */
44 public class ObservableInputStream extends ProxyInputStream {
45
46 /**
47 * For subclassing builders from {@link BoundedInputStream} subclassses.
48 *
49 * @param <T> The subclass.
50 * @since 2.18.0
51 */
52 public abstract static class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<ObservableInputStream, T> {
53
54 private List<Observer> observers;
55
56 /**
57 * Constructs a new instance for subclasses.
58 */
59 public AbstractBuilder() {
60 // empty
61 }
62
63 /**
64 * Sets the list of observer callbacks.
65 *
66 * @param observers The list of observer callbacks.
67 */
68 public void setObservers(final List<Observer> observers) {
69 this.observers = observers;
70 }
71
72 }
73
74 /**
75 * Builds instances of {@link ObservableInputStream}.
76 *
77 * @since 2.18.0
78 */
79 public static class Builder extends AbstractBuilder<Builder> {
80
81 /**
82 * Constructs a new builder of {@link ObservableInputStream}.
83 */
84 public Builder() {
85 // empty
86 }
87
88 @Override
89 public ObservableInputStream get() throws IOException {
90 return new ObservableInputStream(this);
91 }
92
93 }
94
95 /**
96 * Abstracts observer callback for {@link ObservableInputStream}s.
97 */
98 public abstract static class Observer {
99
100 /**
101 * Constructs a new instance for subclasses.
102 */
103 public Observer() {
104 // empty
105 }
106
107 /**
108 * Called to indicate that the {@link ObservableInputStream} has been closed.
109 *
110 * @throws IOException if an I/O error occurs.
111 */
112 @SuppressWarnings("unused") // Possibly thrown from subclasses.
113 public void closed() throws IOException {
114 // noop
115 }
116
117 /**
118 * Called to indicate that {@link InputStream#read(byte[])}, or {@link InputStream#read(byte[], int, int)} have
119 * been called, and are about to invoke data.
120 *
121 * @param buffer The byte array, which has been passed to the read call, and where data has been stored.
122 * @param offset The offset within the byte array, where data has been stored.
123 * @param length The number of bytes, which have been stored in the byte array.
124 * @throws IOException if an I/O error occurs.
125 */
126 @SuppressWarnings("unused") // Possibly thrown from subclasses.
127 public void data(final byte[] buffer, final int offset, final int length) throws IOException {
128 // noop
129 }
130
131 /**
132 * Called to indicate, that {@link InputStream#read()} has been invoked on the {@link ObservableInputStream},
133 * and will return a value.
134 *
135 * @param value The value, which is being returned. This will never be -1 (EOF), because, in that case,
136 * {@link #finished()} will be invoked instead.
137 * @throws IOException if an I/O error occurs.
138 */
139 @SuppressWarnings("unused") // Possibly thrown from subclasses.
140 public void data(final int value) throws IOException {
141 // noop
142 }
143
144 /**
145 * Called to indicate that an error occurred on the underlying stream.
146 *
147 * @param exception the exception to throw
148 * @throws IOException if an I/O error occurs.
149 */
150 public void error(final IOException exception) throws IOException {
151 throw exception;
152 }
153
154 /**
155 * Called to indicate that EOF has been seen on the underlying stream. This method may be called multiple times,
156 * if the reader keeps invoking either of the read methods, and they will consequently keep returning EOF.
157 *
158 * @throws IOException if an I/O error occurs.
159 */
160 @SuppressWarnings("unused") // Possibly thrown from subclasses.
161 public void finished() throws IOException {
162 // noop
163 }
164 }
165
166 private final List<Observer> observers;
167
168 ObservableInputStream(final AbstractBuilder<?> builder) throws IOException {
169 super(builder);
170 this.observers = builder.observers;
171 }
172
173 /**
174 * Constructs a new ObservableInputStream for the given InputStream.
175 *
176 * @param inputStream the input stream to observe.
177 */
178 public ObservableInputStream(final InputStream inputStream) {
179 this(inputStream, new ArrayList<>());
180 }
181
182 /**
183 * Constructs a new ObservableInputStream for the given InputStream.
184 *
185 * @param inputStream the input stream to observe.
186 * @param observers List of observer callbacks.
187 */
188 private ObservableInputStream(final InputStream inputStream, final List<Observer> observers) {
189 super(inputStream);
190 this.observers = observers;
191 }
192
193 /**
194 * Constructs a new ObservableInputStream for the given InputStream.
195 *
196 * @param inputStream the input stream to observe.
197 * @param observers List of observer callbacks.
198 * @since 2.9.0
199 */
200 public ObservableInputStream(final InputStream inputStream, final Observer... observers) {
201 this(inputStream, Arrays.asList(observers));
202 }
203
204 /**
205 * Adds an Observer.
206 *
207 * @param observer the observer to add.
208 */
209 public void add(final Observer observer) {
210 observers.add(observer);
211 }
212
213 @Override
214 public void close() throws IOException {
215 IOException ioe = null;
216 try {
217 super.close();
218 } catch (final IOException e) {
219 ioe = e;
220 }
221 if (ioe == null) {
222 noteClosed();
223 } else {
224 noteError(ioe);
225 }
226 }
227
228 /**
229 * Reads all data from the underlying {@link InputStream}, while notifying the observers.
230 *
231 * @throws IOException The underlying {@link InputStream}, or either of the observers has thrown an exception.
232 */
233 public void consume() throws IOException {
234 IOUtils.consume(this);
235 }
236
237 private void forEachObserver(final IOConsumer<Observer> action) throws IOException {
238 IOConsumer.forAll(action, observers);
239 }
240
241 /**
242 * Gets a copy of currently registered observers.
243 *
244 * @return a copy of the list of currently registered observers.
245 * @since 2.9.0
246 */
247 public List<Observer> getObservers() {
248 return new ArrayList<>(observers);
249 }
250
251 /**
252 * Notifies the observers by invoking {@link Observer#finished()}.
253 *
254 * @throws IOException Some observer has thrown an exception, which is being passed down.
255 */
256 protected void noteClosed() throws IOException {
257 forEachObserver(Observer::closed);
258 }
259
260 /**
261 * Notifies the observers by invoking {@link Observer#data(int)} with the given arguments.
262 *
263 * @param value Passed to the observers.
264 * @throws IOException Some observer has thrown an exception, which is being passed down.
265 */
266 protected void noteDataByte(final int value) throws IOException {
267 forEachObserver(observer -> observer.data(value));
268 }
269
270 /**
271 * Notifies the observers by invoking {@link Observer#data(byte[],int,int)} with the given arguments.
272 *
273 * @param buffer Passed to the observers.
274 * @param offset Passed to the observers.
275 * @param length Passed to the observers.
276 * @throws IOException Some observer has thrown an exception, which is being passed down.
277 */
278 protected void noteDataBytes(final byte[] buffer, final int offset, final int length) throws IOException {
279 forEachObserver(observer -> observer.data(buffer, offset, length));
280 }
281
282 /**
283 * Notifies the observers by invoking {@link Observer#error(IOException)} with the given argument.
284 *
285 * @param exception Passed to the observers.
286 * @throws IOException Some observer has thrown an exception, which is being passed down. This may be the same
287 * exception, which has been passed as an argument.
288 */
289 protected void noteError(final IOException exception) throws IOException {
290 forEachObserver(observer -> observer.error(exception));
291 }
292
293 /**
294 * Notifies the observers by invoking {@link Observer#finished()}.
295 *
296 * @throws IOException Some observer has thrown an exception, which is being passed down.
297 */
298 protected void noteFinished() throws IOException {
299 forEachObserver(Observer::finished);
300 }
301
302 private void notify(final byte[] buffer, final int offset, final int result, final IOException ioe) throws IOException {
303 if (ioe != null) {
304 noteError(ioe);
305 throw ioe;
306 }
307 if (result == EOF) {
308 noteFinished();
309 } else if (result > 0) {
310 noteDataBytes(buffer, offset, result);
311 }
312 }
313
314 @Override
315 public int read() throws IOException {
316 int result = 0;
317 IOException ioe = null;
318 try {
319 result = super.read();
320 } catch (final IOException ex) {
321 ioe = ex;
322 }
323 if (ioe != null) {
324 noteError(ioe);
325 throw ioe;
326 }
327 if (result == EOF) {
328 noteFinished();
329 } else {
330 noteDataByte(result);
331 }
332 return result;
333 }
334
335 @Override
336 public int read(final byte[] buffer) throws IOException {
337 int result = 0;
338 IOException ioe = null;
339 try {
340 result = super.read(buffer);
341 } catch (final IOException ex) {
342 ioe = ex;
343 }
344 notify(buffer, 0, result, ioe);
345 return result;
346 }
347
348 @Override
349 public int read(final byte[] buffer, final int offset, final int length) throws IOException {
350 int result = 0;
351 IOException ioe = null;
352 try {
353 result = super.read(buffer, offset, length);
354 } catch (final IOException ex) {
355 ioe = ex;
356 }
357 notify(buffer, offset, result, ioe);
358 return result;
359 }
360
361 /**
362 * Removes an Observer.
363 *
364 * @param observer the observer to remove
365 */
366 public void remove(final Observer observer) {
367 observers.remove(observer);
368 }
369
370 /**
371 * Removes all Observers.
372 */
373 public void removeAllObservers() {
374 observers.clear();
375 }
376
377 }