001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   https://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019
020package org.apache.commons.exec;
021
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025
026import org.apache.commons.exec.util.DebugUtils;
027
028/**
029 * Copies all data from an input stream to an output stream.
030 */
031public class StreamPumper implements Runnable {
032
033    /** The default size of the internal buffer for copying the streams. */
034    private static final int DEFAULT_SIZE = 1024;
035
036    /** The input stream to pump from. */
037    private final InputStream is;
038
039    /** The output stream to pmp into. */
040    private final OutputStream os;
041
042    /** The size of the internal buffer for copying the streams. */
043    private final int size;
044
045    /** Was the end of the stream reached. */
046    private boolean finished;
047
048    /** Close the output stream when exhausted. */
049    private final boolean closeWhenExhausted;
050
051    /**
052     * Constructs a new stream pumper.
053     *
054     * @param is input stream to read data from.
055     * @param os output stream to write data to.
056     */
057    public StreamPumper(final InputStream is, final OutputStream os) {
058        this(is, os, false);
059    }
060
061    /**
062     * Constructs a new stream pumper.
063     *
064     * @param is                 input stream to read data from.
065     * @param os                 output stream to write data to.
066     * @param closeWhenExhausted if true, the output stream will be closed when the input is exhausted.
067     */
068    public StreamPumper(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) {
069        this.is = is;
070        this.os = os;
071        this.size = DEFAULT_SIZE;
072        this.closeWhenExhausted = closeWhenExhausted;
073    }
074
075    /**
076     * Constructs a new stream pumper.
077     *
078     * @param is                 input stream to read data from.
079     * @param os                 output stream to write data to.
080     * @param closeWhenExhausted if true, the output stream will be closed when the input is exhausted.
081     * @param size               the size of the internal buffer for copying the streams.
082     */
083    public StreamPumper(final InputStream is, final OutputStream os, final boolean closeWhenExhausted, final int size) {
084        this.is = is;
085        this.os = os;
086        this.size = size > 0 ? size : DEFAULT_SIZE;
087        this.closeWhenExhausted = closeWhenExhausted;
088    }
089
090    /**
091     * Tests whether the end of the stream has been reached.
092     *
093     * @return true is the stream has been exhausted.
094     */
095    public synchronized boolean isFinished() {
096        return finished;
097    }
098
099    /**
100     * Copies data from the input stream to the output stream. Terminates as soon as the input stream is closed or an error occurs.
101     */
102    @Override
103    public void run() {
104        synchronized (this) {
105            // Just in case this object is reused in the future
106            finished = false;
107        }
108
109        final byte[] buf = new byte[this.size];
110
111        int length;
112        try {
113            while ((length = is.read(buf)) > 0) {
114                os.write(buf, 0, length);
115            }
116        } catch (final Exception ignored) {
117            // nothing to do - happens quite often with watchdog
118        } finally {
119            if (closeWhenExhausted) {
120                try {
121                    os.close();
122                } catch (final IOException e) {
123                    final String msg = "Got exception while closing exhausted output stream";
124                    DebugUtils.handleException(msg, e);
125                }
126            }
127            synchronized (this) {
128                finished = true;
129                notifyAll();
130            }
131        }
132    }
133
134    /**
135     * This method blocks until the stream pumper finishes.
136     *
137     * @throws InterruptedException if any thread interrupted the current thread before or while the current thread was waiting for a notification.
138     * @see #isFinished()
139     */
140    public synchronized void waitFor() throws InterruptedException {
141        while (!isFinished()) {
142            wait();
143        }
144    }
145}