001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.vfs2.util;
018
019import java.io.BufferedInputStream;
020import java.io.IOException;
021import java.io.InputStream;
022import java.util.concurrent.atomic.AtomicBoolean;
023import java.util.concurrent.atomic.AtomicLong;
024
025/**
026 * An InputStream that provides buffering and end-of-stream monitoring.
027 */
028public class MonitorInputStream extends BufferedInputStream {
029
030    private static final int EOF_CHAR = -1;
031    private final AtomicLong count = new AtomicLong();
032    private final AtomicBoolean closed = new AtomicBoolean();
033
034    /**
035     * Constructs a MonitorInputStream from the passed InputStream.
036     *
037     * @param in The input stream to wrap.
038     */
039    public MonitorInputStream(final InputStream in) {
040        super(in);
041    }
042
043    /**
044     * Constructs a MonitorInputStream from the passed InputStream and with the specified buffer size.
045     *
046     * @param in The input stream to wrap.
047     * @param bufferSize The buffer size to use.
048     * @since 2.4
049     */
050    public MonitorInputStream(final InputStream in, final int bufferSize) {
051        super(in, bufferSize);
052    }
053
054    /**
055     * Returns 0 if the stream is at EOF, else the underlying inputStream will be queried.
056     *
057     * @return The number of bytes that are available.
058     * @throws IOException if an error occurs.
059     * @since 2.0
060     */
061    @Override
062    public synchronized int available() throws IOException {
063        if (isClosed()) {
064            return 0;
065        }
066
067        return super.available();
068    }
069
070    /**
071     * Closes this input stream and releases any system resources associated with the stream.
072     *
073     * @throws IOException if an error occurs.
074     */
075    @Override
076    public void close() throws IOException {
077        final boolean alreadyClosed = closed.getAndSet(true);
078        if (alreadyClosed) {
079            return;
080        }
081
082        // Close the stream
083        IOException exc = null;
084        try {
085            closeSuper();
086        } catch (final IOException ioe) {
087            exc = ioe;
088        }
089
090        // Notify that the stream has been closed
091        try {
092            onClose();
093        } catch (final IOException ioe) {
094            exc = ioe;
095        }
096
097        if (exc != null) {
098            throw exc;
099        }
100    }
101
102    /**
103     * This method exists in order to allow overriding whether to actually close
104     * the underlying stream (VFS-805). There are cases where closing that stream will
105     * consume any amount of remaining data. In such cases closing a different
106     * entity instead (such as an HttpResponse) may be more appropriate.
107     * @throws IOException if an IO error occurs.
108     */
109    protected void closeSuper() throws IOException {
110        super.close();
111    }
112
113    /**
114     * Gets the number of bytes read by this input stream.
115     *
116     * @return The number of bytes read by this input stream.
117     */
118    public long getCount() {
119        return count.get();
120    }
121
122    private boolean isClosed() {
123        return closed.get();
124    }
125
126    /**
127     * Called after the stream has been closed. This implementation does nothing.
128     *
129     * @throws IOException if an error occurs.
130     */
131    protected void onClose() throws IOException {
132        // noop
133    }
134
135    /**
136     * Reads a character.
137     *
138     * @return The character that was read as an integer.
139     * @throws IOException if an IO error occurs.
140     */
141    @Override
142    public synchronized int read() throws IOException {
143        if (isClosed()) {
144            return EOF_CHAR;
145        }
146
147        final int ch = super.read();
148        if (ch != EOF_CHAR) {
149            count.incrementAndGet();
150        }
151
152        return ch;
153    }
154
155    /**
156     * Reads bytes from this input stream.
157     *
158     * @param buffer A byte array in which to place the characters read.
159     * @param offset The offset at which to start reading.
160     * @param length The maximum number of bytes to read.
161     * @return The number of bytes read.
162     * @throws IOException if an IO error occurs.
163     */
164    @Override
165    public synchronized int read(final byte[] buffer, final int offset, final int length) throws IOException {
166        if (isClosed()) {
167            return EOF_CHAR;
168        }
169
170        final int numRead = super.read(buffer, offset, length);
171        if (numRead != EOF_CHAR) {
172            count.addAndGet(numRead);
173        }
174        return numRead;
175    }
176}