001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.vfs2.util;
018
019import java.io.FilterInputStream;
020import java.io.IOException;
021import java.io.InputStream;
022import java.util.concurrent.atomic.AtomicBoolean;
023import java.util.concurrent.atomic.AtomicLong;
024
025/**
026 * An InputStream that provides end-of-stream monitoring.
027 * <p>
028 * This is the same as {@link MonitorInputStream} but without the buffering.
029 * </p>
030 *
031 * @since 2.5.0
032 */
033public class RawMonitorInputStream extends FilterInputStream {
034
035    private static final int EOF_CHAR = -1;
036    private final AtomicBoolean finished = new AtomicBoolean();
037    private final AtomicLong atomicCount = new AtomicLong();
038
039//    @Override
040//    public synchronized void reset() throws IOException {
041//        if (!finished.get()) {
042//            super.reset();
043//        }
044//    }
045//
046//    @Override
047//    public synchronized long skip(long n) throws IOException {
048//        if (finished.get()) {
049//            return 0;
050//        }
051//        return super.skip(n);
052//    }
053
054    /**
055     * Constructs a MonitorInputStream from the passed InputStream.
056     *
057     * @param inputStream The input stream to wrap.
058     */
059    public RawMonitorInputStream(final InputStream inputStream) {
060        super(inputStream);
061    }
062
063    /**
064     * Returns 0 if the stream is at EOF, else the underlying inputStream will be queried.
065     *
066     * @return The number of bytes that are available.
067     * @throws IOException if an error occurs.
068     */
069    @Override
070    public synchronized int available() throws IOException {
071        if (finished.get()) {
072            return 0;
073        }
074
075        return super.available();
076    }
077
078    /**
079     * Closes this input stream and releases any system resources associated with the stream.
080     *
081     * @throws IOException if an error occurs.
082     */
083    @Override
084    public void close() throws IOException {
085        final boolean closed = finished.getAndSet(true);
086        if (closed) {
087            return;
088        }
089
090        // Close the stream
091        IOException exc = null;
092        try {
093            super.close();
094        } catch (final IOException ioe) {
095            exc = ioe;
096        }
097
098        // Notify that the stream has been closed
099        try {
100            onClose();
101        } catch (final IOException ioe) {
102            exc = ioe;
103        }
104
105        if (exc != null) {
106            throw exc;
107        }
108    }
109
110    /**
111     * Gets the number of bytes read by this input stream.
112     *
113     * @return The number of bytes read by this input stream.
114     */
115    public long getCount() {
116        return atomicCount.get();
117    }
118
119    @Override
120    public synchronized void mark(final int readLimit) {
121        // TODO Auto-generated method stub
122        super.mark(readLimit);
123    }
124
125    /**
126     * Called after the stream has been closed. This implementation does nothing.
127     *
128     * @throws IOException if an error occurs.
129     */
130    protected void onClose() throws IOException {
131        // noop
132    }
133
134    /**
135     * Reads a character.
136     *
137     * @return The character that was read as an integer.
138     * @throws IOException if an error occurs.
139     */
140    @Override
141    public int read() throws IOException { // lgtm [java/non-sync-override]
142        if (finished.get()) {
143            return EOF_CHAR;
144        }
145
146        final int ch = super.read();
147        if (ch != EOF_CHAR) {
148            atomicCount.incrementAndGet();
149        }
150
151        return ch;
152    }
153
154    /**
155     * Reads bytes from this input stream.
156     *
157     * @param buffer A byte array in which to place the characters read.
158     * @param offset The offset at which to start reading.
159     * @param length The maximum number of bytes to read.
160     * @return The number of bytes read.
161     * @throws IOException if an error occurs.
162     */
163    @Override
164    public int read(final byte[] buffer, final int offset, final int length) throws IOException { // lgtm [java/non-sync-override]
165        if (finished.get()) {
166            return EOF_CHAR;
167        }
168
169        final int nread = super.read(buffer, offset, length);
170        if (nread != EOF_CHAR) {
171            atomicCount.addAndGet(nread);
172        }
173        return nread;
174    }
175}