/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
 * An InputStream that provides buffering and end-of-stream monitoring.
 * <p>
 * The stream counts the bytes handed out by its two {@code read} methods, behaves as if it were at end-of-stream
 * once {@link #close()} has been called, and invokes the {@link #onClose()} hook at most once.
 * </p>
 */
public class MonitorInputStream extends BufferedInputStream {

    /** Value returned by the {@code read} methods to signal end-of-stream. */
    private static final int EOF_CHAR = -1;

    /** Running total of bytes returned by the {@code read} methods of this class. */
    private final AtomicLong count = new AtomicLong();

    /** Set to {@code true} by the first call to {@link #close()}. */
    private final AtomicBoolean closed = new AtomicBoolean();

    /**
     * Constructs a MonitorInputStream from the passed InputStream.
     *
     * @param in The input stream to wrap.
     */
    public MonitorInputStream(final InputStream in) {
        super(in);
    }

    /**
     * Constructs a MonitorInputStream from the passed InputStream and with the specified buffer size.
     *
     * @param in The input stream to wrap.
     * @param bufferSize The buffer size to use.
     * @since 2.4
     */
    public MonitorInputStream(final InputStream in, final int bufferSize) {
        super(in, bufferSize);
    }

    /**
     * Returns 0 if this stream has been closed, else the underlying buffered stream is queried.
     *
     * @return The number of bytes that are available.
     * @throws IOException if an error occurs.
     * @since 2.0
     */
    @Override
    public synchronized int available() throws IOException {
        if (isClosed()) {
            return 0;
        }

        return super.available();
    }

    /**
     * Closes this input stream and releases any system resources associated with the stream.
     * <p>
     * Only the first call has any effect; later calls return immediately. The first call runs
     * {@link #closeSuper()} and then {@link #onClose()}, and {@code onClose()} is still invoked when
     * {@code closeSuper()} fails with an {@code IOException}. If only one of them fails, that exception is
     * thrown. If both fail, the exception from {@code onClose()} is thrown and the one from
     * {@code closeSuper()} is attached to it as a suppressed exception.
     * </p>
     *
     * @throws IOException if closing the stream or the {@code onClose()} notification fails.
     */
    @Override
    public void close() throws IOException {
        // Flip the flag atomically so that only the first caller performs the close.
        if (closed.getAndSet(true)) {
            return;
        }

        // Close the stream, remembering a failure so the notification below still runs.
        IOException failure = null;
        try {
            closeSuper();
        } catch (final IOException ioe) {
            failure = ioe;
        }

        // Notify that the stream has been closed.
        try {
            onClose();
        } catch (final IOException ioe) {
            // Keep the earlier failure visible instead of discarding it. The identity check is needed
            // because an exception cannot be added to itself as suppressed.
            if (failure != null && failure != ioe) {
                ioe.addSuppressed(failure);
            }
            failure = ioe;
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * This method exists in order to allow overriding whether to actually close
     * the underlying stream (VFS-805). There are cases where closing that stream will
     * consume any amount of remaining data. In such cases closing a different
     * entity instead (such as an HttpResponse) may be more appropriate.
     *
     * @throws IOException if an IO error occurs.
     */
    protected void closeSuper() throws IOException {
        super.close();
    }

    /**
     * Gets the number of bytes read by this input stream.
     *
     * @return The number of bytes returned so far by the {@code read} methods of this class.
     */
    public long getCount() {
        return count.get();
    }

    /**
     * Tells whether {@link #close()} has already been called.
     *
     * @return {@code true} once the stream has been closed.
     */
    private boolean isClosed() {
        return closed.get();
    }

    /**
     * Called after the stream has been closed. This implementation does nothing.
     *
     * @throws IOException if an error occurs.
     */
    protected void onClose() throws IOException {
        // noop
    }

    /**
     * Reads a single byte.
     *
     * @return The byte that was read, as an integer in the range 0 to 255, or -1 if this stream has been closed
     *         or the end of the stream has been reached.
     * @throws IOException if an IO error occurs.
     */
    @Override
    public synchronized int read() throws IOException {
        if (isClosed()) {
            return EOF_CHAR;
        }

        final int ch = super.read();
        if (ch != EOF_CHAR) {
            count.incrementAndGet();
        }

        return ch;
    }

    /**
     * Reads bytes from this input stream.
     *
     * @param buffer A byte array in which to place the bytes read.
     * @param offset The offset at which to start storing bytes in {@code buffer}.
     * @param length The maximum number of bytes to read.
     * @return The number of bytes read, or -1 if this stream has been closed or the end of the stream has been
     *         reached.
     * @throws IOException if an IO error occurs.
     */
    @Override
    public synchronized int read(final byte[] buffer, final int offset, final int length) throws IOException {
        if (isClosed()) {
            return EOF_CHAR;
        }

        final int numRead = super.read(buffer, offset, length);
        if (numRead != EOF_CHAR) {
            count.addAndGet(numRead);
        }
        return numRead;
    }
}