1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.commons.io.input;
18
19 import static org.apache.commons.io.IOUtils.EOF;
20
21 import java.io.FilterInputStream;
22 import java.io.IOException;
23 import java.io.InputStream;
24
25 import org.apache.commons.io.IOUtils;
26 import org.apache.commons.io.build.AbstractStreamBuilder;
27 import org.apache.commons.io.function.IOIntConsumer;
28
29 /**
30 * A proxy stream which acts as a {@link FilterInputStream}, by passing all method calls on to the proxied stream, not changing which methods are called.
31 * <p>
32 * It is an alternative base class to {@link FilterInputStream} to increase reusability, because {@link FilterInputStream} changes the methods being called,
33 * such as read(byte[]) to read(byte[], int, int).
34 * </p>
35 * <p>
36 * In addition, this class allows you to:
37 * </p>
38 * <ul>
39 * <li>notify a subclass that <em>n</em> bytes are about to be read through {@link #beforeRead(int)}</li>
40 * <li>notify a subclass that <em>n</em> bytes were read through {@link #afterRead(int)}</li>
41 * <li>notify a subclass that an exception was caught through {@link #handleIOException(IOException)}</li>
42 * <li>{@link #unwrap()} itself</li>
43 * </ul>
44 */
45 public abstract class ProxyInputStream extends FilterInputStream {
46
47 /**
48 * Abstracts builder properties for subclasses.
49 *
50 * @param <T> The InputStream type.
51 * @param <B> The builder type.
52 * @since 2.18.0
53 */
54 protected abstract static class AbstractBuilder<T, B extends AbstractStreamBuilder<T, B>> extends AbstractStreamBuilder<T, B> {
55
56 private IOIntConsumer afterRead;
57
58 /**
59 * Constructs a builder of {@code T}.
60 */
61 protected AbstractBuilder() {
62 // empty
63 }
64
65 /**
66 * Gets the {@link ProxyInputStream#afterRead(int)} consumer.
67 *
68 * @return the {@link ProxyInputStream#afterRead(int)} consumer.
69 */
70 public IOIntConsumer getAfterRead() {
71 return afterRead;
72 }
73
74 /**
75 * Sets the {@link ProxyInputStream#afterRead(int)} behavior, null resets to a NOOP.
76 * <p>
77 * Setting this value causes the {@link ProxyInputStream#afterRead(int) afterRead} method to delegate to the given consumer.
78 * </p>
79 * <p>
80 * If a subclass overrides {@link ProxyInputStream#afterRead(int) afterRead} and does not call {@code super.afterRead(int)}, then the given consumer is
81 * not called.
82 * </p>
83 * <p>
84 * This does <em>not</em> override a {@code ProxyInputStream} subclass' implementation of the {@link ProxyInputStream#afterRead(int)} method, it can
85 * supplement it.
86 * </p>
87 *
88 * @param afterRead the {@link ProxyInputStream#afterRead(int)} behavior.
89 * @return {@code this} instance.
90 */
91 public B setAfterRead(final IOIntConsumer afterRead) {
92 this.afterRead = afterRead;
93 return asThis();
94 }
95
96 }
97
98 /**
99 * Tracks whether {@link #close()} has been called or not.
100 */
101 private volatile boolean closed;
102
103 private final IOIntConsumer afterRead;
104
105 /**
106 * Constructs a new ProxyInputStream.
107 *
108 * @param builder How to build an instance.
109 * @throws IOException if an I/O error occurs.
110 * @since 2.18.0
111 */
112 @SuppressWarnings("resource")
113 protected ProxyInputStream(final AbstractBuilder<?, ?> builder) throws IOException {
114 // the delegate is stored in a protected superclass instance variable named 'in'.
115 this(builder.getInputStream(), builder);
116 }
117
118 /**
119 * Constructs a new ProxyInputStream.
120 *
121 * @param proxy the InputStream to proxy.
122 */
123 public ProxyInputStream(final InputStream proxy) {
124 // the delegate is stored in a protected superclass variable named 'in'.
125 super(proxy);
126 this.afterRead = IOIntConsumer.NOOP;
127 }
128
129 /**
130 * Constructs a new ProxyInputStream.
131 *
132 * @param proxy the InputStream to proxy.
133 * @param builder How to build an instance.
134 * @since 2.18.0
135 */
136 protected ProxyInputStream(final InputStream proxy, final AbstractBuilder<?, ?> builder) {
137 // the delegate is stored in a protected superclass instance variable named 'in'.
138 super(proxy);
139 this.afterRead = builder.getAfterRead() != null ? builder.getAfterRead() : IOIntConsumer.NOOP;
140 }
141
142 /**
143 * Called by the {@code read} methods after the proxied call has returned successfully. The argument is the number of bytes returned to the caller or
144 * {@link IOUtils#EOF EOF} if the end of stream was reached.
145 * <p>
146 * The default delegates to the consumer given to {@link AbstractBuilder#setAfterRead(IOIntConsumer)}.
147 * </p>
148 * <p>
149 * Alternatively, a subclasses can override this method to add post-processing functionality without having to override all the read methods.
150 * </p>
151 * <p>
152 * Note this method is <em>not</em> called from {@link #skip(long)} or {@link #reset()}. You need to explicitly override those methods if you want to add
153 * post-processing steps also to them.
154 * </p>
155 *
156 * @param n number of bytes read, or {@link IOUtils#EOF EOF} if the end of stream was reached.
157 * @throws IOException Thrown by a subclass or the consumer given to {@link AbstractBuilder#setAfterRead(IOIntConsumer)}.
158 * @since 2.0
159 */
160 protected void afterRead(final int n) throws IOException {
161 afterRead.accept(n);
162 }
163
164 /**
165 * Invokes the delegate's {@link InputStream#available()} method.
166 *
167 * @return the number of available bytes, 0 if the stream is closed.
168 * @throws IOException if an I/O error occurs.
169 */
170 @Override
171 public int available() throws IOException {
172 if (in != null && !isClosed()) {
173 try {
174 return in.available();
175 } catch (final IOException e) {
176 handleIOException(e);
177 }
178 }
179 return 0;
180 }
181
182 /**
183 * Invoked by the {@code read} methods before the call is proxied. The number
184 * of bytes that the caller wanted to read (1 for the {@link #read()}
185 * method, buffer length for {@link #read(byte[])}, etc.) is given as
186 * an argument.
187 * <p>
188 * Subclasses can override this method to add common pre-processing
189 * functionality without having to override all the read methods.
190 * The default implementation does nothing.
191 * </p>
192 * <p>
193 * Note this method is <em>not</em> called from {@link #skip(long)} or
194 * {@link #reset()}. You need to explicitly override those methods if
195 * you want to add pre-processing steps also to them.
196 * </p>
197 *
198 * @param n number of bytes that the caller asked to be read.
199 * @throws IOException if the pre-processing fails in a subclass.
200 * @since 2.0
201 */
202 @SuppressWarnings("unused") // Possibly thrown from subclasses.
203 protected void beforeRead(final int n) throws IOException {
204 // no-op default
205 }
206
207 /**
208 * Checks if this instance is closed and throws an IOException if so.
209 *
210 * @throws IOException if this instance is closed.
211 */
212 void checkOpen() throws IOException {
213 Input.checkOpen(!isClosed());
214 }
215
216 /**
217 * Invokes the delegate's {@link InputStream#close()} method.
218 *
219 * @throws IOException if an I/O error occurs.
220 */
221 @Override
222 public void close() throws IOException {
223 IOUtils.close(in, this::handleIOException);
224 closed = true;
225 }
226
227 /**
228 * Handles any IOExceptions thrown; by default, throws the given exception.
229 * <p>
230 * This method provides a point to implement custom exception
231 * handling. The default behavior is to re-throw the exception.
232 * </p>
233 *
234 * @param e The IOException thrown.
235 * @throws IOException if an I/O error occurs.
236 * @since 2.0
237 */
238 protected void handleIOException(final IOException e) throws IOException {
239 throw e;
240 }
241
242 /**
243 * Tests whether this instance is closed.
244 *
245 * @return whether this instance is closed.
246 */
247 boolean isClosed() {
248 return closed;
249 }
250
251 /**
252 * Invokes the delegate's {@link InputStream#mark(int)} method.
253 *
254 * @param readLimit read ahead limit.
255 */
256 @Override
257 public synchronized void mark(final int readLimit) {
258 if (in != null) {
259 in.mark(readLimit);
260 }
261 }
262
263 /**
264 * Invokes the delegate's {@link InputStream#markSupported()} method.
265 *
266 * @return {@code true} if this stream instance supports the mark and reset methods; {@code false} otherwise.
267 * @see #mark(int)
268 * @see #reset()
269 */
270 @Override
271 public boolean markSupported() {
272 return in != null && in.markSupported();
273 }
274
275 /**
276 * Invokes the delegate's {@link InputStream#read()} method unless the stream is closed.
277 *
278 * @return the byte read or {@link IOUtils#EOF EOF} if we reached the end of stream.
279 * @throws IOException if an I/O error occurs.
280 */
281 @Override
282 public int read() throws IOException {
283 try {
284 beforeRead(1);
285 final int b = in.read();
286 afterRead(b != EOF ? 1 : EOF);
287 return b;
288 } catch (final IOException e) {
289 handleIOException(e);
290 return EOF;
291 }
292 }
293
294 /**
295 * Invokes the delegate's {@link InputStream#read(byte[])} method.
296 *
297 * @param b the buffer to read the bytes into.
298 * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream.
299 * @throws IOException
300 * <ul>
301 * <li>If the first byte cannot be read for any reason other than the end of the file,
302 * <li>if the input stream has been closed, or</li>
303 * <li>if some other I/O error occurs.</li>
304 * </ul>
305 */
306 @Override
307 public int read(final byte[] b) throws IOException {
308 try {
309 beforeRead(IOUtils.length(b));
310 final int n = in.read(b);
311 afterRead(n);
312 return n;
313 } catch (final IOException e) {
314 handleIOException(e);
315 return EOF;
316 }
317 }
318
319 /**
320 * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method.
321 *
322 * @param b the buffer to read the bytes into.
323 * @param off The start offset.
324 * @param len The number of bytes to read.
325 * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream.
326 * @throws IOException
327 * <ul>
328 * <li>If the first byte cannot be read for any reason other than the end of the file,
329 * <li>if the input stream has been closed, or</li>
330 * <li>if some other I/O error occurs.</li>
331 * </ul>
332 */
333 @Override
334 public int read(final byte[] b, final int off, final int len) throws IOException {
335 try {
336 beforeRead(len);
337 final int n = in.read(b, off, len);
338 afterRead(n);
339 return n;
340 } catch (final IOException e) {
341 handleIOException(e);
342 return EOF;
343 }
344 }
345
346 /**
347 * Invokes the delegate's {@link InputStream#reset()} method.
348 *
349 * @throws IOException if this stream has not been marked or if the mark has been invalidated.
350 */
351 @Override
352 public synchronized void reset() throws IOException {
353 try {
354 in.reset();
355 } catch (final IOException e) {
356 handleIOException(e);
357 }
358 }
359
360 /**
361 * Sets the underlying input stream.
362 *
363 * @param in The input stream to set in {@link java.io.FilterInputStream#in}.
364 * @return {@code this} instance.
365 * @since 2.19.0
366 */
367 public ProxyInputStream setReference(final InputStream in) {
368 this.in = in;
369 return this;
370 }
371
372 /**
373 * Invokes the delegate's {@link InputStream#skip(long)} method.
374 *
375 * @param n the number of bytes to skip.
376 * @return the actual number of bytes skipped.
377 * @throws IOException if the stream does not support seek, or if some other I/O error occurs.
378 */
379 @Override
380 public long skip(final long n) throws IOException {
381 try {
382 return in.skip(n);
383 } catch (final IOException e) {
384 handleIOException(e);
385 return 0;
386 }
387 }
388
389 /**
390 * Unwraps this instance by returning the underlying {@link InputStream}.
391 * <p>
392 * Use with caution; useful to query the underlying {@link InputStream}.
393 * </p>
394 *
395 * @return the underlying {@link InputStream}.
396 * @since 2.16.0
397 */
398 public InputStream unwrap() {
399 return in;
400 }
401
402 }