001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.output;
018
019import java.io.IOException;
020import java.io.OutputStream;
021
022import org.apache.commons.io.function.IOConsumer;
023import org.apache.commons.io.function.IOFunction;
024
025/**
026 * An output stream which triggers an event when a specified number of bytes of data have been written to it. The event
027 * can be used, for example, to throw an exception if a maximum has been reached, or to switch the underlying stream
028 * type when the threshold is exceeded.
029 * <p>
030 * This class overrides all {@link OutputStream} methods. However, these overrides ultimately call the corresponding
031 * methods in the underlying output stream implementation.
032 * </p>
033 * <p>
034 * NOTE: This implementation may trigger the event <em>before</em> the threshold is actually reached, since it triggers
035 * when a pending write operation would cause the threshold to be exceeded.
036 * </p>
037 */
038public class ThresholdingOutputStream extends OutputStream {
039
040    /**
041     * Noop output stream getter function.
042     */
043    private static final IOFunction<ThresholdingOutputStream, OutputStream> NOOP_OS_GETTER = os -> NullOutputStream.INSTANCE;
044
045    /**
046     * The threshold at which the event will be triggered.
047     */
048    private final int threshold;
049
050    /**
051     * Accepts reaching the threshold.
052     */
053    private final IOConsumer<ThresholdingOutputStream> thresholdConsumer;
054
055    /**
056     * Gets the output stream.
057     */
058    private final IOFunction<ThresholdingOutputStream, OutputStream> outputStreamGetter;
059
060    /**
061     * The number of bytes written to the output stream.
062     */
063    private long written;
064
065    /**
066     * Whether or not the configured threshold has been exceeded.
067     */
068    private boolean thresholdExceeded;
069
070    /**
071     * Constructs an instance of this class which will trigger an event at the specified threshold.
072     *
073     * @param threshold The number of bytes at which to trigger an event.
074     */
075    public ThresholdingOutputStream(final int threshold) {
076        this(threshold, IOConsumer.noop(), NOOP_OS_GETTER);
077    }
078
079    /**
080     * Constructs an instance of this class which will trigger an event at the specified threshold.
081     *
082     * @param threshold The number of bytes at which to trigger an event.
083     * @param thresholdConsumer Accepts reaching the threshold.
084     * @param outputStreamGetter Gets the output stream.
085     * @since 2.9.0
086     */
087    public ThresholdingOutputStream(final int threshold, final IOConsumer<ThresholdingOutputStream> thresholdConsumer,
088        final IOFunction<ThresholdingOutputStream, OutputStream> outputStreamGetter) {
089        this.threshold = threshold;
090        this.thresholdConsumer = thresholdConsumer == null ? IOConsumer.noop() : thresholdConsumer;
091        this.outputStreamGetter = outputStreamGetter == null ? NOOP_OS_GETTER : outputStreamGetter;
092    }
093
094    /**
095     * Checks to see if writing the specified number of bytes would cause the configured threshold to be exceeded. If
096     * so, triggers an event to allow a concrete implementation to take action on this.
097     *
098     * @param count The number of bytes about to be written to the underlying output stream.
099     *
100     * @throws IOException if an error occurs.
101     */
102    protected void checkThreshold(final int count) throws IOException {
103        if (!thresholdExceeded && written + count > threshold) {
104            thresholdExceeded = true;
105            thresholdReached();
106        }
107    }
108
109    /**
110     * Closes this output stream and releases any system resources associated with this stream.
111     *
112     * @throws IOException if an error occurs.
113     */
114    @Override
115    public void close() throws IOException {
116        try {
117            flush();
118        } catch (final IOException ignored) {
119            // ignore
120        }
121        // TODO for 4.0: Replace with getOutputStream()
122        getStream().close();
123    }
124
125    /**
126     * Flushes this output stream and forces any buffered output bytes to be written out.
127     *
128     * @throws IOException if an error occurs.
129     */
130    @SuppressWarnings("resource") // the underlying stream is managed by a subclass.
131    @Override
132    public void flush() throws IOException {
133        // TODO for 4.0: Replace with getOutputStream()
134        getStream().flush();
135    }
136
137    /**
138     * Gets the number of bytes that have been written to this output stream.
139     *
140     * @return The number of bytes written.
141     */
142    public long getByteCount() {
143        return written;
144    }
145
146    /**
147     * Gets the underlying output stream, to which the corresponding {@link OutputStream} methods in this class will
148     * ultimately delegate.
149     *
150     * @return The underlying output stream.
151     * @throws IOException if an error occurs.
152     * @since 2.14.0
153     */
154    protected OutputStream getOutputStream() throws IOException {
155        return outputStreamGetter.apply(this);
156    }
157
158    /**
159     * Gets the underlying output stream, to which the corresponding {@link OutputStream} methods in this class will
160     * ultimately delegate.
161     *
162     * @return The underlying output stream.
163     * @throws IOException if an error occurs.
164     * @deprecated Use {@link #getOutputStream()}.
165     */
166    @Deprecated
167    protected OutputStream getStream() throws IOException {
168        return getOutputStream();
169    }
170
171    /**
172     * Gets the threshold, in bytes, at which an event will be triggered.
173     *
174     * @return The threshold point, in bytes.
175     */
176    public int getThreshold() {
177        return threshold;
178    }
179
180    /**
181     * Tests whether or not the configured threshold has been exceeded for this output stream.
182     *
183     * @return {@code true} if the threshold has been reached; {@code false} otherwise.
184     */
185    public boolean isThresholdExceeded() {
186        return written > threshold;
187    }
188
189    /**
190     * Resets the byteCount to zero. You can call this from {@link #thresholdReached()} if you want the event to be
191     * triggered again.
192     */
193    protected void resetByteCount() {
194        this.thresholdExceeded = false;
195        this.written = 0;
196    }
197
198    /**
199     * Sets the byteCount to count. Useful for re-opening an output stream that has previously been written to.
200     *
201     * @param count The number of bytes that have already been written to the output stream
202     *
203     * @since 2.5
204     */
205    protected void setByteCount(final long count) {
206        this.written = count;
207    }
208
209    /**
210     * Indicates that the configured threshold has been reached, and that a subclass should take whatever action
211     * necessary on this event. This may include changing the underlying output stream.
212     *
213     * @throws IOException if an error occurs.
214     */
215    protected void thresholdReached() throws IOException {
216        thresholdConsumer.accept(this);
217    }
218
219    /**
220     * Writes {@code b.length} bytes from the specified byte array to this output stream.
221     *
222     * @param b The array of bytes to be written.
223     *
224     * @throws IOException if an error occurs.
225     */
226    @SuppressWarnings("resource") // the underlying stream is managed by a subclass.
227    @Override
228    public void write(final byte[] b) throws IOException {
229        checkThreshold(b.length);
230        // TODO for 4.0: Replace with getOutputStream()
231        getStream().write(b);
232        written += b.length;
233    }
234
235    /**
236     * Writes {@code len} bytes from the specified byte array starting at offset {@code off} to this output stream.
237     *
238     * @param b The byte array from which the data will be written.
239     * @param off The start offset in the byte array.
240     * @param len The number of bytes to write.
241     *
242     * @throws IOException if an error occurs.
243     */
244    @SuppressWarnings("resource") // the underlying stream is managed by a subclass.
245    @Override
246    public void write(final byte[] b, final int off, final int len) throws IOException {
247        checkThreshold(len);
248        // TODO for 4.0: Replace with getOutputStream()
249        getStream().write(b, off, len);
250        written += len;
251    }
252
253    /**
254     * Writes the specified byte to this output stream.
255     *
256     * @param b The byte to be written.
257     *
258     * @throws IOException if an error occurs.
259     */
260    @SuppressWarnings("resource") // the underlying stream is managed by a subclass.
261    @Override
262    public void write(final int b) throws IOException {
263        checkThreshold(1);
264        // TODO for 4.0: Replace with getOutputStream()
265        getStream().write(b);
266        written++;
267    }
268}