View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.io.input;
18  
19  import static org.apache.commons.io.IOUtils.EOF;
20  
21  import java.io.FilterInputStream;
22  import java.io.IOException;
23  import java.io.InputStream;
24  
25  import org.apache.commons.io.IOUtils;
26  import org.apache.commons.io.build.AbstractStreamBuilder;
27  import org.apache.commons.io.function.Erase;
28  import org.apache.commons.io.function.IOConsumer;
29  import org.apache.commons.io.function.IOIntConsumer;
30  
31  /**
32   * A proxy stream which acts as a {@link FilterInputStream}, by passing all method calls on to the proxied stream, not changing which methods are called.
33   * <p>
34   * It is an alternative base class to {@link FilterInputStream} to increase reusability, because {@link FilterInputStream} changes the methods being called,
35   * such as read(byte[]) to read(byte[], int, int).
36   * </p>
37   * <p>
38   * In addition, this class allows you to:
39   * </p>
40   * <ul>
41   * <li>notify a subclass that <em>n</em> bytes are about to be read through {@link #beforeRead(int)}</li>
42   * <li>notify a subclass that <em>n</em> bytes were read through {@link #afterRead(int)}</li>
43   * <li>notify a subclass that an exception was caught through {@link #handleIOException(IOException)}</li>
44   * <li>{@link #unwrap()} itself</li>
45   * </ul>
46   */
47  public abstract class ProxyInputStream extends FilterInputStream {
48  
49      /**
50       * Abstracts builder properties for subclasses.
51       *
52       * @param <T> The InputStream type.
53       * @param <B> The builder type.
54       * @since 2.18.0
55       */
56      protected abstract static class AbstractBuilder<T, B extends AbstractStreamBuilder<T, B>> extends AbstractStreamBuilder<T, B> {
57  
58          private IOIntConsumer afterRead;
59  
60          /**
61           * Constructs a builder of {@code T}.
62           */
63          protected AbstractBuilder() {
64              // empty
65          }
66  
67          /**
68           * Gets the {@link ProxyInputStream#afterRead(int)} consumer.
69           *
70           * @return the {@link ProxyInputStream#afterRead(int)} consumer.
71           */
72          public IOIntConsumer getAfterRead() {
73              return afterRead;
74          }
75  
76          /**
77           * Sets the {@link ProxyInputStream#afterRead(int)} behavior, null resets to a NOOP.
78           * <p>
79           * Setting this value causes the {@link ProxyInputStream#afterRead(int) afterRead} method to delegate to the given consumer.
80           * </p>
81           * <p>
82           * If a subclass overrides {@link ProxyInputStream#afterRead(int) afterRead} and does not call {@code super.afterRead(int)}, then the given consumer is
83           * not called.
84           * </p>
85           * <p>
86           * This does <em>not</em> override a {@code ProxyInputStream} subclass' implementation of the {@link ProxyInputStream#afterRead(int)} method, it can
87           * supplement it.
88           * </p>
89           *
90           * @param afterRead the {@link ProxyInputStream#afterRead(int)} behavior.
91           * @return this instance.
92           */
93          public B setAfterRead(final IOIntConsumer afterRead) {
94              this.afterRead = afterRead;
95              return asThis();
96          }
97  
98      }
99  
100     /**
101      * Tracks whether {@link #close()} has been called or not.
102      */
103     private boolean closed;
104 
105     /**
106      * Handles exceptions.
107      */
108     private final IOConsumer<IOException> exceptionHandler;
109 
110     private final IOIntConsumer afterRead;
111 
112     /**
113      * Constructs a new ProxyInputStream.
114      *
115      * @param builder  How to build an instance.
116      * @throws IOException if an I/O error occurs.
117      * @since 2.18.0
118      */
119     @SuppressWarnings("resource")
120     protected ProxyInputStream(final AbstractBuilder<?, ?> builder) throws IOException {
121         // the delegate is stored in a protected superclass instance variable named 'in'.
122         this(builder.getInputStream(), builder);
123     }
124 
125     /**
126      * Constructs a new ProxyInputStream.
127      *
128      * @param proxy  the InputStream to proxy.
129      */
130     public ProxyInputStream(final InputStream proxy) {
131         // the delegate is stored in a protected superclass variable named 'in'.
132         super(proxy);
133         this.exceptionHandler = Erase::rethrow;
134         this.afterRead = IOIntConsumer.NOOP;
135     }
136 
137     /**
138      * Constructs a new ProxyInputStream.
139      *
140      * @param proxy  the InputStream to proxy.
141      * @param builder  How to build an instance.
142      * @since 2.18.0
143      */
144     protected ProxyInputStream(final InputStream proxy, final AbstractBuilder<?, ?> builder) {
145         // the delegate is stored in a protected superclass instance variable named 'in'.
146         super(proxy);
147         this.exceptionHandler = Erase::rethrow;
148         this.afterRead = builder.getAfterRead() != null ? builder.getAfterRead() : IOIntConsumer.NOOP;
149     }
150 
151     /**
152      * Called by the {@code read} methods after the proxied call has returned successfully. The argument is the number of bytes returned to the caller or
153      * {@link IOUtils#EOF EOF} if the end of stream was reached.
154      * <p>
155      * The default delegates to the consumer given to {@link AbstractBuilder#setAfterRead(IOIntConsumer)}.
156      * </p>
157      * <p>
158      * Alternatively, a subclasses can override this method to add post-processing functionality without having to override all the read methods.
159      * </p>
160      * <p>
161      * Note this method is <em>not</em> called from {@link #skip(long)} or {@link #reset()}. You need to explicitly override those methods if you want to add
162      * post-processing steps also to them.
163      * </p>
164      *
165      * @param n number of bytes read, or {@link IOUtils#EOF EOF} if the end of stream was reached.
166      * @throws IOException Thrown by a subclass or the consumer given to {@link AbstractBuilder#setAfterRead(IOIntConsumer)}.
167      * @since 2.0
168      */
169     protected void afterRead(final int n) throws IOException {
170         afterRead.accept(n);
171     }
172 
173     /**
174      * Invokes the delegate's {@link InputStream#available()} method.
175      *
176      * @return the number of available bytes, 0 if the stream is closed.
177      * @throws IOException if an I/O error occurs.
178      */
179     @Override
180     public int available() throws IOException {
181         if (in != null && !isClosed()) {
182             try {
183                 return in.available();
184             } catch (final IOException e) {
185                 handleIOException(e);
186             }
187         }
188         return 0;
189     }
190 
191     /**
192      * Invoked by the {@code read} methods before the call is proxied. The number
193      * of bytes that the caller wanted to read (1 for the {@link #read()}
194      * method, buffer length for {@link #read(byte[])}, etc.) is given as
195      * an argument.
196      * <p>
197      * Subclasses can override this method to add common pre-processing
198      * functionality without having to override all the read methods.
199      * The default implementation does nothing.
200      * </p>
201      * <p>
202      * Note this method is <em>not</em> called from {@link #skip(long)} or
203      * {@link #reset()}. You need to explicitly override those methods if
204      * you want to add pre-processing steps also to them.
205      * </p>
206      *
207      * @param n number of bytes that the caller asked to be read.
208      * @throws IOException if the pre-processing fails in a subclass.
209      * @since 2.0
210      */
211     @SuppressWarnings("unused") // Possibly thrown from subclasses.
212     protected void beforeRead(final int n) throws IOException {
213         // no-op default
214     }
215 
216     /**
217      * Checks if this instance is closed and throws an IOException if so.
218      *
219      * @throws IOException if this instance is closed.
220      */
221     void checkOpen() throws IOException {
222         Input.checkOpen(!isClosed());
223     }
224 
225     /**
226      * Invokes the delegate's {@link InputStream#close()} method.
227      *
228      * @throws IOException if an I/O error occurs.
229      */
230     @Override
231     public void close() throws IOException {
232         IOUtils.close(in, this::handleIOException);
233         closed = true;
234     }
235 
236     /**
237      * Handles any IOExceptions thrown; by default, throws the given exception.
238      * <p>
239      * This method provides a point to implement custom exception
240      * handling. The default behavior is to re-throw the exception.
241      * </p>
242      *
243      * @param e The IOException thrown.
244      * @throws IOException if an I/O error occurs.
245      * @since 2.0
246      */
247     protected void handleIOException(final IOException e) throws IOException {
248         exceptionHandler.accept(e);
249     }
250 
251     /**
252      * Tests whether this instance is closed.
253      *
254      * @return whether this instance is closed.
255      */
256     boolean isClosed() {
257         return closed;
258     }
259 
260     /**
261      * Invokes the delegate's {@link InputStream#mark(int)} method.
262      *
263      * @param readLimit read ahead limit.
264      */
265     @Override
266     public synchronized void mark(final int readLimit) {
267         if (in != null) {
268             in.mark(readLimit);
269         }
270     }
271 
272     /**
273      * Invokes the delegate's {@link InputStream#markSupported()} method.
274      *
275      * @return {@code true} if this stream instance supports the mark and reset methods; {@code false} otherwise.
276      * @see #mark(int)
277      * @see #reset()
278      */
279     @Override
280     public boolean markSupported() {
281         return in != null && in.markSupported();
282     }
283 
284     /**
285      * Invokes the delegate's {@link InputStream#read()} method unless the stream is closed.
286      *
287      * @return the byte read or {@link IOUtils#EOF EOF} if we reached the end of stream.
288      * @throws IOException if an I/O error occurs.
289      */
290     @Override
291     public int read() throws IOException {
292         try {
293             beforeRead(1);
294             final int b = in.read();
295             afterRead(b != EOF ? 1 : EOF);
296             return b;
297         } catch (final IOException e) {
298             handleIOException(e);
299             return EOF;
300         }
301     }
302 
303     /**
304      * Invokes the delegate's {@link InputStream#read(byte[])} method.
305      *
306      * @param b the buffer to read the bytes into.
307      * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream.
308      * @throws IOException
309      *                     <ul>
310      *                     <li>If the first byte cannot be read for any reason other than the end of the file,
311      *                     <li>if the input stream has been closed, or</li>
312      *                     <li>if some other I/O error occurs.</li>
313      *                     </ul>
314      */
315     @Override
316     public int read(final byte[] b) throws IOException {
317         try {
318             beforeRead(IOUtils.length(b));
319             final int n = in.read(b);
320             afterRead(n);
321             return n;
322         } catch (final IOException e) {
323             handleIOException(e);
324             return EOF;
325         }
326     }
327 
328     /**
329      * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method.
330      *
331      * @param b   the buffer to read the bytes into.
332      * @param off The start offset.
333      * @param len The number of bytes to read.
334      * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream.
335      * @throws IOException
336      *                     <ul>
337      *                     <li>If the first byte cannot be read for any reason other than the end of the file,
338      *                     <li>if the input stream has been closed, or</li>
339      *                     <li>if some other I/O error occurs.</li>
340      *                     </ul>
341      */
342     @Override
343     public int read(final byte[] b, final int off, final int len) throws IOException {
344         try {
345             beforeRead(len);
346             final int n = in.read(b, off, len);
347             afterRead(n);
348             return n;
349         } catch (final IOException e) {
350             handleIOException(e);
351             return EOF;
352         }
353     }
354 
355     /**
356      * Invokes the delegate's {@link InputStream#reset()} method.
357      *
358      * @throws IOException if this stream has not been marked or if the mark has been invalidated.
359      */
360     @Override
361     public synchronized void reset() throws IOException {
362         try {
363             in.reset();
364         } catch (final IOException e) {
365             handleIOException(e);
366         }
367     }
368 
369     /**
370      * Sets the underlying input stream.
371      *
372      * @param in The input stream to set in {@link java.io.FilterInputStream#in}.
373      * @return this instance.
374      * @since 2.19.0
375      */
376     public ProxyInputStream setReference(final InputStream in) {
377         this.in = in;
378         return this;
379     }
380 
381     /**
382      * Invokes the delegate's {@link InputStream#skip(long)} method.
383      *
384      * @param n the number of bytes to skip.
385      * @return the actual number of bytes skipped.
386      * @throws IOException if the stream does not support seek, or if some other I/O error occurs.
387      */
388     @Override
389     public long skip(final long n) throws IOException {
390         try {
391             return in.skip(n);
392         } catch (final IOException e) {
393             handleIOException(e);
394             return 0;
395         }
396     }
397 
398     /**
399      * Unwraps this instance by returning the underlying {@link InputStream}.
400      * <p>
401      * Use with caution; useful to query the underlying {@link InputStream}.
402      * </p>
403      *
404      * @return the underlying {@link InputStream}.
405      * @since 2.16.0
406      */
407     public InputStream unwrap() {
408         return in;
409     }
410 
411 }