1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.commons.vfs2.util;
18
19 import java.io.BufferedInputStream;
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.util.concurrent.atomic.AtomicBoolean;
23 import java.util.concurrent.atomic.AtomicLong;
24
/**
 * A buffered {@link InputStream} wrapper that counts the bytes handed out by its {@code read} methods and
 * notifies subclasses, via {@link #onClose()}, the first time the stream is closed.
 * <p>
 * Once closed, the {@code read} methods report end-of-stream and {@link #available()} reports {@code 0}.
 * </p>
 */
public class MonitorInputStream extends BufferedInputStream {

    /** Value returned by the {@code read} methods to signal end-of-stream. */
    private static final int EOF_CHAR = -1;

    /** Running total of bytes returned by the {@code read} methods of this class. */
    private final AtomicLong count = new AtomicLong();

    /** Set to {@code true} by the first call to {@link #close()}. */
    private final AtomicBoolean closed = new AtomicBoolean();

    /**
     * Constructs a MonitorInputStream from the passed InputStream.
     *
     * @param in The input stream to wrap.
     */
    public MonitorInputStream(final InputStream in) {
        super(in);
    }

    /**
     * Constructs a MonitorInputStream from the passed InputStream and with the specified buffer size.
     *
     * @param in The input stream to wrap.
     * @param bufferSize The buffer size to use.
     * @since 2.4
     */
    public MonitorInputStream(final InputStream in, final int bufferSize) {
        super(in, bufferSize);
    }

    /**
     * Returns 0 if this stream has been closed, else the buffered stream will be queried.
     *
     * @return The number of bytes that are available, or 0 if this stream has been closed.
     * @throws IOException if an error occurs.
     * @since 2.0
     */
    @Override
    public synchronized int available() throws IOException {
        if (isClosed()) {
            return 0;
        }

        return super.available();
    }

    /**
     * Closes this input stream and releases any system resources associated with the stream.
     * <p>
     * Only the first call has any effect; subsequent calls return immediately. {@link #onClose()} is invoked
     * even when {@link #closeSuper()} fails. If both fail, the exception from {@link #onClose()} is thrown and
     * the exception from {@link #closeSuper()} is attached to it as a suppressed exception.
     * </p>
     *
     * @throws IOException if closing the stream or the close notification fails.
     */
    @Override
    public void close() throws IOException {
        // Atomically claim the close; repeated calls are no-ops.
        if (closed.getAndSet(true)) {
            return;
        }

        // Close the stream, remembering any failure so that onClose() still runs.
        IOException exc = null;
        try {
            closeSuper();
        } catch (final IOException ioe) {
            exc = ioe;
        }

        // Notify that the stream has been closed
        try {
            onClose();
        } catch (final IOException ioe) {
            // Keep the earlier failure visible instead of silently discarding it.
            // The identity check avoids the IllegalArgumentException that self-suppression raises.
            if (exc != null && exc != ioe) {
                ioe.addSuppressed(exc);
            }
            exc = ioe;
        }

        if (exc != null) {
            throw exc;
        }
    }

    /**
     * This method exists in order to allow overriding whether to actually close
     * the underlying stream (VFS-805). There are cases where closing that stream will
     * consume any amount of remaining data. In such cases closing a different
     * entity instead (such as an HttpResponse) may be more appropriate.
     *
     * @throws IOException if an IO error occurs.
     */
    protected void closeSuper() throws IOException {
        super.close();
    }

    /**
     * Gets the number of bytes read by this input stream.
     *
     * @return The number of bytes returned so far by the {@code read} methods of this input stream.
     */
    public long getCount() {
        return count.get();
    }

    /**
     * Tests whether {@link #close()} has been called on this stream.
     *
     * @return {@code true} if this stream has been closed.
     */
    private boolean isClosed() {
        return closed.get();
    }

    /**
     * Called after the stream has been closed. This implementation does nothing.
     *
     * @throws IOException if an error occurs.
     */
    protected void onClose() throws IOException {
        // noop
    }

    /**
     * Reads a single byte.
     *
     * @return The byte that was read as an integer, or -1 at end-of-stream or if this stream has been closed.
     * @throws IOException if an IO error occurs.
     */
    @Override
    public synchronized int read() throws IOException {
        if (isClosed()) {
            return EOF_CHAR;
        }

        final int ch = super.read();
        if (ch != EOF_CHAR) {
            count.incrementAndGet();
        }

        return ch;
    }

    /**
     * Reads bytes from this input stream.
     *
     * @param buffer A byte array in which to place the bytes read.
     * @param offset The offset at which to start storing bytes in {@code buffer}.
     * @param length The maximum number of bytes to read.
     * @return The number of bytes read, or -1 at end-of-stream or if this stream has been closed.
     * @throws IOException if an IO error occurs.
     */
    @Override
    public synchronized int read(final byte[] buffer, final int offset, final int length) throws IOException {
        if (isClosed()) {
            return EOF_CHAR;
        }

        final int numRead = super.read(buffer, offset, length);
        if (numRead != EOF_CHAR) {
            count.addAndGet(numRead);
        }
        return numRead;
    }
}