1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.commons.vfs2.util;
18
19 import java.io.FilterInputStream;
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.util.concurrent.atomic.AtomicBoolean;
23 import java.util.concurrent.atomic.AtomicLong;
24
25 /**
26 * An InputStream that provides end-of-stream monitoring.
27 * <p>
28 * This is the same as {@link MonitorInputStream} but without the buffering.
29 * </p>
30 *
31 * @since 2.5.0
32 */
33 public class RawMonitorInputStream extends FilterInputStream {
34
35 private static final int EOF_CHAR = -1;
36 private final AtomicBoolean finished = new AtomicBoolean();
37 private final AtomicLong atomicCount = new AtomicLong();
38
39 // @Override
40 // public synchronized void reset() throws IOException {
41 // if (!finished.get()) {
42 // super.reset();
43 // }
44 // }
45 //
46 // @Override
47 // public synchronized long skip(long n) throws IOException {
48 // if (finished.get()) {
49 // return 0;
50 // }
51 // return super.skip(n);
52 // }
53
54 /**
55 * Constructs a MonitorInputStream from the passed InputStream.
56 *
57 * @param inputStream The input stream to wrap.
58 */
59 public RawMonitorInputStream(final InputStream inputStream) {
60 super(inputStream);
61 }
62
63 /**
64 * Returns 0 if the stream is at EOF, else the underlying inputStream will be queried.
65 *
66 * @return The number of bytes that are available.
67 * @throws IOException if an error occurs.
68 */
69 @Override
70 public synchronized int available() throws IOException {
71 if (finished.get()) {
72 return 0;
73 }
74
75 return super.available();
76 }
77
78 /**
79 * Closes this input stream and releases any system resources associated with the stream.
80 *
81 * @throws IOException if an error occurs.
82 */
83 @Override
84 public void close() throws IOException {
85 final boolean closed = finished.getAndSet(true);
86 if (closed) {
87 return;
88 }
89
90 // Close the stream
91 IOException exc = null;
92 try {
93 super.close();
94 } catch (final IOException ioe) {
95 exc = ioe;
96 }
97
98 // Notify that the stream has been closed
99 try {
100 onClose();
101 } catch (final IOException ioe) {
102 exc = ioe;
103 }
104
105 if (exc != null) {
106 throw exc;
107 }
108 }
109
110 /**
111 * Gets the number of bytes read by this input stream.
112 *
113 * @return The number of bytes read by this input stream.
114 */
115 public long getCount() {
116 return atomicCount.get();
117 }
118
119 @Override
120 public synchronized void mark(final int readLimit) {
121 // TODO Auto-generated method stub
122 super.mark(readLimit);
123 }
124
125 /**
126 * Called after the stream has been closed. This implementation does nothing.
127 *
128 * @throws IOException if an error occurs.
129 */
130 protected void onClose() throws IOException {
131 // noop
132 }
133
134 /**
135 * Reads a character.
136 *
137 * @return The character that was read as an integer.
138 * @throws IOException if an error occurs.
139 */
140 @Override
141 public int read() throws IOException { // lgtm [java/non-sync-override]
142 if (finished.get()) {
143 return EOF_CHAR;
144 }
145
146 final int ch = super.read();
147 if (ch != EOF_CHAR) {
148 atomicCount.incrementAndGet();
149 }
150
151 return ch;
152 }
153
154 /**
155 * Reads bytes from this input stream.
156 *
157 * @param buffer A byte array in which to place the characters read.
158 * @param offset The offset at which to start reading.
159 * @param length The maximum number of bytes to read.
160 * @return The number of bytes read.
161 * @throws IOException if an error occurs.
162 */
163 @Override
164 public int read(final byte[] buffer, final int offset, final int length) throws IOException { // lgtm [java/non-sync-override]
165 if (finished.get()) {
166 return EOF_CHAR;
167 }
168
169 final int nread = super.read(buffer, offset, length);
170 if (nread != EOF_CHAR) {
171 atomicCount.addAndGet(nread);
172 }
173 return nread;
174 }
175 }