001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * https://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.io.input; 018 019import static org.apache.commons.io.IOUtils.EOF; 020 021import java.io.FilterInputStream; 022import java.io.IOException; 023import java.io.InputStream; 024 025import org.apache.commons.io.IOUtils; 026import org.apache.commons.io.build.AbstractStreamBuilder; 027import org.apache.commons.io.function.IOIntConsumer; 028 029/** 030 * A proxy stream which acts as a {@link FilterInputStream}, by passing all method calls on to the proxied stream, not changing which methods are called. 031 * <p> 032 * It is an alternative base class to {@link FilterInputStream} to increase reusability, because {@link FilterInputStream} changes the methods being called, 033 * such as read(byte[]) to read(byte[], int, int). 034 * </p> 035 * <p> 036 * In addition, this class allows you to: 037 * </p> 038 * <ul> 039 * <li>notify a subclass that <em>n</em> bytes are about to be read through {@link #beforeRead(int)}</li> 040 * <li>notify a subclass that <em>n</em> bytes were read through {@link #afterRead(int)}</li> 041 * <li>notify a subclass that an exception was caught through {@link #handleIOException(IOException)}</li> 042 * <li>{@link #unwrap()} itself</li> 043 * </ul> 044 */ 045public abstract class ProxyInputStream extends FilterInputStream { 046 047 /** 048 * Abstracts builder properties for subclasses. 049 * 050 * @param <T> The InputStream type. 051 * @param <B> The builder type. 052 * @since 2.18.0 053 */ 054 protected abstract static class AbstractBuilder<T, B extends AbstractStreamBuilder<T, B>> extends AbstractStreamBuilder<T, B> { 055 056 private IOIntConsumer afterRead; 057 058 /** 059 * Constructs a builder of {@code T}. 060 */ 061 protected AbstractBuilder() { 062 // empty 063 } 064 065 /** 066 * Gets the {@link ProxyInputStream#afterRead(int)} consumer. 067 * 068 * @return the {@link ProxyInputStream#afterRead(int)} consumer. 069 */ 070 public IOIntConsumer getAfterRead() { 071 return afterRead; 072 } 073 074 /** 075 * Sets the {@link ProxyInputStream#afterRead(int)} behavior, null resets to a NOOP. 076 * <p> 077 * Setting this value causes the {@link ProxyInputStream#afterRead(int) afterRead} method to delegate to the given consumer. 078 * </p> 079 * <p> 080 * If a subclass overrides {@link ProxyInputStream#afterRead(int) afterRead} and does not call {@code super.afterRead(int)}, then the given consumer is 081 * not called. 082 * </p> 083 * <p> 084 * This does <em>not</em> override a {@code ProxyInputStream} subclass' implementation of the {@link ProxyInputStream#afterRead(int)} method, it can 085 * supplement it. 086 * </p> 087 * 088 * @param afterRead the {@link ProxyInputStream#afterRead(int)} behavior. 089 * @return {@code this} instance. 090 */ 091 public B setAfterRead(final IOIntConsumer afterRead) { 092 this.afterRead = afterRead; 093 return asThis(); 094 } 095 096 } 097 098 /** 099 * Tracks whether {@link #close()} has been called or not. 100 */ 101 private volatile boolean closed; 102 103 private final IOIntConsumer afterRead; 104 105 /** 106 * Constructs a new ProxyInputStream. 107 * 108 * @param builder How to build an instance. 109 * @throws IOException if an I/O error occurs. 110 * @since 2.18.0 111 */ 112 @SuppressWarnings("resource") 113 protected ProxyInputStream(final AbstractBuilder<?, ?> builder) throws IOException { 114 // the delegate is stored in a protected superclass instance variable named 'in'. 115 this(builder.getInputStream(), builder); 116 } 117 118 /** 119 * Constructs a new ProxyInputStream. 120 * 121 * @param proxy the InputStream to proxy. 122 */ 123 public ProxyInputStream(final InputStream proxy) { 124 // the delegate is stored in a protected superclass variable named 'in'. 125 super(proxy); 126 this.afterRead = IOIntConsumer.NOOP; 127 } 128 129 /** 130 * Constructs a new ProxyInputStream. 131 * 132 * @param proxy the InputStream to proxy. 133 * @param builder How to build an instance. 134 * @since 2.18.0 135 */ 136 protected ProxyInputStream(final InputStream proxy, final AbstractBuilder<?, ?> builder) { 137 // the delegate is stored in a protected superclass instance variable named 'in'. 138 super(proxy); 139 this.afterRead = builder.getAfterRead() != null ? builder.getAfterRead() : IOIntConsumer.NOOP; 140 } 141 142 /** 143 * Called by the {@code read} methods after the proxied call has returned successfully. The argument is the number of bytes returned to the caller or 144 * {@link IOUtils#EOF EOF} if the end of stream was reached. 145 * <p> 146 * The default delegates to the consumer given to {@link AbstractBuilder#setAfterRead(IOIntConsumer)}. 147 * </p> 148 * <p> 149 * Alternatively, a subclasses can override this method to add post-processing functionality without having to override all the read methods. 150 * </p> 151 * <p> 152 * Note this method is <em>not</em> called from {@link #skip(long)} or {@link #reset()}. You need to explicitly override those methods if you want to add 153 * post-processing steps also to them. 154 * </p> 155 * 156 * @param n number of bytes read, or {@link IOUtils#EOF EOF} if the end of stream was reached. 157 * @throws IOException Thrown by a subclass or the consumer given to {@link AbstractBuilder#setAfterRead(IOIntConsumer)}. 158 * @since 2.0 159 */ 160 protected void afterRead(final int n) throws IOException { 161 afterRead.accept(n); 162 } 163 164 /** 165 * Invokes the delegate's {@link InputStream#available()} method. 166 * 167 * @return the number of available bytes, 0 if the stream is closed. 168 * @throws IOException if an I/O error occurs. 169 */ 170 @Override 171 public int available() throws IOException { 172 if (in != null && !isClosed()) { 173 try { 174 return in.available(); 175 } catch (final IOException e) { 176 handleIOException(e); 177 } 178 } 179 return 0; 180 } 181 182 /** 183 * Invoked by the {@code read} methods before the call is proxied. The number 184 * of bytes that the caller wanted to read (1 for the {@link #read()} 185 * method, buffer length for {@link #read(byte[])}, etc.) is given as 186 * an argument. 187 * <p> 188 * Subclasses can override this method to add common pre-processing 189 * functionality without having to override all the read methods. 190 * The default implementation does nothing. 191 * </p> 192 * <p> 193 * Note this method is <em>not</em> called from {@link #skip(long)} or 194 * {@link #reset()}. You need to explicitly override those methods if 195 * you want to add pre-processing steps also to them. 196 * </p> 197 * 198 * @param n number of bytes that the caller asked to be read. 199 * @throws IOException if the pre-processing fails in a subclass. 200 * @since 2.0 201 */ 202 @SuppressWarnings("unused") // Possibly thrown from subclasses. 203 protected void beforeRead(final int n) throws IOException { 204 // no-op default 205 } 206 207 /** 208 * Checks if this instance is closed and throws an IOException if so. 209 * 210 * @throws IOException if this instance is closed. 211 */ 212 void checkOpen() throws IOException { 213 Input.checkOpen(!isClosed()); 214 } 215 216 /** 217 * Invokes the delegate's {@link InputStream#close()} method. 218 * 219 * @throws IOException if an I/O error occurs. 220 */ 221 @Override 222 public void close() throws IOException { 223 IOUtils.close(in, this::handleIOException); 224 closed = true; 225 } 226 227 /** 228 * Handles any IOExceptions thrown; by default, throws the given exception. 229 * <p> 230 * This method provides a point to implement custom exception 231 * handling. The default behavior is to re-throw the exception. 232 * </p> 233 * 234 * @param e The IOException thrown. 235 * @throws IOException if an I/O error occurs. 236 * @since 2.0 237 */ 238 protected void handleIOException(final IOException e) throws IOException { 239 throw e; 240 } 241 242 /** 243 * Tests whether this instance is closed. 244 * 245 * @return whether this instance is closed. 246 */ 247 boolean isClosed() { 248 return closed; 249 } 250 251 /** 252 * Invokes the delegate's {@link InputStream#mark(int)} method. 253 * 254 * @param readLimit read ahead limit. 255 */ 256 @Override 257 public synchronized void mark(final int readLimit) { 258 if (in != null) { 259 in.mark(readLimit); 260 } 261 } 262 263 /** 264 * Invokes the delegate's {@link InputStream#markSupported()} method. 265 * 266 * @return {@code true} if this stream instance supports the mark and reset methods; {@code false} otherwise. 267 * @see #mark(int) 268 * @see #reset() 269 */ 270 @Override 271 public boolean markSupported() { 272 return in != null && in.markSupported(); 273 } 274 275 /** 276 * Invokes the delegate's {@link InputStream#read()} method unless the stream is closed. 277 * 278 * @return the byte read or {@link IOUtils#EOF EOF} if we reached the end of stream. 279 * @throws IOException if an I/O error occurs. 280 */ 281 @Override 282 public int read() throws IOException { 283 try { 284 beforeRead(1); 285 final int b = in.read(); 286 afterRead(b != EOF ? 1 : EOF); 287 return b; 288 } catch (final IOException e) { 289 handleIOException(e); 290 return EOF; 291 } 292 } 293 294 /** 295 * Invokes the delegate's {@link InputStream#read(byte[])} method. 296 * 297 * @param b the buffer to read the bytes into. 298 * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream. 299 * @throws IOException 300 * <ul> 301 * <li>If the first byte cannot be read for any reason other than the end of the file, 302 * <li>if the input stream has been closed, or</li> 303 * <li>if some other I/O error occurs.</li> 304 * </ul> 305 */ 306 @Override 307 public int read(final byte[] b) throws IOException { 308 try { 309 beforeRead(IOUtils.length(b)); 310 final int n = in.read(b); 311 afterRead(n); 312 return n; 313 } catch (final IOException e) { 314 handleIOException(e); 315 return EOF; 316 } 317 } 318 319 /** 320 * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method. 321 * 322 * @param b the buffer to read the bytes into. 323 * @param off The start offset. 324 * @param len The number of bytes to read. 325 * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream. 326 * @throws IOException 327 * <ul> 328 * <li>If the first byte cannot be read for any reason other than the end of the file, 329 * <li>if the input stream has been closed, or</li> 330 * <li>if some other I/O error occurs.</li> 331 * </ul> 332 */ 333 @Override 334 public int read(final byte[] b, final int off, final int len) throws IOException { 335 try { 336 beforeRead(len); 337 final int n = in.read(b, off, len); 338 afterRead(n); 339 return n; 340 } catch (final IOException e) { 341 handleIOException(e); 342 return EOF; 343 } 344 } 345 346 /** 347 * Invokes the delegate's {@link InputStream#reset()} method. 348 * 349 * @throws IOException if this stream has not been marked or if the mark has been invalidated. 350 */ 351 @Override 352 public synchronized void reset() throws IOException { 353 try { 354 in.reset(); 355 } catch (final IOException e) { 356 handleIOException(e); 357 } 358 } 359 360 /** 361 * Sets the underlying input stream. 362 * 363 * @param in The input stream to set in {@link java.io.FilterInputStream#in}. 364 * @return {@code this} instance. 365 * @since 2.19.0 366 */ 367 public ProxyInputStream setReference(final InputStream in) { 368 this.in = in; 369 return this; 370 } 371 372 /** 373 * Invokes the delegate's {@link InputStream#skip(long)} method. 374 * 375 * @param n the number of bytes to skip. 376 * @return the actual number of bytes skipped. 377 * @throws IOException if the stream does not support seek, or if some other I/O error occurs. 378 */ 379 @Override 380 public long skip(final long n) throws IOException { 381 try { 382 return in.skip(n); 383 } catch (final IOException e) { 384 handleIOException(e); 385 return 0; 386 } 387 } 388 389 /** 390 * Unwraps this instance by returning the underlying {@link InputStream}. 391 * <p> 392 * Use with caution; useful to query the underlying {@link InputStream}. 393 * </p> 394 * 395 * @return the underlying {@link InputStream}. 396 * @since 2.16.0 397 */ 398 public InputStream unwrap() { 399 return in; 400 } 401 402}