001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 *  contributor license agreements.  See the NOTICE file distributed with
004 *  this work for additional information regarding copyright ownership.
005 *  The ASF licenses this file to You under the Apache License, Version 2.0
006 *  (the "License"); you may not use this file except in compliance with
007 *  the License.  You may obtain a copy of the License at
008 *
009 *      https://www.apache.org/licenses/LICENSE-2.0
010 *
011 *  Unless required by applicable law or agreed to in writing, software
012 *  distributed under the License is distributed on an "AS IS" BASIS,
013 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 *  See the License for the specific language governing permissions and
015 *  limitations under the License.
016 */
017
018package org.apache.commons.exec;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.OutputStream;
023import java.io.PipedOutputStream;
024import java.time.Duration;
025import java.time.Instant;
026import java.util.concurrent.Executors;
027import java.util.concurrent.ThreadFactory;
028
029import org.apache.commons.exec.util.DebugUtils;
030
031/**
032 * Copies standard output and error of sub-processes to standard output and error of the parent process. If output or error stream are set to null, any feedback
033 * from that stream will be lost.
034 */
035public class PumpStreamHandler implements ExecuteStreamHandler {
036
037    private static final Duration STOP_TIMEOUT_ADDITION = Duration.ofSeconds(2);
038
039    private Thread outputThread;
040
041    private Thread errorThread;
042
043    private Thread inputThread;
044
045    private final OutputStream outputStream;
046
047    private final OutputStream errorOutputStream;
048
049    private final InputStream inputStream;
050
051    private InputStreamPumper inputStreamPumper;
052
053    /** The timeout Duration the implementation waits when stopping the pumper threads. */
054    private Duration stopTimeout = Duration.ZERO;
055
056    /** The last exception being caught. */
057    private IOException caught;
058
059    /**
060     * The thread factory.
061     */
062    private final ThreadFactory threadFactory;
063
064    /**
065     * Constructs a new {@link PumpStreamHandler}.
066     */
067    public PumpStreamHandler() {
068        this(System.out, System.err);
069    }
070
071    /**
072     * Constructs a new {@link PumpStreamHandler}.
073     *
074     * @param allOutputStream the output/error {@link OutputStream}. The {@code OutputStream}
075     *      implementation must be thread-safe because the output and error reader threads will
076     *      concurrently write to it.
077     */
078    public PumpStreamHandler(final OutputStream allOutputStream) {
079        this(allOutputStream, allOutputStream);
080    }
081
082    /**
083     * Constructs a new {@link PumpStreamHandler}.
084     *
085     * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
086     * thread-safe because the output and error reader threads will concurrently write to it.
087     *
088     * @param outputStream      the output {@link OutputStream}.
089     * @param errorOutputStream the error {@link OutputStream}.
090     */
091    public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream) {
092        this(outputStream, errorOutputStream, null);
093    }
094
095    /**
096     * Constructs a new {@link PumpStreamHandler}.
097     *
098     * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
099     * thread-safe because the output and error reader threads will concurrently write to it.
100     *
101     * @param outputStream      the output {@link OutputStream}.
102     * @param errorOutputStream the error {@link OutputStream}.
103     * @param inputStream       the input {@link InputStream}.
104     */
105    public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream, final InputStream inputStream) {
106        this(Executors.defaultThreadFactory(), outputStream, errorOutputStream, inputStream);
107    }
108
109    /**
110     * Constructs a new {@link PumpStreamHandler}.
111     *
112     * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
113     * thread-safe because the output and error reader threads will concurrently write to it.
114     *
115     * @param outputStream      the output {@link OutputStream}.
116     * @param errorOutputStream the error {@link OutputStream}.
117     * @param inputStream       the input {@link InputStream}.
118     */
119    private PumpStreamHandler(final ThreadFactory threadFactory, final OutputStream outputStream, final OutputStream errorOutputStream,
120            final InputStream inputStream) {
121        this.threadFactory = threadFactory;
122        this.outputStream = outputStream;
123        this.errorOutputStream = errorOutputStream;
124        this.inputStream = inputStream;
125    }
126
127    /**
128     * Create the pump to handle error output.
129     *
130     * @param is the {@link InputStream}.
131     * @param os the {@link OutputStream}.
132     */
133    protected void createProcessErrorPump(final InputStream is, final OutputStream os) {
134        errorThread = createPump(is, os);
135    }
136
137    /**
138     * Create the pump to handle process output.
139     *
140     * @param is the {@link InputStream}.
141     * @param os the {@link OutputStream}.
142     */
143    protected void createProcessOutputPump(final InputStream is, final OutputStream os) {
144        outputThread = createPump(is, os);
145    }
146
147    /**
148     * Creates a stream pumper to copy the given input stream to the given output stream. When the 'os' is an PipedOutputStream we are closing 'os' afterwards
149     * to avoid an IOException ("Write end dead").
150     *
151     * @param is the input stream to copy from.
152     * @param os the output stream to copy into.
153     * @return the stream pumper thread.
154     */
155    protected Thread createPump(final InputStream is, final OutputStream os) {
156        return createPump(is, os, os instanceof PipedOutputStream);
157    }
158
159    /**
160     * Creates a stream pumper to copy the given input stream to the given output stream.
161     *
162     * @param is                 the input stream to copy from.
163     * @param os                 the output stream to copy into.
164     * @param closeWhenExhausted close the output stream when the input stream is exhausted.
165     * @return the stream pumper thread.
166     */
167    protected Thread createPump(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) {
168        return ThreadUtil.newThread(threadFactory, new StreamPumper(is, os, closeWhenExhausted), "CommonsExecStreamPumper-", true);
169    }
170
171    /**
172     * Creates a stream pumper to copy the given input stream to the given output stream.
173     *
174     * @param is the System.in input stream to copy from.
175     * @param os the output stream to copy into.
176     * @return the stream pumper thread.
177     */
178    private Thread createSystemInPump(final InputStream is, final OutputStream os) {
179        inputStreamPumper = new InputStreamPumper(is, os);
180        return ThreadUtil.newThread(threadFactory, inputStreamPumper, "CommonsExecStreamPumper-", true);
181    }
182
183    /**
184     * Gets the error stream.
185     *
186     * @return {@link OutputStream}.
187     */
188    protected OutputStream getErr() {
189        return errorOutputStream;
190    }
191
192    /**
193     * Gets the output stream.
194     *
195     * @return {@link OutputStream}.
196     */
197    protected OutputStream getOut() {
198        return outputStream;
199    }
200
201    Duration getStopTimeout() {
202        return stopTimeout;
203    }
204
205    /**
206     * Sets the {@link InputStream} from which to read the standard error of the process.
207     *
208     * @param is the {@link InputStream}.
209     */
210    @Override
211    public void setProcessErrorStream(final InputStream is) {
212        if (errorOutputStream != null) {
213            createProcessErrorPump(is, errorOutputStream);
214        }
215    }
216
217    /**
218     * Sets the {@link OutputStream} by means of which input can be sent to the process.
219     *
220     * @param os the {@link OutputStream}.
221     */
222    @Override
223    public void setProcessInputStream(final OutputStream os) {
224        if (inputStream != null) {
225            if (inputStream == System.in) {
226                inputThread = createSystemInPump(inputStream, os);
227            } else {
228                inputThread = createPump(inputStream, os, true);
229            }
230        } else {
231            try {
232                os.close();
233            } catch (final IOException e) {
234                final String msg = "Got exception while closing output stream";
235                DebugUtils.handleException(msg, e);
236            }
237        }
238    }
239
240    /**
241     * Sets the {@link InputStream} from which to read the standard output of the process.
242     *
243     * @param is the {@link InputStream}.
244     */
245    @Override
246    public void setProcessOutputStream(final InputStream is) {
247        if (outputStream != null) {
248            createProcessOutputPump(is, outputStream);
249        }
250    }
251
252    /**
253     * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
254     *
255     * @param timeout timeout or zero to wait forever (default).
256     * @since 1.4.0
257     */
258    public void setStopTimeout(final Duration timeout) {
259        this.stopTimeout = timeout != null ? timeout : Duration.ZERO;
260    }
261
262    /**
263     * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
264     *
265     * @param timeout timeout in milliseconds or zero to wait forever (default).
266     * @deprecated Use {@link #setStopTimeout(Duration)}.
267     */
268    @Deprecated
269    public void setStopTimeout(final long timeout) {
270        this.stopTimeout = Duration.ofMillis(timeout);
271    }
272
273    /**
274     * Starts the {@link Thread}s.
275     */
276    @Override
277    public void start() {
278        start(outputThread);
279        start(errorThread);
280        start(inputThread);
281    }
282
283    /**
284     * Starts the given {@link Thread}.
285     */
286    private void start(final Thread thread) {
287        if (thread != null) {
288            thread.start();
289        }
290    }
291
292    /**
293     * Stops pumping the streams. When a timeout is specified it is not guaranteed that the pumper threads are cleanly terminated.
294     */
295    @Override
296    public void stop() throws IOException {
297        if (inputStreamPumper != null) {
298            inputStreamPumper.stopProcessing();
299        }
300        stop(outputThread, stopTimeout);
301        stop(errorThread, stopTimeout);
302        stop(inputThread, stopTimeout);
303
304        if (errorOutputStream != null && errorOutputStream != outputStream) {
305            try {
306                errorOutputStream.flush();
307            } catch (final IOException e) {
308                final String msg = "Got exception while flushing the error stream : " + e.getMessage();
309                DebugUtils.handleException(msg, e);
310            }
311        }
312
313        if (outputStream != null) {
314            try {
315                outputStream.flush();
316            } catch (final IOException e) {
317                final String msg = "Got exception while flushing the output stream";
318                DebugUtils.handleException(msg, e);
319            }
320        }
321
322        if (caught != null) {
323            throw caught;
324        }
325    }
326
327    /**
328     * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
329     * was exceeded an IOException is created to be thrown to the caller.
330     *
331     * @param thread  the thread to be stopped.
332     * @param timeout the time in ms to wait to join.
333     */
334    private void stop(final Thread thread, final Duration timeout) {
335        if (thread != null) {
336            try {
337                if (timeout.equals(Duration.ZERO)) {
338                    thread.join();
339                } else {
340                    final Duration timeToWait = timeout.plus(STOP_TIMEOUT_ADDITION);
341                    final Instant startTime = Instant.now();
342                    thread.join(timeToWait.toMillis());
343                    if (Instant.now().isAfter(startTime.plus(timeToWait))) {
344                        caught = new ExecuteException("The stop timeout of " + timeout + " ms was exceeded", Executor.INVALID_EXITVALUE);
345                    }
346                }
347            } catch (final InterruptedException e) {
348                thread.interrupt();
349            }
350        }
351    }
352
353    /**
354     * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
355     * was exceeded an IOException is created to be thrown to the caller.
356     *
357     * @param thread        the thread to be stopped.
358     * @param timeoutMillis the time in ms to wait to join.
359     */
360    protected void stopThread(final Thread thread, final long timeoutMillis) {
361        stop(thread, Duration.ofMillis(timeoutMillis));
362    }
363}