001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      https://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import static org.apache.commons.io.IOUtils.EOF;
020
021import java.io.FilterInputStream;
022import java.io.IOException;
023import java.io.InputStream;
024
025import org.apache.commons.io.IOUtils;
026import org.apache.commons.io.build.AbstractStreamBuilder;
027import org.apache.commons.io.function.IOIntConsumer;
028
029/**
030 * A proxy stream which acts as a {@link FilterInputStream}, by passing all method calls on to the proxied stream, not changing which methods are called.
031 * <p>
032 * It is an alternative base class to {@link FilterInputStream} to increase reusability, because {@link FilterInputStream} changes the methods being called,
033 * such as read(byte[]) to read(byte[], int, int).
034 * </p>
035 * <p>
036 * In addition, this class allows you to:
037 * </p>
038 * <ul>
039 * <li>notify a subclass that <em>n</em> bytes are about to be read through {@link #beforeRead(int)}</li>
040 * <li>notify a subclass that <em>n</em> bytes were read through {@link #afterRead(int)}</li>
041 * <li>notify a subclass that an exception was caught through {@link #handleIOException(IOException)}</li>
042 * <li>{@link #unwrap()} itself</li>
043 * </ul>
044 */
045public abstract class ProxyInputStream extends FilterInputStream {
046
047    /**
048     * Abstracts builder properties for subclasses.
049     *
050     * @param <T> The InputStream type.
051     * @param <B> The builder type.
052     * @since 2.18.0
053     */
054    protected abstract static class AbstractBuilder<T, B extends AbstractStreamBuilder<T, B>> extends AbstractStreamBuilder<T, B> {
055
056        private IOIntConsumer afterRead;
057
058        /**
059         * Constructs a builder of {@code T}.
060         */
061        protected AbstractBuilder() {
062            // empty
063        }
064
065        /**
066         * Gets the {@link ProxyInputStream#afterRead(int)} consumer.
067         *
068         * @return the {@link ProxyInputStream#afterRead(int)} consumer.
069         */
070        public IOIntConsumer getAfterRead() {
071            return afterRead;
072        }
073
074        /**
075         * Sets the {@link ProxyInputStream#afterRead(int)} behavior, null resets to a NOOP.
076         * <p>
077         * Setting this value causes the {@link ProxyInputStream#afterRead(int) afterRead} method to delegate to the given consumer.
078         * </p>
079         * <p>
080         * If a subclass overrides {@link ProxyInputStream#afterRead(int) afterRead} and does not call {@code super.afterRead(int)}, then the given consumer is
081         * not called.
082         * </p>
083         * <p>
084         * This does <em>not</em> override a {@code ProxyInputStream} subclass' implementation of the {@link ProxyInputStream#afterRead(int)} method, it can
085         * supplement it.
086         * </p>
087         *
088         * @param afterRead the {@link ProxyInputStream#afterRead(int)} behavior.
089         * @return {@code this} instance.
090         */
091        public B setAfterRead(final IOIntConsumer afterRead) {
092            this.afterRead = afterRead;
093            return asThis();
094        }
095
096    }
097
098    /**
099     * Tracks whether {@link #close()} has been called or not.
100     */
101    private volatile boolean closed;
102
103    private final IOIntConsumer afterRead;
104
105    /**
106     * Constructs a new ProxyInputStream.
107     *
108     * @param builder  How to build an instance.
109     * @throws IOException if an I/O error occurs.
110     * @since 2.18.0
111     */
112    @SuppressWarnings("resource")
113    protected ProxyInputStream(final AbstractBuilder<?, ?> builder) throws IOException {
114        // the delegate is stored in a protected superclass instance variable named 'in'.
115        this(builder.getInputStream(), builder);
116    }
117
118    /**
119     * Constructs a new ProxyInputStream.
120     *
121     * @param proxy  the InputStream to proxy.
122     */
123    public ProxyInputStream(final InputStream proxy) {
124        // the delegate is stored in a protected superclass variable named 'in'.
125        super(proxy);
126        this.afterRead = IOIntConsumer.NOOP;
127    }
128
129    /**
130     * Constructs a new ProxyInputStream.
131     *
132     * @param proxy  the InputStream to proxy.
133     * @param builder  How to build an instance.
134     * @since 2.18.0
135     */
136    protected ProxyInputStream(final InputStream proxy, final AbstractBuilder<?, ?> builder) {
137        // the delegate is stored in a protected superclass instance variable named 'in'.
138        super(proxy);
139        this.afterRead = builder.getAfterRead() != null ? builder.getAfterRead() : IOIntConsumer.NOOP;
140    }
141
142    /**
143     * Called by the {@code read} methods after the proxied call has returned successfully. The argument is the number of bytes returned to the caller or
144     * {@link IOUtils#EOF EOF} if the end of stream was reached.
145     * <p>
146     * The default delegates to the consumer given to {@link AbstractBuilder#setAfterRead(IOIntConsumer)}.
147     * </p>
148     * <p>
149     * Alternatively, a subclasses can override this method to add post-processing functionality without having to override all the read methods.
150     * </p>
151     * <p>
152     * Note this method is <em>not</em> called from {@link #skip(long)} or {@link #reset()}. You need to explicitly override those methods if you want to add
153     * post-processing steps also to them.
154     * </p>
155     *
156     * @param n number of bytes read, or {@link IOUtils#EOF EOF} if the end of stream was reached.
157     * @throws IOException Thrown by a subclass or the consumer given to {@link AbstractBuilder#setAfterRead(IOIntConsumer)}.
158     * @since 2.0
159     */
160    protected void afterRead(final int n) throws IOException {
161        afterRead.accept(n);
162    }
163
164    /**
165     * Invokes the delegate's {@link InputStream#available()} method.
166     *
167     * @return the number of available bytes, 0 if the stream is closed.
168     * @throws IOException if an I/O error occurs.
169     */
170    @Override
171    public int available() throws IOException {
172        if (in != null && !isClosed()) {
173            try {
174                return in.available();
175            } catch (final IOException e) {
176                handleIOException(e);
177            }
178        }
179        return 0;
180    }
181
182    /**
183     * Invoked by the {@code read} methods before the call is proxied. The number
184     * of bytes that the caller wanted to read (1 for the {@link #read()}
185     * method, buffer length for {@link #read(byte[])}, etc.) is given as
186     * an argument.
187     * <p>
188     * Subclasses can override this method to add common pre-processing
189     * functionality without having to override all the read methods.
190     * The default implementation does nothing.
191     * </p>
192     * <p>
193     * Note this method is <em>not</em> called from {@link #skip(long)} or
194     * {@link #reset()}. You need to explicitly override those methods if
195     * you want to add pre-processing steps also to them.
196     * </p>
197     *
198     * @param n number of bytes that the caller asked to be read.
199     * @throws IOException if the pre-processing fails in a subclass.
200     * @since 2.0
201     */
202    @SuppressWarnings("unused") // Possibly thrown from subclasses.
203    protected void beforeRead(final int n) throws IOException {
204        // no-op default
205    }
206
207    /**
208     * Checks if this instance is closed and throws an IOException if so.
209     *
210     * @throws IOException if this instance is closed.
211     */
212    void checkOpen() throws IOException {
213        Input.checkOpen(!isClosed());
214    }
215
216    /**
217     * Invokes the delegate's {@link InputStream#close()} method.
218     *
219     * @throws IOException if an I/O error occurs.
220     */
221    @Override
222    public void close() throws IOException {
223        IOUtils.close(in, this::handleIOException);
224        closed = true;
225    }
226
227    /**
228     * Handles any IOExceptions thrown; by default, throws the given exception.
229     * <p>
230     * This method provides a point to implement custom exception
231     * handling. The default behavior is to re-throw the exception.
232     * </p>
233     *
234     * @param e The IOException thrown.
235     * @throws IOException if an I/O error occurs.
236     * @since 2.0
237     */
238    protected void handleIOException(final IOException e) throws IOException {
239        throw e;
240    }
241
242    /**
243     * Tests whether this instance is closed.
244     *
245     * @return whether this instance is closed.
246     */
247    boolean isClosed() {
248        return closed;
249    }
250
251    /**
252     * Invokes the delegate's {@link InputStream#mark(int)} method.
253     *
254     * @param readLimit read ahead limit.
255     */
256    @Override
257    public synchronized void mark(final int readLimit) {
258        if (in != null) {
259            in.mark(readLimit);
260        }
261    }
262
263    /**
264     * Invokes the delegate's {@link InputStream#markSupported()} method.
265     *
266     * @return {@code true} if this stream instance supports the mark and reset methods; {@code false} otherwise.
267     * @see #mark(int)
268     * @see #reset()
269     */
270    @Override
271    public boolean markSupported() {
272        return in != null && in.markSupported();
273    }
274
275    /**
276     * Invokes the delegate's {@link InputStream#read()} method unless the stream is closed.
277     *
278     * @return the byte read or {@link IOUtils#EOF EOF} if we reached the end of stream.
279     * @throws IOException if an I/O error occurs.
280     */
281    @Override
282    public int read() throws IOException {
283        try {
284            beforeRead(1);
285            final int b = in.read();
286            afterRead(b != EOF ? 1 : EOF);
287            return b;
288        } catch (final IOException e) {
289            handleIOException(e);
290            return EOF;
291        }
292    }
293
294    /**
295     * Invokes the delegate's {@link InputStream#read(byte[])} method.
296     *
297     * @param b the buffer to read the bytes into.
298     * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream.
299     * @throws IOException
300     *                     <ul>
301     *                     <li>If the first byte cannot be read for any reason other than the end of the file,
302     *                     <li>if the input stream has been closed, or</li>
303     *                     <li>if some other I/O error occurs.</li>
304     *                     </ul>
305     */
306    @Override
307    public int read(final byte[] b) throws IOException {
308        try {
309            beforeRead(IOUtils.length(b));
310            final int n = in.read(b);
311            afterRead(n);
312            return n;
313        } catch (final IOException e) {
314            handleIOException(e);
315            return EOF;
316        }
317    }
318
319    /**
320     * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method.
321     *
322     * @param b   the buffer to read the bytes into.
323     * @param off The start offset.
324     * @param len The number of bytes to read.
325     * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream.
326     * @throws IOException
327     *                     <ul>
328     *                     <li>If the first byte cannot be read for any reason other than the end of the file,
329     *                     <li>if the input stream has been closed, or</li>
330     *                     <li>if some other I/O error occurs.</li>
331     *                     </ul>
332     */
333    @Override
334    public int read(final byte[] b, final int off, final int len) throws IOException {
335        try {
336            beforeRead(len);
337            final int n = in.read(b, off, len);
338            afterRead(n);
339            return n;
340        } catch (final IOException e) {
341            handleIOException(e);
342            return EOF;
343        }
344    }
345
346    /**
347     * Invokes the delegate's {@link InputStream#reset()} method.
348     *
349     * @throws IOException if this stream has not been marked or if the mark has been invalidated.
350     */
351    @Override
352    public synchronized void reset() throws IOException {
353        try {
354            in.reset();
355        } catch (final IOException e) {
356            handleIOException(e);
357        }
358    }
359
360    /**
361     * Sets the underlying input stream.
362     *
363     * @param in The input stream to set in {@link java.io.FilterInputStream#in}.
364     * @return {@code this} instance.
365     * @since 2.19.0
366     */
367    public ProxyInputStream setReference(final InputStream in) {
368        this.in = in;
369        return this;
370    }
371
372    /**
373     * Invokes the delegate's {@link InputStream#skip(long)} method.
374     *
375     * @param n the number of bytes to skip.
376     * @return the actual number of bytes skipped.
377     * @throws IOException if the stream does not support seek, or if some other I/O error occurs.
378     */
379    @Override
380    public long skip(final long n) throws IOException {
381        try {
382            return in.skip(n);
383        } catch (final IOException e) {
384            handleIOException(e);
385            return 0;
386        }
387    }
388
389    /**
390     * Unwraps this instance by returning the underlying {@link InputStream}.
391     * <p>
392     * Use with caution; useful to query the underlying {@link InputStream}.
393     * </p>
394     *
395     * @return the underlying {@link InputStream}.
396     * @since 2.16.0
397     */
398    public InputStream unwrap() {
399        return in;
400    }
401
402}