1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.commons.io.input; 18 19 import static org.apache.commons.io.IOUtils.EOF; 20 21 import java.io.IOException; 22 import java.io.InputStream; 23 import java.util.ArrayList; 24 import java.util.Arrays; 25 import java.util.List; 26 27 import org.apache.commons.io.IOUtils; 28 import org.apache.commons.io.function.IOConsumer; 29 30 /** 31 * The {@link ObservableInputStream} allows, that an InputStream may be consumed by other receivers, apart from the 32 * thread, which is reading it. The other consumers are implemented as instances of {@link Observer}. 33 * <p> 34 * A typical application may be the generation of a {@link java.security.MessageDigest} on the fly. 35 * </p> 36 * <p> 37 * <em>Note</em>: The {@link ObservableInputStream} is <em>not</em> thread safe, as instances of InputStream usually 38 * aren't. If you must access the stream from multiple threads, then synchronization, locking, or a similar means must 39 * be used. 40 * </p> 41 * 42 * @see MessageDigestInputStream 43 */ 44 public class ObservableInputStream extends ProxyInputStream { 45 46 /** 47 * Abstracts observer callback for {@link ObservableInputStream}s. 48 */ 49 public static abstract class Observer { 50 51 /** 52 * Called to indicate that the {@link ObservableInputStream} has been closed. 53 * 54 * @throws IOException if an I/O error occurs. 55 */ 56 @SuppressWarnings("unused") // Possibly thrown from subclasses. 57 public void closed() throws IOException { 58 // noop 59 } 60 61 /** 62 * Called to indicate that {@link InputStream#read(byte[])}, or {@link InputStream#read(byte[], int, int)} have 63 * been called, and are about to invoke data. 64 * 65 * @param buffer The byte array, which has been passed to the read call, and where data has been stored. 66 * @param offset The offset within the byte array, where data has been stored. 67 * @param length The number of bytes, which have been stored in the byte array. 68 * @throws IOException if an I/O error occurs. 69 */ 70 @SuppressWarnings("unused") // Possibly thrown from subclasses. 71 public void data(final byte[] buffer, final int offset, final int length) throws IOException { 72 // noop 73 } 74 75 /** 76 * Called to indicate, that {@link InputStream#read()} has been invoked on the {@link ObservableInputStream}, 77 * and will return a value. 78 * 79 * @param value The value, which is being returned. This will never be -1 (EOF), because, in that case, 80 * {@link #finished()} will be invoked instead. 81 * @throws IOException if an I/O error occurs. 82 */ 83 @SuppressWarnings("unused") // Possibly thrown from subclasses. 84 public void data(final int value) throws IOException { 85 // noop 86 } 87 88 /** 89 * Called to indicate that an error occurred on the underlying stream. 90 * 91 * @param exception the exception to throw 92 * @throws IOException if an I/O error occurs. 93 */ 94 public void error(final IOException exception) throws IOException { 95 throw exception; 96 } 97 98 /** 99 * Called to indicate that EOF has been seen on the underlying stream. This method may be called multiple times, 100 * if the reader keeps invoking either of the read methods, and they will consequently keep returning EOF. 101 * 102 * @throws IOException if an I/O error occurs. 103 */ 104 @SuppressWarnings("unused") // Possibly thrown from subclasses. 105 public void finished() throws IOException { 106 // noop 107 } 108 } 109 110 private final List<Observer> observers; 111 112 /** 113 * Constructs a new ObservableInputStream for the given InputStream. 114 * 115 * @param inputStream the input stream to observe. 116 */ 117 public ObservableInputStream(final InputStream inputStream) { 118 this(inputStream, new ArrayList<>()); 119 } 120 121 /** 122 * Constructs a new ObservableInputStream for the given InputStream. 123 * 124 * @param inputStream the input stream to observe. 125 * @param observers List of observer callbacks. 126 */ 127 private ObservableInputStream(final InputStream inputStream, final List<Observer> observers) { 128 super(inputStream); 129 this.observers = observers; 130 } 131 132 /** 133 * Constructs a new ObservableInputStream for the given InputStream. 134 * 135 * @param inputStream the input stream to observe. 136 * @param observers List of observer callbacks. 137 * @since 2.9.0 138 */ 139 public ObservableInputStream(final InputStream inputStream, final Observer... observers) { 140 this(inputStream, Arrays.asList(observers)); 141 } 142 143 /** 144 * Adds an Observer. 145 * 146 * @param observer the observer to add. 147 */ 148 public void add(final Observer observer) { 149 observers.add(observer); 150 } 151 152 @Override 153 public void close() throws IOException { 154 IOException ioe = null; 155 try { 156 super.close(); 157 } catch (final IOException e) { 158 ioe = e; 159 } 160 if (ioe == null) { 161 noteClosed(); 162 } else { 163 noteError(ioe); 164 } 165 } 166 167 /** 168 * Reads all data from the underlying {@link InputStream}, while notifying the observers. 169 * 170 * @throws IOException The underlying {@link InputStream}, or either of the observers has thrown an exception. 171 */ 172 public void consume() throws IOException { 173 IOUtils.consume(this); 174 } 175 176 private void forEachObserver(final IOConsumer<Observer> action) throws IOException { 177 IOConsumer.forAll(action, observers); 178 } 179 180 /** 181 * Gets a copy of currently registered observers. 182 * 183 * @return a copy of the list of currently registered observers. 184 * @since 2.9.0 185 */ 186 public List<Observer> getObservers() { 187 return new ArrayList<>(observers); 188 } 189 190 /** 191 * Notifies the observers by invoking {@link Observer#finished()}. 192 * 193 * @throws IOException Some observer has thrown an exception, which is being passed down. 194 */ 195 protected void noteClosed() throws IOException { 196 forEachObserver(Observer::closed); 197 } 198 199 /** 200 * Notifies the observers by invoking {@link Observer#data(int)} with the given arguments. 201 * 202 * @param value Passed to the observers. 203 * @throws IOException Some observer has thrown an exception, which is being passed down. 204 */ 205 protected void noteDataByte(final int value) throws IOException { 206 forEachObserver(observer -> observer.data(value)); 207 } 208 209 /** 210 * Notifies the observers by invoking {@link Observer#data(byte[],int,int)} with the given arguments. 211 * 212 * @param buffer Passed to the observers. 213 * @param offset Passed to the observers. 214 * @param length Passed to the observers. 215 * @throws IOException Some observer has thrown an exception, which is being passed down. 216 */ 217 protected void noteDataBytes(final byte[] buffer, final int offset, final int length) throws IOException { 218 forEachObserver(observer -> observer.data(buffer, offset, length)); 219 } 220 221 /** 222 * Notifies the observers by invoking {@link Observer#error(IOException)} with the given argument. 223 * 224 * @param exception Passed to the observers. 225 * @throws IOException Some observer has thrown an exception, which is being passed down. This may be the same 226 * exception, which has been passed as an argument. 227 */ 228 protected void noteError(final IOException exception) throws IOException { 229 forEachObserver(observer -> observer.error(exception)); 230 } 231 232 /** 233 * Notifies the observers by invoking {@link Observer#finished()}. 234 * 235 * @throws IOException Some observer has thrown an exception, which is being passed down. 236 */ 237 protected void noteFinished() throws IOException { 238 forEachObserver(Observer::finished); 239 } 240 241 private void notify(final byte[] buffer, final int offset, final int result, final IOException ioe) throws IOException { 242 if (ioe != null) { 243 noteError(ioe); 244 throw ioe; 245 } 246 if (result == EOF) { 247 noteFinished(); 248 } else if (result > 0) { 249 noteDataBytes(buffer, offset, result); 250 } 251 } 252 253 @Override 254 public int read() throws IOException { 255 int result = 0; 256 IOException ioe = null; 257 try { 258 result = super.read(); 259 } catch (final IOException ex) { 260 ioe = ex; 261 } 262 if (ioe != null) { 263 noteError(ioe); 264 throw ioe; 265 } 266 if (result == EOF) { 267 noteFinished(); 268 } else { 269 noteDataByte(result); 270 } 271 return result; 272 } 273 274 @Override 275 public int read(final byte[] buffer) throws IOException { 276 int result = 0; 277 IOException ioe = null; 278 try { 279 result = super.read(buffer); 280 } catch (final IOException ex) { 281 ioe = ex; 282 } 283 notify(buffer, 0, result, ioe); 284 return result; 285 } 286 287 @Override 288 public int read(final byte[] buffer, final int offset, final int length) throws IOException { 289 int result = 0; 290 IOException ioe = null; 291 try { 292 result = super.read(buffer, offset, length); 293 } catch (final IOException ex) { 294 ioe = ex; 295 } 296 notify(buffer, offset, result, ioe); 297 return result; 298 } 299 300 /** 301 * Removes an Observer. 302 * 303 * @param observer the observer to remove 304 */ 305 public void remove(final Observer observer) { 306 observers.remove(observer); 307 } 308 309 /** 310 * Removes all Observers. 311 */ 312 public void removeAllObservers() { 313 observers.clear(); 314 } 315 316 }