001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 *  contributor license agreements.  See the NOTICE file distributed with
004 *  this work for additional information regarding copyright ownership.
005 *  The ASF licenses this file to You under the Apache License, Version 2.0
006 *  (the "License"); you may not use this file except in compliance with
007 *  the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 *  Unless required by applicable law or agreed to in writing, software
012 *  distributed under the License is distributed on an "AS IS" BASIS,
013 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 *  See the License for the specific language governing permissions and
015 *  limitations under the License.
016 */
017
018package org.apache.commons.exec;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.OutputStream;
023import java.io.PipedOutputStream;
024import java.time.Duration;
025import java.time.Instant;
026import java.util.concurrent.Executors;
027import java.util.concurrent.ThreadFactory;
028
029import org.apache.commons.exec.util.DebugUtils;
030
031/**
032 * Copies standard output and error of sub-processes to standard output and error of the parent process. If output or error stream are set to null, any feedback
033 * from that stream will be lost.
034 */
035public class PumpStreamHandler implements ExecuteStreamHandler {
036
037    private static final Duration STOP_TIMEOUT_ADDITION = Duration.ofSeconds(2);
038
039    private Thread outputThread;
040
041    private Thread errorThread;
042
043    private Thread inputThread;
044
045    private final OutputStream outputStream;
046
047    private final OutputStream errorOutputStream;
048
049    private final InputStream inputStream;
050
051    private InputStreamPumper inputStreamPumper;
052
053    /** The timeout Duration the implementation waits when stopping the pumper threads. */
054    private Duration stopTimeout = Duration.ZERO;
055
056    /** The last exception being caught. */
057    private IOException caught;
058
059    /**
060     * The thread factory.
061     */
062    private final ThreadFactory threadFactory;
063
064    /**
065     * Constructs a new {@link PumpStreamHandler}.
066     */
067    public PumpStreamHandler() {
068        this(System.out, System.err);
069    }
070
071    /**
072     * Constructs a new {@link PumpStreamHandler}.
073     *
074     * @param allOutputStream the output/error {@link OutputStream}.
075     */
076    public PumpStreamHandler(final OutputStream allOutputStream) {
077        this(allOutputStream, allOutputStream);
078    }
079
080    /**
081     * Constructs a new {@link PumpStreamHandler}.
082     *
083     * @param outputStream      the output {@link OutputStream}.
084     * @param errorOutputStream the error {@link OutputStream}.
085     */
086    public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream) {
087        this(outputStream, errorOutputStream, null);
088    }
089
090    /**
091     * Constructs a new {@link PumpStreamHandler}.
092     *
093     * @param outputStream      the output {@link OutputStream}.
094     * @param errorOutputStream the error {@link OutputStream}.
095     * @param inputStream       the input {@link InputStream}.
096     */
097    public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream, final InputStream inputStream) {
098        this(Executors.defaultThreadFactory(), outputStream, errorOutputStream, inputStream);
099    }
100
101    /**
102     * Constructs a new {@link PumpStreamHandler}.
103     *
104     * @param outputStream      the output {@link OutputStream}.
105     * @param errorOutputStream the error {@link OutputStream}.
106     * @param inputStream       the input {@link InputStream}.
107     */
108    private PumpStreamHandler(final ThreadFactory threadFactory, final OutputStream outputStream, final OutputStream errorOutputStream,
109            final InputStream inputStream) {
110        this.threadFactory = threadFactory;
111        this.outputStream = outputStream;
112        this.errorOutputStream = errorOutputStream;
113        this.inputStream = inputStream;
114    }
115
116    /**
117     * Create the pump to handle error output.
118     *
119     * @param is the {@link InputStream}.
120     * @param os the {@link OutputStream}.
121     */
122    protected void createProcessErrorPump(final InputStream is, final OutputStream os) {
123        errorThread = createPump(is, os);
124    }
125
126    /**
127     * Create the pump to handle process output.
128     *
129     * @param is the {@link InputStream}.
130     * @param os the {@link OutputStream}.
131     */
132    protected void createProcessOutputPump(final InputStream is, final OutputStream os) {
133        outputThread = createPump(is, os);
134    }
135
136    /**
137     * Creates a stream pumper to copy the given input stream to the given output stream. When the 'os' is an PipedOutputStream we are closing 'os' afterwards
138     * to avoid an IOException ("Write end dead").
139     *
140     * @param is the input stream to copy from.
141     * @param os the output stream to copy into.
142     * @return the stream pumper thread.
143     */
144    protected Thread createPump(final InputStream is, final OutputStream os) {
145        return createPump(is, os, os instanceof PipedOutputStream);
146    }
147
148    /**
149     * Creates a stream pumper to copy the given input stream to the given output stream.
150     *
151     * @param is                 the input stream to copy from.
152     * @param os                 the output stream to copy into.
153     * @param closeWhenExhausted close the output stream when the input stream is exhausted.
154     * @return the stream pumper thread.
155     */
156    protected Thread createPump(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) {
157        return ThreadUtil.newThread(threadFactory, new StreamPumper(is, os, closeWhenExhausted), "CommonsExecStreamPumper-", true);
158    }
159
160    /**
161     * Creates a stream pumper to copy the given input stream to the given output stream.
162     *
163     * @param is the System.in input stream to copy from.
164     * @param os the output stream to copy into.
165     * @return the stream pumper thread.
166     */
167    private Thread createSystemInPump(final InputStream is, final OutputStream os) {
168        inputStreamPumper = new InputStreamPumper(is, os);
169        return ThreadUtil.newThread(threadFactory, inputStreamPumper, "CommonsExecStreamPumper-", true);
170    }
171
172    /**
173     * Gets the error stream.
174     *
175     * @return {@link OutputStream}.
176     */
177    protected OutputStream getErr() {
178        return errorOutputStream;
179    }
180
181    /**
182     * Gets the output stream.
183     *
184     * @return {@link OutputStream}.
185     */
186    protected OutputStream getOut() {
187        return outputStream;
188    }
189
190    Duration getStopTimeout() {
191        return stopTimeout;
192    }
193
194    /**
195     * Sets the {@link InputStream} from which to read the standard error of the process.
196     *
197     * @param is the {@link InputStream}.
198     */
199    @Override
200    public void setProcessErrorStream(final InputStream is) {
201        if (errorOutputStream != null) {
202            createProcessErrorPump(is, errorOutputStream);
203        }
204    }
205
206    /**
207     * Sets the {@link OutputStream} by means of which input can be sent to the process.
208     *
209     * @param os the {@link OutputStream}.
210     */
211    @Override
212    public void setProcessInputStream(final OutputStream os) {
213        if (inputStream != null) {
214            if (inputStream == System.in) {
215                inputThread = createSystemInPump(inputStream, os);
216            } else {
217                inputThread = createPump(inputStream, os, true);
218            }
219        } else {
220            try {
221                os.close();
222            } catch (final IOException e) {
223                final String msg = "Got exception while closing output stream";
224                DebugUtils.handleException(msg, e);
225            }
226        }
227    }
228
229    /**
230     * Sets the {@link InputStream} from which to read the standard output of the process.
231     *
232     * @param is the {@link InputStream}.
233     */
234    @Override
235    public void setProcessOutputStream(final InputStream is) {
236        if (outputStream != null) {
237            createProcessOutputPump(is, outputStream);
238        }
239    }
240
241    /**
242     * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
243     *
244     * @param timeout timeout or zero to wait forever (default).
245     * @since 1.4.0
246     */
247    public void setStopTimeout(final Duration timeout) {
248        this.stopTimeout = timeout != null ? timeout : Duration.ZERO;
249    }
250
251    /**
252     * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
253     *
254     * @param timeout timeout in milliseconds or zero to wait forever (default).
255     * @deprecated Use {@link #setStopTimeout(Duration)}.
256     */
257    @Deprecated
258    public void setStopTimeout(final long timeout) {
259        this.stopTimeout = Duration.ofMillis(timeout);
260    }
261
262    /**
263     * Starts the {@link Thread}s.
264     */
265    @Override
266    public void start() {
267        start(outputThread);
268        start(errorThread);
269        start(inputThread);
270    }
271
272    /**
273     * Starts the given {@link Thread}.
274     */
275    private void start(final Thread thread) {
276        if (thread != null) {
277            thread.start();
278        }
279    }
280
281    /**
282     * Stops pumping the streams. When a timeout is specified it is not guaranteed that the pumper threads are cleanly terminated.
283     */
284    @Override
285    public void stop() throws IOException {
286        if (inputStreamPumper != null) {
287            inputStreamPumper.stopProcessing();
288        }
289        stop(outputThread, stopTimeout);
290        stop(errorThread, stopTimeout);
291        stop(inputThread, stopTimeout);
292
293        if (errorOutputStream != null && errorOutputStream != outputStream) {
294            try {
295                errorOutputStream.flush();
296            } catch (final IOException e) {
297                final String msg = "Got exception while flushing the error stream : " + e.getMessage();
298                DebugUtils.handleException(msg, e);
299            }
300        }
301
302        if (outputStream != null) {
303            try {
304                outputStream.flush();
305            } catch (final IOException e) {
306                final String msg = "Got exception while flushing the output stream";
307                DebugUtils.handleException(msg, e);
308            }
309        }
310
311        if (caught != null) {
312            throw caught;
313        }
314    }
315
316    /**
317     * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
318     * was exceeded an IOException is created to be thrown to the caller.
319     *
320     * @param thread  the thread to be stopped.
321     * @param timeout the time in ms to wait to join.
322     */
323    private void stop(final Thread thread, final Duration timeout) {
324        if (thread != null) {
325            try {
326                if (timeout.equals(Duration.ZERO)) {
327                    thread.join();
328                } else {
329                    final Duration timeToWait = timeout.plus(STOP_TIMEOUT_ADDITION);
330                    final Instant startTime = Instant.now();
331                    thread.join(timeToWait.toMillis());
332                    if (Instant.now().isAfter(startTime.plus(timeToWait))) {
333                        caught = new ExecuteException("The stop timeout of " + timeout + " ms was exceeded", Executor.INVALID_EXITVALUE);
334                    }
335                }
336            } catch (final InterruptedException e) {
337                thread.interrupt();
338            }
339        }
340    }
341
342    /**
343     * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
344     * was exceeded an IOException is created to be thrown to the caller.
345     *
346     * @param thread        the thread to be stopped.
347     * @param timeoutMillis the time in ms to wait to join.
348     */
349    protected void stopThread(final Thread thread, final long timeoutMillis) {
350        stop(thread, Duration.ofMillis(timeoutMillis));
351    }
352}