001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   https://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019
020package org.apache.commons.exec;
021
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025import java.io.PipedOutputStream;
026import java.time.Duration;
027import java.time.Instant;
028import java.util.concurrent.Executors;
029import java.util.concurrent.ThreadFactory;
030
031import org.apache.commons.exec.util.DebugUtils;
032
033/**
034 * Copies standard output and error of sub-processes to standard output and error of the parent process. If output or error stream are set to null, any feedback
035 * from that stream will be lost.
036 */
037public class PumpStreamHandler implements ExecuteStreamHandler {
038
039    /** Three seconds timeout. */
040    private static final Duration STOP_TIMEOUT_ADDITION = Duration.ofSeconds(2);
041
042    /**
043     * Output thread.
044     */
045    private Thread outputThread;
046
047    /**
048     * Error thread.
049     */
050    private Thread errorThread;
051
052    /** Input thread. */
053    private Thread inputThread;
054
055    /** Output stream. */
056    private final OutputStream outputStream;
057
058    /** Error output stream. */
059    private final OutputStream errorOutputStream;
060
061    /** Error input stream. */
062    private final InputStream inputStream;
063
064    /** Pumper input stream. */
065    private InputStreamPumper inputStreamPumper;
066
067    /** The timeout Duration the implementation waits when stopping the pumper threads. */
068    private Duration stopTimeout = Duration.ZERO;
069
070    /** The last exception being caught. */
071    private IOException caught;
072
073    /**
074     * The thread factory.
075     */
076    private final ThreadFactory threadFactory;
077
078    /**
079     * Constructs a new {@link PumpStreamHandler}.
080     */
081    public PumpStreamHandler() {
082        this(System.out, System.err);
083    }
084
085    /**
086     * Constructs a new {@link PumpStreamHandler}.
087     *
088     * @param allOutputStream the output/error {@link OutputStream}. The {@code OutputStream}
089     *      implementation must be thread-safe because the output and error reader threads will
090     *      concurrently write to it.
091     */
092    public PumpStreamHandler(final OutputStream allOutputStream) {
093        this(allOutputStream, allOutputStream);
094    }
095
096    /**
097     * Constructs a new {@link PumpStreamHandler}.
098     *
099     * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
100     * thread-safe because the output and error reader threads will concurrently write to it.
101     *
102     * @param outputStream      the output {@link OutputStream}.
103     * @param errorOutputStream the error {@link OutputStream}.
104     */
105    public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream) {
106        this(outputStream, errorOutputStream, null);
107    }
108
109    /**
110     * Constructs a new {@link PumpStreamHandler}.
111     *
112     * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
113     * thread-safe because the output and error reader threads will concurrently write to it.
114     *
115     * @param outputStream      the output {@link OutputStream}.
116     * @param errorOutputStream the error {@link OutputStream}.
117     * @param inputStream       the input {@link InputStream}.
118     */
119    public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream, final InputStream inputStream) {
120        this(Executors.defaultThreadFactory(), outputStream, errorOutputStream, inputStream);
121    }
122
123    /**
124     * Constructs a new {@link PumpStreamHandler}.
125     *
126     * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
127     * thread-safe because the output and error reader threads will concurrently write to it.
128     *
129     * @param outputStream      the output {@link OutputStream}.
130     * @param errorOutputStream the error {@link OutputStream}.
131     * @param inputStream       the input {@link InputStream}.
132     */
133    private PumpStreamHandler(final ThreadFactory threadFactory, final OutputStream outputStream, final OutputStream errorOutputStream,
134            final InputStream inputStream) {
135        this.threadFactory = threadFactory;
136        this.outputStream = outputStream;
137        this.errorOutputStream = errorOutputStream;
138        this.inputStream = inputStream;
139    }
140
141    /**
142     * Create the pump to handle error output.
143     *
144     * @param is the {@link InputStream}.
145     * @param os the {@link OutputStream}.
146     */
147    protected void createProcessErrorPump(final InputStream is, final OutputStream os) {
148        errorThread = createPump(is, os);
149    }
150
151    /**
152     * Create the pump to handle process output.
153     *
154     * @param is the {@link InputStream}.
155     * @param os the {@link OutputStream}.
156     */
157    protected void createProcessOutputPump(final InputStream is, final OutputStream os) {
158        outputThread = createPump(is, os);
159    }
160
161    /**
162     * Creates a stream pumper to copy the given input stream to the given output stream. When the 'os' is an PipedOutputStream we are closing 'os' afterward
163     * to avoid an IOException ("Write end dead").
164     *
165     * @param is the input stream to copy from.
166     * @param os the output stream to copy into.
167     * @return the stream pumper thread.
168     */
169    protected Thread createPump(final InputStream is, final OutputStream os) {
170        return createPump(is, os, os instanceof PipedOutputStream);
171    }
172
173    /**
174     * Creates a stream pumper to copy the given input stream to the given output stream.
175     *
176     * @param is                 the input stream to copy from.
177     * @param os                 the output stream to copy into.
178     * @param closeWhenExhausted close the output stream when the input stream is exhausted.
179     * @return the stream pumper thread.
180     */
181    protected Thread createPump(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) {
182        return ThreadUtil.newThread(threadFactory, new StreamPumper(is, os, closeWhenExhausted), "CommonsExecStreamPumper-", true);
183    }
184
185    /**
186     * Creates a stream pumper to copy the given input stream to the given output stream.
187     *
188     * @param is the System.in input stream to copy from.
189     * @param os the output stream to copy into.
190     * @return the stream pumper thread.
191     */
192    private Thread createSystemInPump(final InputStream is, final OutputStream os) {
193        inputStreamPumper = new InputStreamPumper(is, os);
194        return ThreadUtil.newThread(threadFactory, inputStreamPumper, "CommonsExecStreamPumper-", true);
195    }
196
197    /**
198     * Gets the error stream.
199     *
200     * @return {@link OutputStream}.
201     */
202    protected OutputStream getErr() {
203        return errorOutputStream;
204    }
205
206    /**
207     * Gets the output stream.
208     *
209     * @return {@link OutputStream}.
210     */
211    protected OutputStream getOut() {
212        return outputStream;
213    }
214
215    Duration getStopTimeout() {
216        return stopTimeout;
217    }
218
219    /**
220     * Sets the {@link InputStream} from which to read the standard error of the process.
221     *
222     * @param is the {@link InputStream}.
223     */
224    @Override
225    public void setProcessErrorStream(final InputStream is) {
226        if (errorOutputStream != null) {
227            createProcessErrorPump(is, errorOutputStream);
228        }
229    }
230
231    /**
232     * Sets the {@link OutputStream} by means of which input can be sent to the process.
233     *
234     * @param os the {@link OutputStream}.
235     */
236    @Override
237    public void setProcessInputStream(final OutputStream os) {
238        if (inputStream != null) {
239            if (inputStream == System.in) {
240                inputThread = createSystemInPump(inputStream, os);
241            } else {
242                inputThread = createPump(inputStream, os, true);
243            }
244        } else {
245            try {
246                os.close();
247            } catch (final IOException e) {
248                final String msg = "Got exception while closing output stream";
249                DebugUtils.handleException(msg, e);
250            }
251        }
252    }
253
254    /**
255     * Sets the {@link InputStream} from which to read the standard output of the process.
256     *
257     * @param is the {@link InputStream}.
258     */
259    @Override
260    public void setProcessOutputStream(final InputStream is) {
261        if (outputStream != null) {
262            createProcessOutputPump(is, outputStream);
263        }
264    }
265
266    /**
267     * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
268     *
269     * @param timeout timeout or zero to wait forever (default).
270     * @since 1.4.0
271     */
272    public void setStopTimeout(final Duration timeout) {
273        this.stopTimeout = timeout != null ? timeout : Duration.ZERO;
274    }
275
276    /**
277     * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
278     *
279     * @param timeout timeout in milliseconds or zero to wait forever (default).
280     * @deprecated Use {@link #setStopTimeout(Duration)}.
281     */
282    @Deprecated
283    public void setStopTimeout(final long timeout) {
284        this.stopTimeout = Duration.ofMillis(timeout);
285    }
286
287    /**
288     * Starts the {@link Thread}s.
289     */
290    @Override
291    public void start() {
292        start(outputThread);
293        start(errorThread);
294        start(inputThread);
295    }
296
297    /**
298     * Starts the given {@link Thread}.
299     */
300    private void start(final Thread thread) {
301        if (thread != null) {
302            thread.start();
303        }
304    }
305
306    /**
307     * Stops pumping the streams. When a timeout is specified it is not guaranteed that the pumper threads are cleanly terminated.
308     */
309    @Override
310    public void stop() throws IOException {
311        if (inputStreamPumper != null) {
312            inputStreamPumper.stopProcessing();
313        }
314        stop(outputThread, stopTimeout);
315        stop(errorThread, stopTimeout);
316        stop(inputThread, stopTimeout);
317
318        if (errorOutputStream != null && errorOutputStream != outputStream) {
319            try {
320                errorOutputStream.flush();
321            } catch (final IOException e) {
322                final String msg = "Got exception while flushing the error stream : " + e.getMessage();
323                DebugUtils.handleException(msg, e);
324            }
325        }
326
327        if (outputStream != null) {
328            try {
329                outputStream.flush();
330            } catch (final IOException e) {
331                final String msg = "Got exception while flushing the output stream";
332                DebugUtils.handleException(msg, e);
333            }
334        }
335
336        if (caught != null) {
337            throw caught;
338        }
339    }
340
341    /**
342     * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
343     * was exceeded an IOException is created to be thrown to the caller.
344     *
345     * @param thread  the thread to be stopped.
346     * @param timeout the time in ms to wait to join.
347     */
348    private void stop(final Thread thread, final Duration timeout) {
349        if (thread != null) {
350            try {
351                if (timeout.equals(Duration.ZERO)) {
352                    thread.join();
353                } else {
354                    final Duration timeToWait = timeout.plus(STOP_TIMEOUT_ADDITION);
355                    final Instant startTime = Instant.now();
356                    thread.join(timeToWait.toMillis());
357                    if (Instant.now().isAfter(startTime.plus(timeToWait))) {
358                        caught = new ExecuteException("The stop timeout of " + timeout + " ms was exceeded", Executor.INVALID_EXITVALUE);
359                    }
360                }
361            } catch (final InterruptedException e) {
362                thread.interrupt();
363            }
364        }
365    }
366
367    /**
368     * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
369     * was exceeded an IOException is created to be thrown to the caller.
370     *
371     * @param thread        the thread to be stopped.
372     * @param timeoutMillis the time in ms to wait to join.
373     */
374    protected void stopThread(final Thread thread, final long timeoutMillis) {
375        stop(thread, Duration.ofMillis(timeoutMillis));
376    }
377}