001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import static org.apache.commons.io.IOUtils.EOF;
020
021import java.io.IOException;
022import java.io.InputStream;
023
024/**
025 * This is a stream that will only supply bytes up to a certain length - if its
026 * position goes above that, it will stop.
027 * <p>
028 * This is useful to wrap ServletInputStreams. The ServletInputStream will block
029 * if you try to read content from it that isn't there, because it doesn't know
030 * whether the content hasn't arrived yet or whether the content has finished.
031 * So, one of these, initialized with the Content-length sent in the
032 * ServletInputStream's header, will stop it blocking, providing it's been sent
033 * with a correct content length.
034 *
035 * @version $Id: BoundedInputStream.java 1586342 2014-04-10 15:36:29Z ggregory $
036 * @since 2.0
037 */
038public class BoundedInputStream extends InputStream {
039
040    /** the wrapped input stream */
041    private final InputStream in;
042
043    /** the max length to provide */
044    private final long max;
045
046    /** the number of bytes already returned */
047    private long pos = 0;
048
049    /** the marked position */
050    private long mark = EOF;
051
052    /** flag if close shoud be propagated */
053    private boolean propagateClose = true;
054
055    /**
056     * Creates a new <code>BoundedInputStream</code> that wraps the given input
057     * stream and limits it to a certain size.
058     *
059     * @param in The wrapped input stream
060     * @param size The maximum number of bytes to return
061     */
062    public BoundedInputStream(final InputStream in, final long size) {
063        // Some badly designed methods - eg the servlet API - overload length
064        // such that "-1" means stream finished
065        this.max = size;
066        this.in = in;
067    }
068
069    /**
070     * Creates a new <code>BoundedInputStream</code> that wraps the given input
071     * stream and is unlimited.
072     *
073     * @param in The wrapped input stream
074     */
075    public BoundedInputStream(final InputStream in) {
076        this(in, EOF);
077    }
078
079    /**
080     * Invokes the delegate's <code>read()</code> method if
081     * the current position is less than the limit.
082     * @return the byte read or -1 if the end of stream or
083     * the limit has been reached.
084     * @throws IOException if an I/O error occurs
085     */
086    @Override
087    public int read() throws IOException {
088        if (max >= 0 && pos >= max) {
089            return EOF;
090        }
091        final int result = in.read();
092        pos++;
093        return result;
094    }
095
096    /**
097     * Invokes the delegate's <code>read(byte[])</code> method.
098     * @param b the buffer to read the bytes into
099     * @return the number of bytes read or -1 if the end of stream or
100     * the limit has been reached.
101     * @throws IOException if an I/O error occurs
102     */
103    @Override
104    public int read(final byte[] b) throws IOException {
105        return this.read(b, 0, b.length);
106    }
107
108    /**
109     * Invokes the delegate's <code>read(byte[], int, int)</code> method.
110     * @param b the buffer to read the bytes into
111     * @param off The start offset
112     * @param len The number of bytes to read
113     * @return the number of bytes read or -1 if the end of stream or
114     * the limit has been reached.
115     * @throws IOException if an I/O error occurs
116     */
117    @Override
118    public int read(final byte[] b, final int off, final int len) throws IOException {
119        if (max>=0 && pos>=max) {
120            return EOF;
121        }
122        final long maxRead = max>=0 ? Math.min(len, max-pos) : len;
123        final int bytesRead = in.read(b, off, (int)maxRead);
124
125        if (bytesRead==EOF) {
126            return EOF;
127        }
128
129        pos+=bytesRead;
130        return bytesRead;
131    }
132
133    /**
134     * Invokes the delegate's <code>skip(long)</code> method.
135     * @param n the number of bytes to skip
136     * @return the actual number of bytes skipped
137     * @throws IOException if an I/O error occurs
138     */
139    @Override
140    public long skip(final long n) throws IOException {
141        final long toSkip = max>=0 ? Math.min(n, max-pos) : n;
142        final long skippedBytes = in.skip(toSkip);
143        pos+=skippedBytes;
144        return skippedBytes;
145    }
146
147    /**
148     * {@inheritDoc}
149     */
150    @Override
151    public int available() throws IOException {
152        if (max>=0 && pos>=max) {
153            return 0;
154        }
155        return in.available();
156    }
157
158    /**
159     * Invokes the delegate's <code>toString()</code> method.
160     * @return the delegate's <code>toString()</code>
161     */
162    @Override
163    public String toString() {
164        return in.toString();
165    }
166
167    /**
168     * Invokes the delegate's <code>close()</code> method
169     * if {@link #isPropagateClose()} is {@code true}.
170     * @throws IOException if an I/O error occurs
171     */
172    @Override
173    public void close() throws IOException {
174        if (propagateClose) {
175            in.close();
176        }
177    }
178
179    /**
180     * Invokes the delegate's <code>reset()</code> method.
181     * @throws IOException if an I/O error occurs
182     */
183    @Override
184    public synchronized void reset() throws IOException {
185        in.reset();
186        pos = mark;
187    }
188
189    /**
190     * Invokes the delegate's <code>mark(int)</code> method.
191     * @param readlimit read ahead limit
192     */
193    @Override
194    public synchronized void mark(final int readlimit) {
195        in.mark(readlimit);
196        mark = pos;
197    }
198
199    /**
200     * Invokes the delegate's <code>markSupported()</code> method.
201     * @return true if mark is supported, otherwise false
202     */
203    @Override
204    public boolean markSupported() {
205        return in.markSupported();
206    }
207
208    /**
209     * Indicates whether the {@link #close()} method
210     * should propagate to the underling {@link InputStream}.
211     *
212     * @return {@code true} if calling {@link #close()}
213     * propagates to the <code>close()</code> method of the
214     * underlying stream or {@code false} if it does not.
215     */
216    public boolean isPropagateClose() {
217        return propagateClose;
218    }
219
220    /**
221     * Set whether the {@link #close()} method
222     * should propagate to the underling {@link InputStream}.
223     *
224     * @param propagateClose {@code true} if calling
225     * {@link #close()} propagates to the <code>close()</code>
226     * method of the underlying stream or
227     * {@code false} if it does not.
228     */
229    public void setPropagateClose(final boolean propagateClose) {
230        this.propagateClose = propagateClose;
231    }
232}