View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.vfs2.util;
18  
19  import java.io.BufferedInputStream;
20  import java.io.IOException;
21  import java.io.InputStream;
22  import java.util.concurrent.atomic.AtomicBoolean;
23  import java.util.concurrent.atomic.AtomicLong;
24  
25  /**
26   * An InputStream that provides buffering and end-of-stream monitoring.
27   */
28  public class MonitorInputStream extends BufferedInputStream {
29  
30      private static final int EOF_CHAR = -1;
31      private final AtomicLong atomicCount = new AtomicLong();
32      private final AtomicBoolean finished = new AtomicBoolean(false);
33  
34      /**
35       * Constructs a MonitorInputStream from the passed InputStream
36       *
37       * @param in The input stream to wrap.
38       */
39      public MonitorInputStream(final InputStream in) {
40          super(in);
41      }
42  
43      /**
44       * Constructs a MonitorInputStream from the passed InputStream and with the specified buffer size
45       *
46       * @param in The input stream to wrap.
47       * @param bufferSize The buffer size to use.
48       * @since 2.4
49       */
50      public MonitorInputStream(final InputStream in, final int bufferSize) {
51          super(in, bufferSize);
52      }
53  
54      /**
55       * Returns 0 if the stream is at EOF, else the underlying inputStream will be queried.
56       *
57       * @return The number of bytes that are available.
58       * @throws IOException if an error occurs.
59       * @since 2.0
60       */
61      @Override
62      public synchronized int available() throws IOException {
63          if (finished.get()) {
64              return 0;
65          }
66  
67          return super.available();
68      }
69  
70      /**
71       * Closes this input stream and releases any system resources associated with the stream.
72       *
73       * @throws IOException if an error occurs.
74       */
75      @Override
76      public void close() throws IOException {
77          final boolean closed = finished.getAndSet(true);
78          if (closed) {
79              return;
80          }
81  
82          // Close the stream
83          IOException exc = null;
84          try {
85              closeSuper();
86          } catch (final IOException ioe) {
87              exc = ioe;
88          }
89  
90          // Notify that the stream has been closed
91          try {
92              onClose();
93          } catch (final IOException ioe) {
94              exc = ioe;
95          }
96  
97          if (exc != null) {
98              throw exc;
99          }
100     }
101 
102     /**
103      * Gets the number of bytes read by this input stream.
104      *
105      * @return The number of bytes read by this input stream.
106      */
107     public long getCount() {
108         return atomicCount.get();
109     }
110 
111     /**
112      * This method exists in order to allow overriding whether to actually close
113      * the underlying stream (VFS-805). There are cases where closing that stream will
114      * consume any amount of remaining data. In such cases closing a different
115      * entity instead (such as an HttpResponse) may be more appropriate.
116      * @throws IOException if an IO error occurs.
117      */
118     protected void closeSuper() throws IOException {
119         super.close();
120     }
121 
122     /**
123      * Called after the stream has been closed. This implementation does nothing.
124      *
125      * @throws IOException if an error occurs.
126      */
127     protected void onClose() throws IOException {
128         // noop
129     }
130 
131     /**
132      * Reads a character.
133      *
134      * @return The character that was read as an integer.
135      * @throws IOException if an IO error occurs.
136      */
137     @Override
138     public int read() throws IOException { // lgtm [java/non-sync-override]
139         if (finished.get()) {
140             return EOF_CHAR;
141         }
142 
143         final int ch = super.read();
144         if (ch != EOF_CHAR) {
145             atomicCount.incrementAndGet();
146         }
147 
148         return ch;
149     }
150 
151     /**
152      * Reads bytes from this input stream.
153      *
154      * @param buffer A byte array in which to place the characters read.
155      * @param offset The offset at which to start reading.
156      * @param length The maximum number of bytes to read.
157      * @return The number of bytes read.
158      * @throws IOException if an IO error occurs.
159      */
160     @Override
161     public int read(final byte[] buffer, final int offset, final int length) throws IOException { // lgtm [java/non-sync-override]
162         if (finished.get()) {
163             return EOF_CHAR;
164         }
165 
166         final int nread = super.read(buffer, offset, length);
167         if (nread != EOF_CHAR) {
168             atomicCount.addAndGet(nread);
169         }
170         return nread;
171     }
172 }