001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.vfs2.util; 018 019import java.io.FilterInputStream; 020import java.io.IOException; 021import java.io.InputStream; 022import java.util.concurrent.atomic.AtomicBoolean; 023import java.util.concurrent.atomic.AtomicLong; 024 025/** 026 * An InputStream that provides end-of-stream monitoring. 027 * <p> 028 * This is the same as {@link MonitorInputStream} but without the buffering. 029 * </p> 030 * 031 * @since 2.5.0 032 */ 033public class RawMonitorInputStream extends FilterInputStream { 034 035 private static final int EOF_CHAR = -1; 036 private final AtomicBoolean finished = new AtomicBoolean(); 037 private final AtomicLong atomicCount = new AtomicLong(); 038 039// @Override 040// public synchronized void reset() throws IOException { 041// if (!finished.get()) { 042// super.reset(); 043// } 044// } 045// 046// @Override 047// public synchronized long skip(long n) throws IOException { 048// if (finished.get()) { 049// return 0; 050// } 051// return super.skip(n); 052// } 053 054 /** 055 * Constructs a MonitorInputStream from the passed InputStream. 056 * 057 * @param inputStream The input stream to wrap. 058 */ 059 public RawMonitorInputStream(final InputStream inputStream) { 060 super(inputStream); 061 } 062 063 /** 064 * Returns 0 if the stream is at EOF, else the underlying inputStream will be queried. 065 * 066 * @return The number of bytes that are available. 067 * @throws IOException if an error occurs. 068 */ 069 @Override 070 public synchronized int available() throws IOException { 071 if (finished.get()) { 072 return 0; 073 } 074 075 return super.available(); 076 } 077 078 /** 079 * Closes this input stream and releases any system resources associated with the stream. 080 * 081 * @throws IOException if an error occurs. 082 */ 083 @Override 084 public void close() throws IOException { 085 final boolean closed = finished.getAndSet(true); 086 if (closed) { 087 return; 088 } 089 090 // Close the stream 091 IOException exc = null; 092 try { 093 super.close(); 094 } catch (final IOException ioe) { 095 exc = ioe; 096 } 097 098 // Notify that the stream has been closed 099 try { 100 onClose(); 101 } catch (final IOException ioe) { 102 exc = ioe; 103 } 104 105 if (exc != null) { 106 throw exc; 107 } 108 } 109 110 /** 111 * Gets the number of bytes read by this input stream. 112 * 113 * @return The number of bytes read by this input stream. 114 */ 115 public long getCount() { 116 return atomicCount.get(); 117 } 118 119 @Override 120 public synchronized void mark(final int readLimit) { 121 // TODO Auto-generated method stub 122 super.mark(readLimit); 123 } 124 125 /** 126 * Called after the stream has been closed. This implementation does nothing. 127 * 128 * @throws IOException if an error occurs. 129 */ 130 protected void onClose() throws IOException { 131 // noop 132 } 133 134 /** 135 * Reads a character. 136 * 137 * @return The character that was read as an integer. 138 * @throws IOException if an error occurs. 139 */ 140 @Override 141 public int read() throws IOException { // lgtm [java/non-sync-override] 142 if (finished.get()) { 143 return EOF_CHAR; 144 } 145 146 final int ch = super.read(); 147 if (ch != EOF_CHAR) { 148 atomicCount.incrementAndGet(); 149 } 150 151 return ch; 152 } 153 154 /** 155 * Reads bytes from this input stream. 156 * 157 * @param buffer A byte array in which to place the characters read. 158 * @param offset The offset at which to start reading. 159 * @param length The maximum number of bytes to read. 160 * @return The number of bytes read. 161 * @throws IOException if an error occurs. 162 */ 163 @Override 164 public int read(final byte[] buffer, final int offset, final int length) throws IOException { // lgtm [java/non-sync-override] 165 if (finished.get()) { 166 return EOF_CHAR; 167 } 168 169 final int nread = super.read(buffer, offset, length); 170 if (nread != EOF_CHAR) { 171 atomicCount.addAndGet(nread); 172 } 173 return nread; 174 } 175}