View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    *  contributor license agreements.  See the NOTICE file distributed with
4    *  this work for additional information regarding copyright ownership.
5    *  The ASF licenses this file to You under the Apache License, Version 2.0
6    *  (the "License"); you may not use this file except in compliance with
7    *  the License.  You may obtain a copy of the License at
8    *
9    *      https://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software
12   *  distributed under the License is distributed on an "AS IS" BASIS,
13   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   *  See the License for the specific language governing permissions and
15   *  limitations under the License.
16   */
17  
18  package org.apache.commons.exec;
19  
20  import java.io.IOException;
21  import java.io.InputStream;
22  import java.io.OutputStream;
23  import java.io.PipedOutputStream;
24  import java.time.Duration;
25  import java.time.Instant;
26  import java.util.concurrent.Executors;
27  import java.util.concurrent.ThreadFactory;
28  
29  import org.apache.commons.exec.util.DebugUtils;
30  
31  /**
32   * Copies standard output and error of sub-processes to standard output and error of the parent process. If output or error stream are set to null, any feedback
33   * from that stream will be lost.
34   */
35  public class PumpStreamHandler implements ExecuteStreamHandler {
36  
37      private static final Duration STOP_TIMEOUT_ADDITION = Duration.ofSeconds(2);
38  
39      private Thread outputThread;
40  
41      private Thread errorThread;
42  
43      private Thread inputThread;
44  
45      private final OutputStream outputStream;
46  
47      private final OutputStream errorOutputStream;
48  
49      private final InputStream inputStream;
50  
51      private InputStreamPumper inputStreamPumper;
52  
53      /** The timeout Duration the implementation waits when stopping the pumper threads. */
54      private Duration stopTimeout = Duration.ZERO;
55  
56      /** The last exception being caught. */
57      private IOException caught;
58  
59      /**
60       * The thread factory.
61       */
62      private final ThreadFactory threadFactory;
63  
64      /**
65       * Constructs a new {@link PumpStreamHandler}.
66       */
67      public PumpStreamHandler() {
68          this(System.out, System.err);
69      }
70  
71      /**
72       * Constructs a new {@link PumpStreamHandler}.
73       *
74       * @param allOutputStream the output/error {@link OutputStream}. The {@code OutputStream}
75       *      implementation must be thread-safe because the output and error reader threads will
76       *      concurrently write to it.
77       */
78      public PumpStreamHandler(final OutputStream allOutputStream) {
79          this(allOutputStream, allOutputStream);
80      }
81  
82      /**
83       * Constructs a new {@link PumpStreamHandler}.
84       *
85       * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
86       * thread-safe because the output and error reader threads will concurrently write to it.
87       *
88       * @param outputStream      the output {@link OutputStream}.
89       * @param errorOutputStream the error {@link OutputStream}.
90       */
91      public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream) {
92          this(outputStream, errorOutputStream, null);
93      }
94  
95      /**
96       * Constructs a new {@link PumpStreamHandler}.
97       *
98       * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
99       * thread-safe because the output and error reader threads will concurrently write to it.
100      *
101      * @param outputStream      the output {@link OutputStream}.
102      * @param errorOutputStream the error {@link OutputStream}.
103      * @param inputStream       the input {@link InputStream}.
104      */
105     public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream, final InputStream inputStream) {
106         this(Executors.defaultThreadFactory(), outputStream, errorOutputStream, inputStream);
107     }
108 
109     /**
110      * Constructs a new {@link PumpStreamHandler}.
111      *
112      * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
113      * thread-safe because the output and error reader threads will concurrently write to it.
114      *
115      * @param outputStream      the output {@link OutputStream}.
116      * @param errorOutputStream the error {@link OutputStream}.
117      * @param inputStream       the input {@link InputStream}.
118      */
119     private PumpStreamHandler(final ThreadFactory threadFactory, final OutputStream outputStream, final OutputStream errorOutputStream,
120             final InputStream inputStream) {
121         this.threadFactory = threadFactory;
122         this.outputStream = outputStream;
123         this.errorOutputStream = errorOutputStream;
124         this.inputStream = inputStream;
125     }
126 
127     /**
128      * Create the pump to handle error output.
129      *
130      * @param is the {@link InputStream}.
131      * @param os the {@link OutputStream}.
132      */
133     protected void createProcessErrorPump(final InputStream is, final OutputStream os) {
134         errorThread = createPump(is, os);
135     }
136 
137     /**
138      * Create the pump to handle process output.
139      *
140      * @param is the {@link InputStream}.
141      * @param os the {@link OutputStream}.
142      */
143     protected void createProcessOutputPump(final InputStream is, final OutputStream os) {
144         outputThread = createPump(is, os);
145     }
146 
147     /**
148      * Creates a stream pumper to copy the given input stream to the given output stream. When the 'os' is an PipedOutputStream we are closing 'os' afterwards
149      * to avoid an IOException ("Write end dead").
150      *
151      * @param is the input stream to copy from.
152      * @param os the output stream to copy into.
153      * @return the stream pumper thread.
154      */
155     protected Thread createPump(final InputStream is, final OutputStream os) {
156         return createPump(is, os, os instanceof PipedOutputStream);
157     }
158 
159     /**
160      * Creates a stream pumper to copy the given input stream to the given output stream.
161      *
162      * @param is                 the input stream to copy from.
163      * @param os                 the output stream to copy into.
164      * @param closeWhenExhausted close the output stream when the input stream is exhausted.
165      * @return the stream pumper thread.
166      */
167     protected Thread createPump(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) {
168         return ThreadUtil.newThread(threadFactory, new StreamPumper(is, os, closeWhenExhausted), "CommonsExecStreamPumper-", true);
169     }
170 
171     /**
172      * Creates a stream pumper to copy the given input stream to the given output stream.
173      *
174      * @param is the System.in input stream to copy from.
175      * @param os the output stream to copy into.
176      * @return the stream pumper thread.
177      */
178     private Thread createSystemInPump(final InputStream is, final OutputStream os) {
179         inputStreamPumper = new InputStreamPumper(is, os);
180         return ThreadUtil.newThread(threadFactory, inputStreamPumper, "CommonsExecStreamPumper-", true);
181     }
182 
183     /**
184      * Gets the error stream.
185      *
186      * @return {@link OutputStream}.
187      */
188     protected OutputStream getErr() {
189         return errorOutputStream;
190     }
191 
192     /**
193      * Gets the output stream.
194      *
195      * @return {@link OutputStream}.
196      */
197     protected OutputStream getOut() {
198         return outputStream;
199     }
200 
201     Duration getStopTimeout() {
202         return stopTimeout;
203     }
204 
205     /**
206      * Sets the {@link InputStream} from which to read the standard error of the process.
207      *
208      * @param is the {@link InputStream}.
209      */
210     @Override
211     public void setProcessErrorStream(final InputStream is) {
212         if (errorOutputStream != null) {
213             createProcessErrorPump(is, errorOutputStream);
214         }
215     }
216 
217     /**
218      * Sets the {@link OutputStream} by means of which input can be sent to the process.
219      *
220      * @param os the {@link OutputStream}.
221      */
222     @Override
223     public void setProcessInputStream(final OutputStream os) {
224         if (inputStream != null) {
225             if (inputStream == System.in) {
226                 inputThread = createSystemInPump(inputStream, os);
227             } else {
228                 inputThread = createPump(inputStream, os, true);
229             }
230         } else {
231             try {
232                 os.close();
233             } catch (final IOException e) {
234                 final String msg = "Got exception while closing output stream";
235                 DebugUtils.handleException(msg, e);
236             }
237         }
238     }
239 
240     /**
241      * Sets the {@link InputStream} from which to read the standard output of the process.
242      *
243      * @param is the {@link InputStream}.
244      */
245     @Override
246     public void setProcessOutputStream(final InputStream is) {
247         if (outputStream != null) {
248             createProcessOutputPump(is, outputStream);
249         }
250     }
251 
252     /**
253      * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
254      *
255      * @param timeout timeout or zero to wait forever (default).
256      * @since 1.4.0
257      */
258     public void setStopTimeout(final Duration timeout) {
259         this.stopTimeout = timeout != null ? timeout : Duration.ZERO;
260     }
261 
262     /**
263      * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
264      *
265      * @param timeout timeout in milliseconds or zero to wait forever (default).
266      * @deprecated Use {@link #setStopTimeout(Duration)}.
267      */
268     @Deprecated
269     public void setStopTimeout(final long timeout) {
270         this.stopTimeout = Duration.ofMillis(timeout);
271     }
272 
273     /**
274      * Starts the {@link Thread}s.
275      */
276     @Override
277     public void start() {
278         start(outputThread);
279         start(errorThread);
280         start(inputThread);
281     }
282 
283     /**
284      * Starts the given {@link Thread}.
285      */
286     private void start(final Thread thread) {
287         if (thread != null) {
288             thread.start();
289         }
290     }
291 
292     /**
293      * Stops pumping the streams. When a timeout is specified it is not guaranteed that the pumper threads are cleanly terminated.
294      */
295     @Override
296     public void stop() throws IOException {
297         if (inputStreamPumper != null) {
298             inputStreamPumper.stopProcessing();
299         }
300         stop(outputThread, stopTimeout);
301         stop(errorThread, stopTimeout);
302         stop(inputThread, stopTimeout);
303 
304         if (errorOutputStream != null && errorOutputStream != outputStream) {
305             try {
306                 errorOutputStream.flush();
307             } catch (final IOException e) {
308                 final String msg = "Got exception while flushing the error stream : " + e.getMessage();
309                 DebugUtils.handleException(msg, e);
310             }
311         }
312 
313         if (outputStream != null) {
314             try {
315                 outputStream.flush();
316             } catch (final IOException e) {
317                 final String msg = "Got exception while flushing the output stream";
318                 DebugUtils.handleException(msg, e);
319             }
320         }
321 
322         if (caught != null) {
323             throw caught;
324         }
325     }
326 
327     /**
328      * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
329      * was exceeded an IOException is created to be thrown to the caller.
330      *
331      * @param thread  the thread to be stopped.
332      * @param timeout the time in ms to wait to join.
333      */
334     private void stop(final Thread thread, final Duration timeout) {
335         if (thread != null) {
336             try {
337                 if (timeout.equals(Duration.ZERO)) {
338                     thread.join();
339                 } else {
340                     final Duration timeToWait = timeout.plus(STOP_TIMEOUT_ADDITION);
341                     final Instant startTime = Instant.now();
342                     thread.join(timeToWait.toMillis());
343                     if (Instant.now().isAfter(startTime.plus(timeToWait))) {
344                         caught = new ExecuteException("The stop timeout of " + timeout + " ms was exceeded", Executor.INVALID_EXITVALUE);
345                     }
346                 }
347             } catch (final InterruptedException e) {
348                 thread.interrupt();
349             }
350         }
351     }
352 
353     /**
354      * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
355      * was exceeded an IOException is created to be thrown to the caller.
356      *
357      * @param thread        the thread to be stopped.
358      * @param timeoutMillis the time in ms to wait to join.
359      */
360     protected void stopThread(final Thread thread, final long timeoutMillis) {
361         stop(thread, Duration.ofMillis(timeoutMillis));
362     }
363 }