001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.io.input; 018 019import static org.apache.commons.io.IOUtils.EOF; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.List; 026 027import org.apache.commons.io.IOUtils; 028import org.apache.commons.io.function.IOConsumer; 029 030/** 031 * The {@link ObservableInputStream} allows, that an InputStream may be consumed by other receivers, apart from the 032 * thread, which is reading it. The other consumers are implemented as instances of {@link Observer}. 033 * <p> 034 * A typical application may be the generation of a {@link java.security.MessageDigest} on the fly. 035 * </p> 036 * <p> 037 * <em>Note</em>: The {@link ObservableInputStream} is <em>not</em> thread safe, as instances of InputStream usually 038 * aren't. If you must access the stream from multiple threads, then synchronization, locking, or a similar means must 039 * be used. 040 * </p> 041 * 042 * @see MessageDigestInputStream 043 */ 044public class ObservableInputStream extends ProxyInputStream { 045 046 /** 047 * For subclassing builders from {@link BoundedInputStream} subclassses. 048 * 049 * @param <T> The subclass. 050 * @since 2.18.0 051 */ 052 public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<ObservableInputStream, T> { 053 054 private List<Observer> observers; 055 056 /** 057 * Sets the list of observer callbacks. 058 * 059 * @param observers The list of observer callbacks. 060 */ 061 public void setObservers(final List<Observer> observers) { 062 this.observers = observers; 063 } 064 065 } 066 067 068 /** 069 * Builds instances of {@link ObservableInputStream}. 070 * 071 * @since 2.18.0 072 */ 073 public static class Builder extends AbstractBuilder<Builder> { 074 075 @Override 076 public ObservableInputStream get() throws IOException { 077 return new ObservableInputStream(this); 078 } 079 080 } 081 082 /** 083 * Abstracts observer callback for {@link ObservableInputStream}s. 084 */ 085 public static abstract class Observer { 086 087 /** 088 * Called to indicate that the {@link ObservableInputStream} has been closed. 089 * 090 * @throws IOException if an I/O error occurs. 091 */ 092 @SuppressWarnings("unused") // Possibly thrown from subclasses. 093 public void closed() throws IOException { 094 // noop 095 } 096 097 /** 098 * Called to indicate that {@link InputStream#read(byte[])}, or {@link InputStream#read(byte[], int, int)} have 099 * been called, and are about to invoke data. 100 * 101 * @param buffer The byte array, which has been passed to the read call, and where data has been stored. 102 * @param offset The offset within the byte array, where data has been stored. 103 * @param length The number of bytes, which have been stored in the byte array. 104 * @throws IOException if an I/O error occurs. 105 */ 106 @SuppressWarnings("unused") // Possibly thrown from subclasses. 107 public void data(final byte[] buffer, final int offset, final int length) throws IOException { 108 // noop 109 } 110 111 /** 112 * Called to indicate, that {@link InputStream#read()} has been invoked on the {@link ObservableInputStream}, 113 * and will return a value. 114 * 115 * @param value The value, which is being returned. This will never be -1 (EOF), because, in that case, 116 * {@link #finished()} will be invoked instead. 117 * @throws IOException if an I/O error occurs. 118 */ 119 @SuppressWarnings("unused") // Possibly thrown from subclasses. 120 public void data(final int value) throws IOException { 121 // noop 122 } 123 124 /** 125 * Called to indicate that an error occurred on the underlying stream. 126 * 127 * @param exception the exception to throw 128 * @throws IOException if an I/O error occurs. 129 */ 130 public void error(final IOException exception) throws IOException { 131 throw exception; 132 } 133 134 /** 135 * Called to indicate that EOF has been seen on the underlying stream. This method may be called multiple times, 136 * if the reader keeps invoking either of the read methods, and they will consequently keep returning EOF. 137 * 138 * @throws IOException if an I/O error occurs. 139 */ 140 @SuppressWarnings("unused") // Possibly thrown from subclasses. 141 public void finished() throws IOException { 142 // noop 143 } 144 } 145 146 private final List<Observer> observers; 147 148 ObservableInputStream(final AbstractBuilder builder) throws IOException { 149 super(builder); 150 this.observers = builder.observers; 151 } 152 153 /** 154 * Constructs a new ObservableInputStream for the given InputStream. 155 * 156 * @param inputStream the input stream to observe. 157 */ 158 public ObservableInputStream(final InputStream inputStream) { 159 this(inputStream, new ArrayList<>()); 160 } 161 162 /** 163 * Constructs a new ObservableInputStream for the given InputStream. 164 * 165 * @param inputStream the input stream to observe. 166 * @param observers List of observer callbacks. 167 */ 168 private ObservableInputStream(final InputStream inputStream, final List<Observer> observers) { 169 super(inputStream); 170 this.observers = observers; 171 } 172 173 /** 174 * Constructs a new ObservableInputStream for the given InputStream. 175 * 176 * @param inputStream the input stream to observe. 177 * @param observers List of observer callbacks. 178 * @since 2.9.0 179 */ 180 public ObservableInputStream(final InputStream inputStream, final Observer... observers) { 181 this(inputStream, Arrays.asList(observers)); 182 } 183 184 /** 185 * Adds an Observer. 186 * 187 * @param observer the observer to add. 188 */ 189 public void add(final Observer observer) { 190 observers.add(observer); 191 } 192 193 @Override 194 public void close() throws IOException { 195 IOException ioe = null; 196 try { 197 super.close(); 198 } catch (final IOException e) { 199 ioe = e; 200 } 201 if (ioe == null) { 202 noteClosed(); 203 } else { 204 noteError(ioe); 205 } 206 } 207 208 /** 209 * Reads all data from the underlying {@link InputStream}, while notifying the observers. 210 * 211 * @throws IOException The underlying {@link InputStream}, or either of the observers has thrown an exception. 212 */ 213 public void consume() throws IOException { 214 IOUtils.consume(this); 215 } 216 217 private void forEachObserver(final IOConsumer<Observer> action) throws IOException { 218 IOConsumer.forAll(action, observers); 219 } 220 221 /** 222 * Gets a copy of currently registered observers. 223 * 224 * @return a copy of the list of currently registered observers. 225 * @since 2.9.0 226 */ 227 public List<Observer> getObservers() { 228 return new ArrayList<>(observers); 229 } 230 231 /** 232 * Notifies the observers by invoking {@link Observer#finished()}. 233 * 234 * @throws IOException Some observer has thrown an exception, which is being passed down. 235 */ 236 protected void noteClosed() throws IOException { 237 forEachObserver(Observer::closed); 238 } 239 240 /** 241 * Notifies the observers by invoking {@link Observer#data(int)} with the given arguments. 242 * 243 * @param value Passed to the observers. 244 * @throws IOException Some observer has thrown an exception, which is being passed down. 245 */ 246 protected void noteDataByte(final int value) throws IOException { 247 forEachObserver(observer -> observer.data(value)); 248 } 249 250 /** 251 * Notifies the observers by invoking {@link Observer#data(byte[],int,int)} with the given arguments. 252 * 253 * @param buffer Passed to the observers. 254 * @param offset Passed to the observers. 255 * @param length Passed to the observers. 256 * @throws IOException Some observer has thrown an exception, which is being passed down. 257 */ 258 protected void noteDataBytes(final byte[] buffer, final int offset, final int length) throws IOException { 259 forEachObserver(observer -> observer.data(buffer, offset, length)); 260 } 261 262 /** 263 * Notifies the observers by invoking {@link Observer#error(IOException)} with the given argument. 264 * 265 * @param exception Passed to the observers. 266 * @throws IOException Some observer has thrown an exception, which is being passed down. This may be the same 267 * exception, which has been passed as an argument. 268 */ 269 protected void noteError(final IOException exception) throws IOException { 270 forEachObserver(observer -> observer.error(exception)); 271 } 272 273 /** 274 * Notifies the observers by invoking {@link Observer#finished()}. 275 * 276 * @throws IOException Some observer has thrown an exception, which is being passed down. 277 */ 278 protected void noteFinished() throws IOException { 279 forEachObserver(Observer::finished); 280 } 281 282 private void notify(final byte[] buffer, final int offset, final int result, final IOException ioe) throws IOException { 283 if (ioe != null) { 284 noteError(ioe); 285 throw ioe; 286 } 287 if (result == EOF) { 288 noteFinished(); 289 } else if (result > 0) { 290 noteDataBytes(buffer, offset, result); 291 } 292 } 293 294 @Override 295 public int read() throws IOException { 296 int result = 0; 297 IOException ioe = null; 298 try { 299 result = super.read(); 300 } catch (final IOException ex) { 301 ioe = ex; 302 } 303 if (ioe != null) { 304 noteError(ioe); 305 throw ioe; 306 } 307 if (result == EOF) { 308 noteFinished(); 309 } else { 310 noteDataByte(result); 311 } 312 return result; 313 } 314 315 @Override 316 public int read(final byte[] buffer) throws IOException { 317 int result = 0; 318 IOException ioe = null; 319 try { 320 result = super.read(buffer); 321 } catch (final IOException ex) { 322 ioe = ex; 323 } 324 notify(buffer, 0, result, ioe); 325 return result; 326 } 327 328 @Override 329 public int read(final byte[] buffer, final int offset, final int length) throws IOException { 330 int result = 0; 331 IOException ioe = null; 332 try { 333 result = super.read(buffer, offset, length); 334 } catch (final IOException ex) { 335 ioe = ex; 336 } 337 notify(buffer, offset, result, ioe); 338 return result; 339 } 340 341 /** 342 * Removes an Observer. 343 * 344 * @param observer the observer to remove 345 */ 346 public void remove(final Observer observer) { 347 observers.remove(observer); 348 } 349 350 /** 351 * Removes all Observers. 352 */ 353 public void removeAllObservers() { 354 observers.clear(); 355 } 356 357}