001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.io.input; 018 019import java.io.IOException; 020import java.io.InputStream; 021import java.util.ArrayList; 022import java.util.List; 023 024import org.apache.commons.io.IOUtils; 025 026 027/** 028 * The {@link ObservableInputStream} allows, that an InputStream may be consumed 029 * by other receivers, apart from the thread, which is reading it. 030 * The other consumers are implemented as instances of {@link Observer}. A 031 * typical application may be the generation of a {@link java.security.MessageDigest} on the 032 * fly. 033 * {@code Note}: The {@link ObservableInputStream} is <em>not</em> thread safe, 034 * as instances of InputStream usually aren't. 035 * If you must access the stream from multiple threads, then synchronization, locking, 036 * or a similar means must be used. 037 * @see MessageDigestCalculatingInputStream 038 */ 039public class ObservableInputStream extends ProxyInputStream { 040 041 /** 042 * Abstracts observer callback for {@code ObservableInputStream}s. 043 */ 044 public static abstract class Observer { 045 046 /** Called to indicate, that {@link InputStream#read()} has been invoked 047 * on the {@link ObservableInputStream}, and will return a value. 048 * @param pByte The value, which is being returned. This will never be -1 (EOF), 049 * because, in that case, {@link #finished()} will be invoked instead. 050 * @throws IOException if an i/o-error occurs 051 */ 052 public void data(final int pByte) throws IOException { 053 // noop 054 } 055 056 /** Called to indicate, that {@link InputStream#read(byte[])}, or 057 * {@link InputStream#read(byte[], int, int)} have been called, and are about to 058 * invoke data. 059 * @param pBuffer The byte array, which has been passed to the read call, and where 060 * data has been stored. 061 * @param pOffset The offset within the byte array, where data has been stored. 062 * @param pLength The number of bytes, which have been stored in the byte array. 063 * @throws IOException if an i/o-error occurs 064 */ 065 public void data(final byte[] pBuffer, final int pOffset, final int pLength) throws IOException { 066 // noop 067 } 068 069 /** Called to indicate, that EOF has been seen on the underlying stream. 070 * This method may be called multiple times, if the reader keeps invoking 071 * either of the read methods, and they will consequently keep returning 072 * EOF. 073 * @throws IOException if an i/o-error occurs 074 */ 075 public void finished() throws IOException { 076 // noop 077 } 078 079 /** Called to indicate, that the {@link ObservableInputStream} has been closed. 080 * @throws IOException if an i/o-error occurs 081 */ 082 public void closed() throws IOException { 083 // noop 084 } 085 086 /** 087 * Called to indicate, that an error occurred on the underlying stream. 088 * @param pException the exception to throw 089 * @throws IOException if an i/o-error occurs 090 */ 091 public void error(final IOException pException) throws IOException { throw pException; } 092 } 093 094 private final List<Observer> observers = new ArrayList<>(); 095 096 /** 097 * Creates a new ObservableInputStream for the given InputStream. 098 * @param pProxy the input stream to proxy 099 */ 100 public ObservableInputStream(final InputStream pProxy) { 101 super(pProxy); 102 } 103 104 /** 105 * Adds an Observer. 106 * @param pObserver the observer to add 107 */ 108 public void add(final Observer pObserver) { 109 observers.add(pObserver); 110 } 111 112 /** 113 * Removes an Observer. 114 * @param pObserver the observer to remove 115 */ 116 public void remove(final Observer pObserver) { 117 observers.remove(pObserver); 118 } 119 120 /** 121 * Removes all Observers. 122 */ 123 public void removeAllObservers() { 124 observers.clear(); 125 } 126 127 @Override 128 public int read() throws IOException { 129 int result = 0; 130 IOException ioe = null; 131 try { 132 result = super.read(); 133 } catch (final IOException pException) { 134 ioe = pException; 135 } 136 if (ioe != null) { 137 noteError(ioe); 138 } else if (result == -1) { 139 noteFinished(); 140 } else { 141 noteDataByte(result); 142 } 143 return result; 144 } 145 146 @Override 147 public int read(final byte[] pBuffer) throws IOException { 148 int result = 0; 149 IOException ioe = null; 150 try { 151 result = super.read(pBuffer); 152 } catch (final IOException pException) { 153 ioe = pException; 154 } 155 if (ioe != null) { 156 noteError(ioe); 157 } else if (result == -1) { 158 noteFinished(); 159 } else if (result > 0) { 160 noteDataBytes(pBuffer, 0, result); 161 } 162 return result; 163 } 164 165 @Override 166 public int read(final byte[] pBuffer, final int pOffset, final int pLength) throws IOException { 167 int result = 0; 168 IOException ioe = null; 169 try { 170 result = super.read(pBuffer, pOffset, pLength); 171 } catch (final IOException pException) { 172 ioe = pException; 173 } 174 if (ioe != null) { 175 noteError(ioe); 176 } else if (result == -1) { 177 noteFinished(); 178 } else if (result > 0) { 179 noteDataBytes(pBuffer, pOffset, result); 180 } 181 return result; 182 } 183 184 /** Notifies the observers by invoking {@link Observer#data(byte[],int,int)} 185 * with the given arguments. 186 * @param pBuffer Passed to the observers. 187 * @param pOffset Passed to the observers. 188 * @param pLength Passed to the observers. 189 * @throws IOException Some observer has thrown an exception, which is being 190 * passed down. 191 */ 192 protected void noteDataBytes(final byte[] pBuffer, final int pOffset, final int pLength) throws IOException { 193 for (final Observer observer : getObservers()) { 194 observer.data(pBuffer, pOffset, pLength); 195 } 196 } 197 198 /** Notifies the observers by invoking {@link Observer#finished()}. 199 * @throws IOException Some observer has thrown an exception, which is being 200 * passed down. 201 */ 202 protected void noteFinished() throws IOException { 203 for (final Observer observer : getObservers()) { 204 observer.finished(); 205 } 206 } 207 208 /** Notifies the observers by invoking {@link Observer#data(int)} 209 * with the given arguments. 210 * @param pDataByte Passed to the observers. 211 * @throws IOException Some observer has thrown an exception, which is being 212 * passed down. 213 */ 214 protected void noteDataByte(final int pDataByte) throws IOException { 215 for (final Observer observer : getObservers()) { 216 observer.data(pDataByte); 217 } 218 } 219 220 /** Notifies the observers by invoking {@link Observer#error(IOException)} 221 * with the given argument. 222 * @param pException Passed to the observers. 223 * @throws IOException Some observer has thrown an exception, which is being 224 * passed down. This may be the same exception, which has been passed as an 225 * argument. 226 */ 227 protected void noteError(final IOException pException) throws IOException { 228 for (final Observer observer : getObservers()) { 229 observer.error(pException); 230 } 231 } 232 233 /** Notifies the observers by invoking {@link Observer#finished()}. 234 * @throws IOException Some observer has thrown an exception, which is being 235 * passed down. 236 */ 237 protected void noteClosed() throws IOException { 238 for (final Observer observer : getObservers()) { 239 observer.closed(); 240 } 241 } 242 243 /** Gets all currently registered observers. 244 * @return a list of the currently registered observers 245 */ 246 protected List<Observer> getObservers() { 247 return observers; 248 } 249 250 @Override 251 public void close() throws IOException { 252 IOException ioe = null; 253 try { 254 super.close(); 255 } catch (final IOException e) { 256 ioe = e; 257 } 258 if (ioe == null) { 259 noteClosed(); 260 } else { 261 noteError(ioe); 262 } 263 } 264 265 /** Reads all data from the underlying {@link InputStream}, while notifying the 266 * observers. 267 * @throws IOException The underlying {@link InputStream}, or either of the 268 * observers has thrown an exception. 269 */ 270 public void consume() throws IOException { 271 final byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE]; 272 for (;;) { 273 final int res = read(buffer); 274 if (res == -1) { 275 return; 276 } 277 } 278 } 279 280}