View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   https://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  
20  package org.apache.commons.exec;
21  
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  import java.io.PipedOutputStream;
26  import java.time.Duration;
27  import java.time.Instant;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.ThreadFactory;
30  
31  import org.apache.commons.exec.util.DebugUtils;
32  
33  /**
34   * Copies standard output and error of sub-processes to standard output and error of the parent process. If output or error stream are set to null, any feedback
35   * from that stream will be lost.
36   */
37  public class PumpStreamHandler implements ExecuteStreamHandler {
38  
39      /** Three seconds timeout. */
40      private static final Duration STOP_TIMEOUT_ADDITION = Duration.ofSeconds(2);
41  
42      /**
43       * Output thread.
44       */
45      private Thread outputThread;
46  
47      /**
48       * Error thread.
49       */
50      private Thread errorThread;
51  
52      /** Input thread. */
53      private Thread inputThread;
54  
55      /** Output stream. */
56      private final OutputStream outputStream;
57  
58      /** Error output stream. */
59      private final OutputStream errorOutputStream;
60  
61      /** Error input stream. */
62      private final InputStream inputStream;
63  
64      /** Pumper input stream. */
65      private InputStreamPumper inputStreamPumper;
66  
67      /** The timeout Duration the implementation waits when stopping the pumper threads. */
68      private Duration stopTimeout = Duration.ZERO;
69  
70      /** The last exception being caught. */
71      private IOException caught;
72  
73      /**
74       * The thread factory.
75       */
76      private final ThreadFactory threadFactory;
77  
78      /**
79       * Constructs a new {@link PumpStreamHandler}.
80       */
81      public PumpStreamHandler() {
82          this(System.out, System.err);
83      }
84  
85      /**
86       * Constructs a new {@link PumpStreamHandler}.
87       *
88       * @param allOutputStream the output/error {@link OutputStream}. The {@code OutputStream}
89       *      implementation must be thread-safe because the output and error reader threads will
90       *      concurrently write to it.
91       */
92      public PumpStreamHandler(final OutputStream allOutputStream) {
93          this(allOutputStream, allOutputStream);
94      }
95  
96      /**
97       * Constructs a new {@link PumpStreamHandler}.
98       *
99       * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
100      * thread-safe because the output and error reader threads will concurrently write to it.
101      *
102      * @param outputStream      the output {@link OutputStream}.
103      * @param errorOutputStream the error {@link OutputStream}.
104      */
105     public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream) {
106         this(outputStream, errorOutputStream, null);
107     }
108 
109     /**
110      * Constructs a new {@link PumpStreamHandler}.
111      *
112      * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
113      * thread-safe because the output and error reader threads will concurrently write to it.
114      *
115      * @param outputStream      the output {@link OutputStream}.
116      * @param errorOutputStream the error {@link OutputStream}.
117      * @param inputStream       the input {@link InputStream}.
118      */
119     public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream, final InputStream inputStream) {
120         this(Executors.defaultThreadFactory(), outputStream, errorOutputStream, inputStream);
121     }
122 
123     /**
124      * Constructs a new {@link PumpStreamHandler}.
125      *
126      * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
127      * thread-safe because the output and error reader threads will concurrently write to it.
128      *
129      * @param outputStream      the output {@link OutputStream}.
130      * @param errorOutputStream the error {@link OutputStream}.
131      * @param inputStream       the input {@link InputStream}.
132      */
133     private PumpStreamHandler(final ThreadFactory threadFactory, final OutputStream outputStream, final OutputStream errorOutputStream,
134             final InputStream inputStream) {
135         this.threadFactory = threadFactory;
136         this.outputStream = outputStream;
137         this.errorOutputStream = errorOutputStream;
138         this.inputStream = inputStream;
139     }
140 
141     /**
142      * Create the pump to handle error output.
143      *
144      * @param is the {@link InputStream}.
145      * @param os the {@link OutputStream}.
146      */
147     protected void createProcessErrorPump(final InputStream is, final OutputStream os) {
148         errorThread = createPump(is, os);
149     }
150 
151     /**
152      * Create the pump to handle process output.
153      *
154      * @param is the {@link InputStream}.
155      * @param os the {@link OutputStream}.
156      */
157     protected void createProcessOutputPump(final InputStream is, final OutputStream os) {
158         outputThread = createPump(is, os);
159     }
160 
161     /**
162      * Creates a stream pumper to copy the given input stream to the given output stream. When the 'os' is an PipedOutputStream we are closing 'os' afterward
163      * to avoid an IOException ("Write end dead").
164      *
165      * @param is the input stream to copy from.
166      * @param os the output stream to copy into.
167      * @return the stream pumper thread.
168      */
169     protected Thread createPump(final InputStream is, final OutputStream os) {
170         return createPump(is, os, os instanceof PipedOutputStream);
171     }
172 
173     /**
174      * Creates a stream pumper to copy the given input stream to the given output stream.
175      *
176      * @param is                 the input stream to copy from.
177      * @param os                 the output stream to copy into.
178      * @param closeWhenExhausted close the output stream when the input stream is exhausted.
179      * @return the stream pumper thread.
180      */
181     protected Thread createPump(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) {
182         return ThreadUtil.newThread(threadFactory, new StreamPumper(is, os, closeWhenExhausted), "CommonsExecStreamPumper-", true);
183     }
184 
185     /**
186      * Creates a stream pumper to copy the given input stream to the given output stream.
187      *
188      * @param is the System.in input stream to copy from.
189      * @param os the output stream to copy into.
190      * @return the stream pumper thread.
191      */
192     private Thread createSystemInPump(final InputStream is, final OutputStream os) {
193         inputStreamPumper = new InputStreamPumper(is, os);
194         return ThreadUtil.newThread(threadFactory, inputStreamPumper, "CommonsExecStreamPumper-", true);
195     }
196 
197     /**
198      * Gets the error stream.
199      *
200      * @return {@link OutputStream}.
201      */
202     protected OutputStream getErr() {
203         return errorOutputStream;
204     }
205 
206     /**
207      * Gets the output stream.
208      *
209      * @return {@link OutputStream}.
210      */
211     protected OutputStream getOut() {
212         return outputStream;
213     }
214 
215     Duration getStopTimeout() {
216         return stopTimeout;
217     }
218 
219     /**
220      * Sets the {@link InputStream} from which to read the standard error of the process.
221      *
222      * @param is the {@link InputStream}.
223      */
224     @Override
225     public void setProcessErrorStream(final InputStream is) {
226         if (errorOutputStream != null) {
227             createProcessErrorPump(is, errorOutputStream);
228         }
229     }
230 
231     /**
232      * Sets the {@link OutputStream} by means of which input can be sent to the process.
233      *
234      * @param os the {@link OutputStream}.
235      */
236     @Override
237     public void setProcessInputStream(final OutputStream os) {
238         if (inputStream != null) {
239             if (inputStream == System.in) {
240                 inputThread = createSystemInPump(inputStream, os);
241             } else {
242                 inputThread = createPump(inputStream, os, true);
243             }
244         } else {
245             try {
246                 os.close();
247             } catch (final IOException e) {
248                 final String msg = "Got exception while closing output stream";
249                 DebugUtils.handleException(msg, e);
250             }
251         }
252     }
253 
254     /**
255      * Sets the {@link InputStream} from which to read the standard output of the process.
256      *
257      * @param is the {@link InputStream}.
258      */
259     @Override
260     public void setProcessOutputStream(final InputStream is) {
261         if (outputStream != null) {
262             createProcessOutputPump(is, outputStream);
263         }
264     }
265 
266     /**
267      * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
268      *
269      * @param timeout timeout or zero to wait forever (default).
270      * @since 1.4.0
271      */
272     public void setStopTimeout(final Duration timeout) {
273         this.stopTimeout = timeout != null ? timeout : Duration.ZERO;
274     }
275 
276     /**
277      * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
278      *
279      * @param timeout timeout in milliseconds or zero to wait forever (default).
280      * @deprecated Use {@link #setStopTimeout(Duration)}.
281      */
282     @Deprecated
283     public void setStopTimeout(final long timeout) {
284         this.stopTimeout = Duration.ofMillis(timeout);
285     }
286 
287     /**
288      * Starts the {@link Thread}s.
289      */
290     @Override
291     public void start() {
292         start(outputThread);
293         start(errorThread);
294         start(inputThread);
295     }
296 
297     /**
298      * Starts the given {@link Thread}.
299      */
300     private void start(final Thread thread) {
301         if (thread != null) {
302             thread.start();
303         }
304     }
305 
306     /**
307      * Stops pumping the streams. When a timeout is specified it is not guaranteed that the pumper threads are cleanly terminated.
308      */
309     @Override
310     public void stop() throws IOException {
311         if (inputStreamPumper != null) {
312             inputStreamPumper.stopProcessing();
313         }
314         stop(outputThread, stopTimeout);
315         stop(errorThread, stopTimeout);
316         stop(inputThread, stopTimeout);
317 
318         if (errorOutputStream != null && errorOutputStream != outputStream) {
319             try {
320                 errorOutputStream.flush();
321             } catch (final IOException e) {
322                 final String msg = "Got exception while flushing the error stream : " + e.getMessage();
323                 DebugUtils.handleException(msg, e);
324             }
325         }
326 
327         if (outputStream != null) {
328             try {
329                 outputStream.flush();
330             } catch (final IOException e) {
331                 final String msg = "Got exception while flushing the output stream";
332                 DebugUtils.handleException(msg, e);
333             }
334         }
335 
336         if (caught != null) {
337             throw caught;
338         }
339     }
340 
341     /**
342      * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
343      * was exceeded an IOException is created to be thrown to the caller.
344      *
345      * @param thread  the thread to be stopped.
346      * @param timeout the time in ms to wait to join.
347      */
348     private void stop(final Thread thread, final Duration timeout) {
349         if (thread != null) {
350             try {
351                 if (timeout.equals(Duration.ZERO)) {
352                     thread.join();
353                 } else {
354                     final Duration timeToWait = timeout.plus(STOP_TIMEOUT_ADDITION);
355                     final Instant startTime = Instant.now();
356                     thread.join(timeToWait.toMillis());
357                     if (Instant.now().isAfter(startTime.plus(timeToWait))) {
358                         caught = new ExecuteException("The stop timeout of " + timeout + " ms was exceeded", Executor.INVALID_EXITVALUE);
359                     }
360                 }
361             } catch (final InterruptedException e) {
362                 thread.interrupt();
363             }
364         }
365     }
366 
367     /**
368      * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
369      * was exceeded an IOException is created to be thrown to the caller.
370      *
371      * @param thread        the thread to be stopped.
372      * @param timeoutMillis the time in ms to wait to join.
373      */
374     protected void stopThread(final Thread thread, final long timeoutMillis) {
375         stop(thread, Duration.ofMillis(timeoutMillis));
376     }
377 }