View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.vfs2.util;
18  
19  import java.io.FilterInputStream;
20  import java.io.IOException;
21  import java.io.InputStream;
22  import java.util.concurrent.atomic.AtomicBoolean;
23  import java.util.concurrent.atomic.AtomicLong;
24  
/**
 * An InputStream that provides end-of-stream monitoring.
 * <p>
 * This is the same as {@link MonitorInputStream} but without the buffering.
 * </p>
 * <p>
 * The stream counts the bytes successfully read through it (see {@link #getCount()}) and invokes
 * {@link #onClose()} the first time {@link #close()} is called. Once closed, reads report end-of-stream and
 * {@link #available()} reports {@code 0} without touching the wrapped stream.
 * </p>
 *
 * @since 2.5.0
 */
public class RawMonitorInputStream extends FilterInputStream {

    /** Value returned by the read methods to signal end-of-stream. */
    private static final int EOF_CHAR = -1;

    /** Set to {@code true} by the first call to {@link #close()}. */
    private final AtomicBoolean finished = new AtomicBoolean(false);

    /** Running total of bytes successfully read through this stream. */
    private final AtomicLong atomicCount = new AtomicLong();

    /**
     * Constructs a RawMonitorInputStream from the passed InputStream.
     *
     * @param inputStream The input stream to wrap.
     */
    public RawMonitorInputStream(final InputStream inputStream) {
        super(inputStream);
    }

    /**
     * Returns 0 if this stream has been closed, else the underlying inputStream will be queried.
     *
     * @return The number of bytes that are available.
     * @throws IOException if an error occurs.
     */
    @Override
    public synchronized int available() throws IOException {
        if (finished.get()) {
            return 0;
        }

        return super.available();
    }

    /**
     * Reads a single byte and, unless end-of-stream was reached, adds it to the byte count.
     *
     * @return The byte that was read as an integer, or {@code -1} if this stream has been closed or the wrapped
     *         stream reported end-of-stream.
     * @throws IOException if an error occurs.
     */
    @Override
    public int read() throws IOException { // lgtm [java/non-sync-override]
        if (finished.get()) {
            return EOF_CHAR;
        }

        final int ch = super.read();
        if (ch != EOF_CHAR) {
            atomicCount.incrementAndGet();
        }

        return ch;
    }

    /**
     * Reads bytes from this input stream and adds the number of bytes read to the byte count.
     *
     * @param buffer A byte array in which to place the bytes read.
     * @param offset The offset at which to start writing into {@code buffer}.
     * @param length The maximum number of bytes to read.
     * @return The number of bytes read, or {@code -1} if this stream has been closed or the wrapped stream
     *         reported end-of-stream.
     * @throws IOException if an error occurs.
     */
    @Override
    public int read(final byte[] buffer, final int offset, final int length) throws IOException { // lgtm [java/non-sync-override]
        if (finished.get()) {
            return EOF_CHAR;
        }

        final int nread = super.read(buffer, offset, length);
        if (nread != EOF_CHAR) {
            atomicCount.addAndGet(nread);
        }
        return nread;
    }

    /**
     * Closes this input stream and releases any system resources associated with the stream.
     * <p>
     * Only the first call has an effect; later calls return immediately. {@link #onClose()} is invoked even when
     * closing the wrapped stream fails. If both steps fail, the exception from {@link #onClose()} is thrown and
     * the exception from closing the wrapped stream is attached to it as a suppressed exception.
     * </p>
     *
     * @throws IOException if closing the wrapped stream or {@link #onClose()} fails.
     */
    @Override
    public void close() throws IOException {
        // Atomically claim the close so that the body below runs at most once.
        if (finished.getAndSet(true)) {
            return;
        }

        // Close the stream, remembering (not yet throwing) any failure so that onClose() still runs.
        IOException failure = null;
        try {
            super.close();
        } catch (final IOException ioe) {
            failure = ioe;
        }

        // Notify that the stream has been closed
        try {
            onClose();
        } catch (final IOException ioe) {
            // Keep the earlier close failure visible instead of silently dropping it.
            // The identity check avoids Throwable.addSuppressed rejecting self-suppression.
            if (failure != null && failure != ioe) {
                ioe.addSuppressed(failure);
            }
            failure = ioe;
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Called after the stream has been closed. This implementation does nothing.
     *
     * @throws IOException if an error occurs.
     */
    protected void onClose() throws IOException {
        // noop
    }

    /**
     * Gets the number of bytes read by this input stream.
     *
     * @return The number of bytes read by this input stream.
     */
    public long getCount() {
        return atomicCount.get();
    }

    /**
     * Marks the current position by delegating to the superclass implementation.
     *
     * @param readlimit The maximum number of bytes that can be read before the mark position becomes invalid.
     */
    @Override
    public synchronized void mark(final int readlimit) {
        super.mark(readlimit);
    }
}
175 }