PumpStreamHandler.java

  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one or more
  3.  *  contributor license agreements.  See the NOTICE file distributed with
  4.  *  this work for additional information regarding copyright ownership.
  5.  *  The ASF licenses this file to You under the Apache License, Version 2.0
  6.  *  (the "License"); you may not use this file except in compliance with
  7.  *  the License.  You may obtain a copy of the License at
  8.  *
  9.  *      https://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  *  Unless required by applicable law or agreed to in writing, software
  12.  *  distributed under the License is distributed on an "AS IS" BASIS,
  13.  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  *  See the License for the specific language governing permissions and
  15.  *  limitations under the License.
  16.  */

  17. package org.apache.commons.exec;

  18. import java.io.IOException;
  19. import java.io.InputStream;
  20. import java.io.OutputStream;
  21. import java.io.PipedOutputStream;
  22. import java.time.Duration;
  23. import java.time.Instant;
  24. import java.util.concurrent.Executors;
  25. import java.util.concurrent.ThreadFactory;

  26. import org.apache.commons.exec.util.DebugUtils;

  27. /**
  28.  * Copies standard output and error of sub-processes to standard output and error of the parent process. If output or error stream are set to null, any feedback
  29.  * from that stream will be lost.
  30.  */
  31. public class PumpStreamHandler implements ExecuteStreamHandler {

  32.     private static final Duration STOP_TIMEOUT_ADDITION = Duration.ofSeconds(2);

  33.     private Thread outputThread;

  34.     private Thread errorThread;

  35.     private Thread inputThread;

  36.     private final OutputStream outputStream;

  37.     private final OutputStream errorOutputStream;

  38.     private final InputStream inputStream;

  39.     private InputStreamPumper inputStreamPumper;

  40.     /** The timeout Duration the implementation waits when stopping the pumper threads. */
  41.     private Duration stopTimeout = Duration.ZERO;

  42.     /** The last exception being caught. */
  43.     private IOException caught;

  44.     /**
  45.      * The thread factory.
  46.      */
  47.     private final ThreadFactory threadFactory;

  48.     /**
  49.      * Constructs a new {@link PumpStreamHandler}.
  50.      */
  51.     public PumpStreamHandler() {
  52.         this(System.out, System.err);
  53.     }

  54.     /**
  55.      * Constructs a new {@link PumpStreamHandler}.
  56.      *
  57.      * @param allOutputStream the output/error {@link OutputStream}. The {@code OutputStream}
  58.      *      implementation must be thread-safe because the output and error reader threads will
  59.      *      concurrently write to it.
  60.      */
  61.     public PumpStreamHandler(final OutputStream allOutputStream) {
  62.         this(allOutputStream, allOutputStream);
  63.     }

  64.     /**
  65.      * Constructs a new {@link PumpStreamHandler}.
  66.      *
  67.      * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
  68.      * thread-safe because the output and error reader threads will concurrently write to it.
  69.      *
  70.      * @param outputStream      the output {@link OutputStream}.
  71.      * @param errorOutputStream the error {@link OutputStream}.
  72.      */
  73.     public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream) {
  74.         this(outputStream, errorOutputStream, null);
  75.     }

  76.     /**
  77.      * Constructs a new {@link PumpStreamHandler}.
  78.      *
  79.      * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
  80.      * thread-safe because the output and error reader threads will concurrently write to it.
  81.      *
  82.      * @param outputStream      the output {@link OutputStream}.
  83.      * @param errorOutputStream the error {@link OutputStream}.
  84.      * @param inputStream       the input {@link InputStream}.
  85.      */
  86.     public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream, final InputStream inputStream) {
  87.         this(Executors.defaultThreadFactory(), outputStream, errorOutputStream, inputStream);
  88.     }

  89.     /**
  90.      * Constructs a new {@link PumpStreamHandler}.
  91.      *
  92.      * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
  93.      * thread-safe because the output and error reader threads will concurrently write to it.
  94.      *
  95.      * @param outputStream      the output {@link OutputStream}.
  96.      * @param errorOutputStream the error {@link OutputStream}.
  97.      * @param inputStream       the input {@link InputStream}.
  98.      */
  99.     private PumpStreamHandler(final ThreadFactory threadFactory, final OutputStream outputStream, final OutputStream errorOutputStream,
  100.             final InputStream inputStream) {
  101.         this.threadFactory = threadFactory;
  102.         this.outputStream = outputStream;
  103.         this.errorOutputStream = errorOutputStream;
  104.         this.inputStream = inputStream;
  105.     }

  106.     /**
  107.      * Create the pump to handle error output.
  108.      *
  109.      * @param is the {@link InputStream}.
  110.      * @param os the {@link OutputStream}.
  111.      */
  112.     protected void createProcessErrorPump(final InputStream is, final OutputStream os) {
  113.         errorThread = createPump(is, os);
  114.     }

  115.     /**
  116.      * Create the pump to handle process output.
  117.      *
  118.      * @param is the {@link InputStream}.
  119.      * @param os the {@link OutputStream}.
  120.      */
  121.     protected void createProcessOutputPump(final InputStream is, final OutputStream os) {
  122.         outputThread = createPump(is, os);
  123.     }

  124.     /**
  125.      * Creates a stream pumper to copy the given input stream to the given output stream. When the 'os' is an PipedOutputStream we are closing 'os' afterwards
  126.      * to avoid an IOException ("Write end dead").
  127.      *
  128.      * @param is the input stream to copy from.
  129.      * @param os the output stream to copy into.
  130.      * @return the stream pumper thread.
  131.      */
  132.     protected Thread createPump(final InputStream is, final OutputStream os) {
  133.         return createPump(is, os, os instanceof PipedOutputStream);
  134.     }

  135.     /**
  136.      * Creates a stream pumper to copy the given input stream to the given output stream.
  137.      *
  138.      * @param is                 the input stream to copy from.
  139.      * @param os                 the output stream to copy into.
  140.      * @param closeWhenExhausted close the output stream when the input stream is exhausted.
  141.      * @return the stream pumper thread.
  142.      */
  143.     protected Thread createPump(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) {
  144.         return ThreadUtil.newThread(threadFactory, new StreamPumper(is, os, closeWhenExhausted), "CommonsExecStreamPumper-", true);
  145.     }

  146.     /**
  147.      * Creates a stream pumper to copy the given input stream to the given output stream.
  148.      *
  149.      * @param is the System.in input stream to copy from.
  150.      * @param os the output stream to copy into.
  151.      * @return the stream pumper thread.
  152.      */
  153.     private Thread createSystemInPump(final InputStream is, final OutputStream os) {
  154.         inputStreamPumper = new InputStreamPumper(is, os);
  155.         return ThreadUtil.newThread(threadFactory, inputStreamPumper, "CommonsExecStreamPumper-", true);
  156.     }

  157.     /**
  158.      * Gets the error stream.
  159.      *
  160.      * @return {@link OutputStream}.
  161.      */
  162.     protected OutputStream getErr() {
  163.         return errorOutputStream;
  164.     }

  165.     /**
  166.      * Gets the output stream.
  167.      *
  168.      * @return {@link OutputStream}.
  169.      */
  170.     protected OutputStream getOut() {
  171.         return outputStream;
  172.     }

  173.     Duration getStopTimeout() {
  174.         return stopTimeout;
  175.     }

  176.     /**
  177.      * Sets the {@link InputStream} from which to read the standard error of the process.
  178.      *
  179.      * @param is the {@link InputStream}.
  180.      */
  181.     @Override
  182.     public void setProcessErrorStream(final InputStream is) {
  183.         if (errorOutputStream != null) {
  184.             createProcessErrorPump(is, errorOutputStream);
  185.         }
  186.     }

  187.     /**
  188.      * Sets the {@link OutputStream} by means of which input can be sent to the process.
  189.      *
  190.      * @param os the {@link OutputStream}.
  191.      */
  192.     @Override
  193.     public void setProcessInputStream(final OutputStream os) {
  194.         if (inputStream != null) {
  195.             if (inputStream == System.in) {
  196.                 inputThread = createSystemInPump(inputStream, os);
  197.             } else {
  198.                 inputThread = createPump(inputStream, os, true);
  199.             }
  200.         } else {
  201.             try {
  202.                 os.close();
  203.             } catch (final IOException e) {
  204.                 final String msg = "Got exception while closing output stream";
  205.                 DebugUtils.handleException(msg, e);
  206.             }
  207.         }
  208.     }

  209.     /**
  210.      * Sets the {@link InputStream} from which to read the standard output of the process.
  211.      *
  212.      * @param is the {@link InputStream}.
  213.      */
  214.     @Override
  215.     public void setProcessOutputStream(final InputStream is) {
  216.         if (outputStream != null) {
  217.             createProcessOutputPump(is, outputStream);
  218.         }
  219.     }

  220.     /**
  221.      * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
  222.      *
  223.      * @param timeout timeout or zero to wait forever (default).
  224.      * @since 1.4.0
  225.      */
  226.     public void setStopTimeout(final Duration timeout) {
  227.         this.stopTimeout = timeout != null ? timeout : Duration.ZERO;
  228.     }

  229.     /**
  230.      * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
  231.      *
  232.      * @param timeout timeout in milliseconds or zero to wait forever (default).
  233.      * @deprecated Use {@link #setStopTimeout(Duration)}.
  234.      */
  235.     @Deprecated
  236.     public void setStopTimeout(final long timeout) {
  237.         this.stopTimeout = Duration.ofMillis(timeout);
  238.     }

  239.     /**
  240.      * Starts the {@link Thread}s.
  241.      */
  242.     @Override
  243.     public void start() {
  244.         start(outputThread);
  245.         start(errorThread);
  246.         start(inputThread);
  247.     }

  248.     /**
  249.      * Starts the given {@link Thread}.
  250.      */
  251.     private void start(final Thread thread) {
  252.         if (thread != null) {
  253.             thread.start();
  254.         }
  255.     }

  256.     /**
  257.      * Stops pumping the streams. When a timeout is specified it is not guaranteed that the pumper threads are cleanly terminated.
  258.      */
  259.     @Override
  260.     public void stop() throws IOException {
  261.         if (inputStreamPumper != null) {
  262.             inputStreamPumper.stopProcessing();
  263.         }
  264.         stop(outputThread, stopTimeout);
  265.         stop(errorThread, stopTimeout);
  266.         stop(inputThread, stopTimeout);

  267.         if (errorOutputStream != null && errorOutputStream != outputStream) {
  268.             try {
  269.                 errorOutputStream.flush();
  270.             } catch (final IOException e) {
  271.                 final String msg = "Got exception while flushing the error stream : " + e.getMessage();
  272.                 DebugUtils.handleException(msg, e);
  273.             }
  274.         }

  275.         if (outputStream != null) {
  276.             try {
  277.                 outputStream.flush();
  278.             } catch (final IOException e) {
  279.                 final String msg = "Got exception while flushing the output stream";
  280.                 DebugUtils.handleException(msg, e);
  281.             }
  282.         }

  283.         if (caught != null) {
  284.             throw caught;
  285.         }
  286.     }

  287.     /**
  288.      * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
  289.      * was exceeded an IOException is created to be thrown to the caller.
  290.      *
  291.      * @param thread  the thread to be stopped.
  292.      * @param timeout the time in ms to wait to join.
  293.      */
  294.     private void stop(final Thread thread, final Duration timeout) {
  295.         if (thread != null) {
  296.             try {
  297.                 if (timeout.equals(Duration.ZERO)) {
  298.                     thread.join();
  299.                 } else {
  300.                     final Duration timeToWait = timeout.plus(STOP_TIMEOUT_ADDITION);
  301.                     final Instant startTime = Instant.now();
  302.                     thread.join(timeToWait.toMillis());
  303.                     if (Instant.now().isAfter(startTime.plus(timeToWait))) {
  304.                         caught = new ExecuteException("The stop timeout of " + timeout + " ms was exceeded", Executor.INVALID_EXITVALUE);
  305.                     }
  306.                 }
  307.             } catch (final InterruptedException e) {
  308.                 thread.interrupt();
  309.             }
  310.         }
  311.     }

  312.     /**
  313.      * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
  314.      * was exceeded an IOException is created to be thrown to the caller.
  315.      *
  316.      * @param thread        the thread to be stopped.
  317.      * @param timeoutMillis the time in ms to wait to join.
  318.      */
  319.     protected void stopThread(final Thread thread, final long timeoutMillis) {
  320.         stop(thread, Duration.ofMillis(timeoutMillis));
  321.     }
  322. }