View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      https://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.io.input;
18  
19  import static org.apache.commons.io.IOUtils.EOF;
20  
21  import java.io.FilterInputStream;
22  import java.io.IOException;
23  import java.io.InputStream;
24  
25  import org.apache.commons.io.IOUtils;
26  import org.apache.commons.io.build.AbstractStreamBuilder;
27  import org.apache.commons.io.function.IOIntConsumer;
28  
29  /**
30   * A proxy stream which acts as a {@link FilterInputStream}, by passing all method calls on to the proxied stream, not changing which methods are called.
31   * <p>
32   * It is an alternative base class to {@link FilterInputStream} to increase reusability, because {@link FilterInputStream} changes the methods being called,
33   * such as read(byte[]) to read(byte[], int, int).
34   * </p>
35   * <p>
36   * In addition, this class allows you to:
37   * </p>
38   * <ul>
39   * <li>notify a subclass that <em>n</em> bytes are about to be read through {@link #beforeRead(int)}</li>
40   * <li>notify a subclass that <em>n</em> bytes were read through {@link #afterRead(int)}</li>
41   * <li>notify a subclass that an exception was caught through {@link #handleIOException(IOException)}</li>
42   * <li>{@link #unwrap()} itself</li>
43   * </ul>
44   */
45  public abstract class ProxyInputStream extends FilterInputStream {
46  
47      /**
48       * Abstracts builder properties for subclasses.
49       *
50       * @param <T> The InputStream type.
51       * @param <B> The builder type.
52       * @since 2.18.0
53       */
54      protected abstract static class AbstractBuilder<T, B extends AbstractStreamBuilder<T, B>> extends AbstractStreamBuilder<T, B> {
55  
56          private IOIntConsumer afterRead;
57  
58          /**
59           * Constructs a builder of {@code T}.
60           */
61          protected AbstractBuilder() {
62              // empty
63          }
64  
65          /**
66           * Gets the {@link ProxyInputStream#afterRead(int)} consumer.
67           *
68           * @return the {@link ProxyInputStream#afterRead(int)} consumer.
69           */
70          public IOIntConsumer getAfterRead() {
71              return afterRead;
72          }
73  
74          /**
75           * Sets the {@link ProxyInputStream#afterRead(int)} behavior, null resets to a NOOP.
76           * <p>
77           * Setting this value causes the {@link ProxyInputStream#afterRead(int) afterRead} method to delegate to the given consumer.
78           * </p>
79           * <p>
80           * If a subclass overrides {@link ProxyInputStream#afterRead(int) afterRead} and does not call {@code super.afterRead(int)}, then the given consumer is
81           * not called.
82           * </p>
83           * <p>
84           * This does <em>not</em> override a {@code ProxyInputStream} subclass' implementation of the {@link ProxyInputStream#afterRead(int)} method, it can
85           * supplement it.
86           * </p>
87           *
88           * @param afterRead the {@link ProxyInputStream#afterRead(int)} behavior.
89           * @return {@code this} instance.
90           */
91          public B setAfterRead(final IOIntConsumer afterRead) {
92              this.afterRead = afterRead;
93              return asThis();
94          }
95  
96      }
97  
98      /**
99       * Tracks whether {@link #close()} has been called or not.
100      */
101     private volatile boolean closed;
102 
103     private final IOIntConsumer afterRead;
104 
105     /**
106      * Constructs a new ProxyInputStream.
107      *
108      * @param builder  How to build an instance.
109      * @throws IOException if an I/O error occurs.
110      * @since 2.18.0
111      */
112     @SuppressWarnings("resource")
113     protected ProxyInputStream(final AbstractBuilder<?, ?> builder) throws IOException {
114         // the delegate is stored in a protected superclass instance variable named 'in'.
115         this(builder.getInputStream(), builder);
116     }
117 
118     /**
119      * Constructs a new ProxyInputStream.
120      *
121      * @param proxy  the InputStream to proxy.
122      */
123     public ProxyInputStream(final InputStream proxy) {
124         // the delegate is stored in a protected superclass variable named 'in'.
125         super(proxy);
126         this.afterRead = IOIntConsumer.NOOP;
127     }
128 
129     /**
130      * Constructs a new ProxyInputStream.
131      *
132      * @param proxy  the InputStream to proxy.
133      * @param builder  How to build an instance.
134      * @since 2.18.0
135      */
136     protected ProxyInputStream(final InputStream proxy, final AbstractBuilder<?, ?> builder) {
137         // the delegate is stored in a protected superclass instance variable named 'in'.
138         super(proxy);
139         this.afterRead = builder.getAfterRead() != null ? builder.getAfterRead() : IOIntConsumer.NOOP;
140     }
141 
142     /**
143      * Called by the {@code read} methods after the proxied call has returned successfully. The argument is the number of bytes returned to the caller or
144      * {@link IOUtils#EOF EOF} if the end of stream was reached.
145      * <p>
146      * The default delegates to the consumer given to {@link AbstractBuilder#setAfterRead(IOIntConsumer)}.
147      * </p>
148      * <p>
149      * Alternatively, a subclasses can override this method to add post-processing functionality without having to override all the read methods.
150      * </p>
151      * <p>
152      * Note this method is <em>not</em> called from {@link #skip(long)} or {@link #reset()}. You need to explicitly override those methods if you want to add
153      * post-processing steps also to them.
154      * </p>
155      *
156      * @param n number of bytes read, or {@link IOUtils#EOF EOF} if the end of stream was reached.
157      * @throws IOException Thrown by a subclass or the consumer given to {@link AbstractBuilder#setAfterRead(IOIntConsumer)}.
158      * @since 2.0
159      */
160     protected void afterRead(final int n) throws IOException {
161         afterRead.accept(n);
162     }
163 
164     /**
165      * Invokes the delegate's {@link InputStream#available()} method.
166      *
167      * @return the number of available bytes, 0 if the stream is closed.
168      * @throws IOException if an I/O error occurs.
169      */
170     @Override
171     public int available() throws IOException {
172         if (in != null && !isClosed()) {
173             try {
174                 return in.available();
175             } catch (final IOException e) {
176                 handleIOException(e);
177             }
178         }
179         return 0;
180     }
181 
182     /**
183      * Invoked by the {@code read} methods before the call is proxied. The number
184      * of bytes that the caller wanted to read (1 for the {@link #read()}
185      * method, buffer length for {@link #read(byte[])}, etc.) is given as
186      * an argument.
187      * <p>
188      * Subclasses can override this method to add common pre-processing
189      * functionality without having to override all the read methods.
190      * The default implementation does nothing.
191      * </p>
192      * <p>
193      * Note this method is <em>not</em> called from {@link #skip(long)} or
194      * {@link #reset()}. You need to explicitly override those methods if
195      * you want to add pre-processing steps also to them.
196      * </p>
197      *
198      * @param n number of bytes that the caller asked to be read.
199      * @throws IOException if the pre-processing fails in a subclass.
200      * @since 2.0
201      */
202     @SuppressWarnings("unused") // Possibly thrown from subclasses.
203     protected void beforeRead(final int n) throws IOException {
204         // no-op default
205     }
206 
207     /**
208      * Checks if this instance is closed and throws an IOException if so.
209      *
210      * @throws IOException if this instance is closed.
211      */
212     void checkOpen() throws IOException {
213         Input.checkOpen(!isClosed());
214     }
215 
216     /**
217      * Invokes the delegate's {@link InputStream#close()} method.
218      *
219      * @throws IOException if an I/O error occurs.
220      */
221     @Override
222     public void close() throws IOException {
223         IOUtils.close(in, this::handleIOException);
224         closed = true;
225     }
226 
227     /**
228      * Handles any IOExceptions thrown; by default, throws the given exception.
229      * <p>
230      * This method provides a point to implement custom exception
231      * handling. The default behavior is to re-throw the exception.
232      * </p>
233      *
234      * @param e The IOException thrown.
235      * @throws IOException if an I/O error occurs.
236      * @since 2.0
237      */
238     protected void handleIOException(final IOException e) throws IOException {
239         throw e;
240     }
241 
242     /**
243      * Tests whether this instance is closed.
244      *
245      * @return whether this instance is closed.
246      */
247     boolean isClosed() {
248         return closed;
249     }
250 
251     /**
252      * Invokes the delegate's {@link InputStream#mark(int)} method.
253      *
254      * @param readLimit read ahead limit.
255      */
256     @Override
257     public synchronized void mark(final int readLimit) {
258         if (in != null) {
259             in.mark(readLimit);
260         }
261     }
262 
263     /**
264      * Invokes the delegate's {@link InputStream#markSupported()} method.
265      *
266      * @return {@code true} if this stream instance supports the mark and reset methods; {@code false} otherwise.
267      * @see #mark(int)
268      * @see #reset()
269      */
270     @Override
271     public boolean markSupported() {
272         return in != null && in.markSupported();
273     }
274 
275     /**
276      * Invokes the delegate's {@link InputStream#read()} method unless the stream is closed.
277      *
278      * @return the byte read or {@link IOUtils#EOF EOF} if we reached the end of stream.
279      * @throws IOException if an I/O error occurs.
280      */
281     @Override
282     public int read() throws IOException {
283         try {
284             beforeRead(1);
285             final int b = in.read();
286             afterRead(b != EOF ? 1 : EOF);
287             return b;
288         } catch (final IOException e) {
289             handleIOException(e);
290             return EOF;
291         }
292     }
293 
294     /**
295      * Invokes the delegate's {@link InputStream#read(byte[])} method.
296      *
297      * @param b the buffer to read the bytes into.
298      * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream.
299      * @throws IOException
300      *                     <ul>
301      *                     <li>If the first byte cannot be read for any reason other than the end of the file,
302      *                     <li>if the input stream has been closed, or</li>
303      *                     <li>if some other I/O error occurs.</li>
304      *                     </ul>
305      */
306     @Override
307     public int read(final byte[] b) throws IOException {
308         try {
309             beforeRead(IOUtils.length(b));
310             final int n = in.read(b);
311             afterRead(n);
312             return n;
313         } catch (final IOException e) {
314             handleIOException(e);
315             return EOF;
316         }
317     }
318 
319     /**
320      * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method.
321      *
322      * @param b   the buffer to read the bytes into.
323      * @param off The start offset.
324      * @param len The number of bytes to read.
325      * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream.
326      * @throws IOException
327      *                     <ul>
328      *                     <li>If the first byte cannot be read for any reason other than the end of the file,
329      *                     <li>if the input stream has been closed, or</li>
330      *                     <li>if some other I/O error occurs.</li>
331      *                     </ul>
332      */
333     @Override
334     public int read(final byte[] b, final int off, final int len) throws IOException {
335         try {
336             beforeRead(len);
337             final int n = in.read(b, off, len);
338             afterRead(n);
339             return n;
340         } catch (final IOException e) {
341             handleIOException(e);
342             return EOF;
343         }
344     }
345 
346     /**
347      * Invokes the delegate's {@link InputStream#reset()} method.
348      *
349      * @throws IOException if this stream has not been marked or if the mark has been invalidated.
350      */
351     @Override
352     public synchronized void reset() throws IOException {
353         try {
354             in.reset();
355         } catch (final IOException e) {
356             handleIOException(e);
357         }
358     }
359 
360     /**
361      * Sets the underlying input stream.
362      *
363      * @param in The input stream to set in {@link java.io.FilterInputStream#in}.
364      * @return {@code this} instance.
365      * @since 2.19.0
366      */
367     public ProxyInputStream setReference(final InputStream in) {
368         this.in = in;
369         return this;
370     }
371 
372     /**
373      * Invokes the delegate's {@link InputStream#skip(long)} method.
374      *
375      * @param n the number of bytes to skip.
376      * @return the actual number of bytes skipped.
377      * @throws IOException if the stream does not support seek, or if some other I/O error occurs.
378      */
379     @Override
380     public long skip(final long n) throws IOException {
381         try {
382             return in.skip(n);
383         } catch (final IOException e) {
384             handleIOException(e);
385             return 0;
386         }
387     }
388 
389     /**
390      * Unwraps this instance by returning the underlying {@link InputStream}.
391      * <p>
392      * Use with caution; useful to query the underlying {@link InputStream}.
393      * </p>
394      *
395      * @return the underlying {@link InputStream}.
396      * @since 2.16.0
397      */
398     public InputStream unwrap() {
399         return in;
400     }
401 
402 }