View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    *  contributor license agreements.  See the NOTICE file distributed with
4    *  this work for additional information regarding copyright ownership.
5    *  The ASF licenses this file to You under the Apache License, Version 2.0
6    *  (the "License"); you may not use this file except in compliance with
7    *  the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software
12   *  distributed under the License is distributed on an "AS IS" BASIS,
13   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   *  See the License for the specific language governing permissions and
15   *  limitations under the License.
16   */
17  
18  package org.apache.commons.exec;
19  
20  import java.io.IOException;
21  import java.io.InputStream;
22  import java.io.OutputStream;
23  import java.io.PipedOutputStream;
24  import java.time.Duration;
25  import java.time.Instant;
26  import java.util.concurrent.Executors;
27  import java.util.concurrent.ThreadFactory;
28  
29  import org.apache.commons.exec.util.DebugUtils;
30  
31  /**
32   * Copies standard output and error of sub-processes to standard output and error of the parent process. If output or error stream are set to null, any feedback
33   * from that stream will be lost.
34   */
35  public class PumpStreamHandler implements ExecuteStreamHandler {
36  
37      private static final Duration STOP_TIMEOUT_ADDITION = Duration.ofSeconds(2);
38  
39      private Thread outputThread;
40  
41      private Thread errorThread;
42  
43      private Thread inputThread;
44  
45      private final OutputStream outputStream;
46  
47      private final OutputStream errorOutputStream;
48  
49      private final InputStream inputStream;
50  
51      private InputStreamPumper inputStreamPumper;
52  
53      /** The timeout Duration the implementation waits when stopping the pumper threads. */
54      private Duration stopTimeout = Duration.ZERO;
55  
56      /** The last exception being caught. */
57      private IOException caught;
58  
59      /**
60       * The thread factory.
61       */
62      private final ThreadFactory threadFactory;
63  
64      /**
65       * Constructs a new {@link PumpStreamHandler}.
66       */
67      public PumpStreamHandler() {
68          this(System.out, System.err);
69      }
70  
71      /**
72       * Constructs a new {@link PumpStreamHandler}.
73       *
74       * @param allOutputStream the output/error {@link OutputStream}.
75       */
76      public PumpStreamHandler(final OutputStream allOutputStream) {
77          this(allOutputStream, allOutputStream);
78      }
79  
80      /**
81       * Constructs a new {@link PumpStreamHandler}.
82       *
83       * @param outputStream      the output {@link OutputStream}.
84       * @param errorOutputStream the error {@link OutputStream}.
85       */
86      public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream) {
87          this(outputStream, errorOutputStream, null);
88      }
89  
90      /**
91       * Constructs a new {@link PumpStreamHandler}.
92       *
93       * @param outputStream      the output {@link OutputStream}.
94       * @param errorOutputStream the error {@link OutputStream}.
95       * @param inputStream       the input {@link InputStream}.
96       */
97      public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream, final InputStream inputStream) {
98          this(Executors.defaultThreadFactory(), outputStream, errorOutputStream, inputStream);
99      }
100 
101     /**
102      * Constructs a new {@link PumpStreamHandler}.
103      *
104      * @param outputStream      the output {@link OutputStream}.
105      * @param errorOutputStream the error {@link OutputStream}.
106      * @param inputStream       the input {@link InputStream}.
107      */
108     private PumpStreamHandler(final ThreadFactory threadFactory, final OutputStream outputStream, final OutputStream errorOutputStream,
109             final InputStream inputStream) {
110         this.threadFactory = threadFactory;
111         this.outputStream = outputStream;
112         this.errorOutputStream = errorOutputStream;
113         this.inputStream = inputStream;
114     }
115 
116     /**
117      * Create the pump to handle error output.
118      *
119      * @param is the {@link InputStream}.
120      * @param os the {@link OutputStream}.
121      */
122     protected void createProcessErrorPump(final InputStream is, final OutputStream os) {
123         errorThread = createPump(is, os);
124     }
125 
126     /**
127      * Create the pump to handle process output.
128      *
129      * @param is the {@link InputStream}.
130      * @param os the {@link OutputStream}.
131      */
132     protected void createProcessOutputPump(final InputStream is, final OutputStream os) {
133         outputThread = createPump(is, os);
134     }
135 
136     /**
137      * Creates a stream pumper to copy the given input stream to the given output stream. When the 'os' is an PipedOutputStream we are closing 'os' afterwards
138      * to avoid an IOException ("Write end dead").
139      *
140      * @param is the input stream to copy from.
141      * @param os the output stream to copy into.
142      * @return the stream pumper thread.
143      */
144     protected Thread createPump(final InputStream is, final OutputStream os) {
145         return createPump(is, os, os instanceof PipedOutputStream);
146     }
147 
148     /**
149      * Creates a stream pumper to copy the given input stream to the given output stream.
150      *
151      * @param is                 the input stream to copy from.
152      * @param os                 the output stream to copy into.
153      * @param closeWhenExhausted close the output stream when the input stream is exhausted.
154      * @return the stream pumper thread.
155      */
156     protected Thread createPump(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) {
157         return ThreadUtil.newThread(threadFactory, new StreamPumper(is, os, closeWhenExhausted), "CommonsExecStreamPumper-", true);
158     }
159 
160     /**
161      * Creates a stream pumper to copy the given input stream to the given output stream.
162      *
163      * @param is the System.in input stream to copy from.
164      * @param os the output stream to copy into.
165      * @return the stream pumper thread.
166      */
167     private Thread createSystemInPump(final InputStream is, final OutputStream os) {
168         inputStreamPumper = new InputStreamPumper(is, os);
169         return ThreadUtil.newThread(threadFactory, inputStreamPumper, "CommonsExecStreamPumper-", true);
170     }
171 
172     /**
173      * Gets the error stream.
174      *
175      * @return {@link OutputStream}.
176      */
177     protected OutputStream getErr() {
178         return errorOutputStream;
179     }
180 
181     /**
182      * Gets the output stream.
183      *
184      * @return {@link OutputStream}.
185      */
186     protected OutputStream getOut() {
187         return outputStream;
188     }
189 
190     Duration getStopTimeout() {
191         return stopTimeout;
192     }
193 
194     /**
195      * Sets the {@link InputStream} from which to read the standard error of the process.
196      *
197      * @param is the {@link InputStream}.
198      */
199     @Override
200     public void setProcessErrorStream(final InputStream is) {
201         if (errorOutputStream != null) {
202             createProcessErrorPump(is, errorOutputStream);
203         }
204     }
205 
206     /**
207      * Sets the {@link OutputStream} by means of which input can be sent to the process.
208      *
209      * @param os the {@link OutputStream}.
210      */
211     @Override
212     public void setProcessInputStream(final OutputStream os) {
213         if (inputStream != null) {
214             if (inputStream == System.in) {
215                 inputThread = createSystemInPump(inputStream, os);
216             } else {
217                 inputThread = createPump(inputStream, os, true);
218             }
219         } else {
220             try {
221                 os.close();
222             } catch (final IOException e) {
223                 final String msg = "Got exception while closing output stream";
224                 DebugUtils.handleException(msg, e);
225             }
226         }
227     }
228 
229     /**
230      * Sets the {@link InputStream} from which to read the standard output of the process.
231      *
232      * @param is the {@link InputStream}.
233      */
234     @Override
235     public void setProcessOutputStream(final InputStream is) {
236         if (outputStream != null) {
237             createProcessOutputPump(is, outputStream);
238         }
239     }
240 
241     /**
242      * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
243      *
244      * @param timeout timeout or zero to wait forever (default).
245      * @since 1.4.0
246      */
247     public void setStopTimeout(final Duration timeout) {
248         this.stopTimeout = timeout != null ? timeout : Duration.ZERO;
249     }
250 
251     /**
252      * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
253      *
254      * @param timeout timeout in milliseconds or zero to wait forever (default).
255      * @deprecated Use {@link #setStopTimeout(Duration)}.
256      */
257     @Deprecated
258     public void setStopTimeout(final long timeout) {
259         this.stopTimeout = Duration.ofMillis(timeout);
260     }
261 
262     /**
263      * Starts the {@link Thread}s.
264      */
265     @Override
266     public void start() {
267         start(outputThread);
268         start(errorThread);
269         start(inputThread);
270     }
271 
272     /**
273      * Starts the given {@link Thread}.
274      */
275     private void start(final Thread thread) {
276         if (thread != null) {
277             thread.start();
278         }
279     }
280 
281     /**
282      * Stops pumping the streams. When a timeout is specified it is not guaranteed that the pumper threads are cleanly terminated.
283      */
284     @Override
285     public void stop() throws IOException {
286         if (inputStreamPumper != null) {
287             inputStreamPumper.stopProcessing();
288         }
289         stop(outputThread, stopTimeout);
290         stop(errorThread, stopTimeout);
291         stop(inputThread, stopTimeout);
292 
293         if (errorOutputStream != null && errorOutputStream != outputStream) {
294             try {
295                 errorOutputStream.flush();
296             } catch (final IOException e) {
297                 final String msg = "Got exception while flushing the error stream : " + e.getMessage();
298                 DebugUtils.handleException(msg, e);
299             }
300         }
301 
302         if (outputStream != null) {
303             try {
304                 outputStream.flush();
305             } catch (final IOException e) {
306                 final String msg = "Got exception while flushing the output stream";
307                 DebugUtils.handleException(msg, e);
308             }
309         }
310 
311         if (caught != null) {
312             throw caught;
313         }
314     }
315 
316     /**
317      * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
318      * was exceeded an IOException is created to be thrown to the caller.
319      *
320      * @param thread  the thread to be stopped.
321      * @param timeout the time in ms to wait to join.
322      */
323     private void stop(final Thread thread, final Duration timeout) {
324         if (thread != null) {
325             try {
326                 if (timeout.equals(Duration.ZERO)) {
327                     thread.join();
328                 } else {
329                     final Duration timeToWait = timeout.plus(STOP_TIMEOUT_ADDITION);
330                     final Instant startTime = Instant.now();
331                     thread.join(timeToWait.toMillis());
332                     if (Instant.now().isAfter(startTime.plus(timeToWait))) {
333                         caught = new ExecuteException("The stop timeout of " + timeout + " ms was exceeded", Executor.INVALID_EXITVALUE);
334                     }
335                 }
336             } catch (final InterruptedException e) {
337                 thread.interrupt();
338             }
339         }
340     }
341 
342     /**
343      * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
344      * was exceeded an IOException is created to be thrown to the caller.
345      *
346      * @param thread        the thread to be stopped.
347      * @param timeoutMillis the time in ms to wait to join.
348      */
349     protected void stopThread(final Thread thread, final long timeoutMillis) {
350         stop(thread, Duration.ofMillis(timeoutMillis));
351     }
352 }