1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * https://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 package org.apache.commons.exec;
21
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.OutputStream;
25 import java.io.PipedOutputStream;
26 import java.time.Duration;
27 import java.time.Instant;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.ThreadFactory;
30
31 import org.apache.commons.exec.util.DebugUtils;
32
33 /**
34 * Copies standard output and error of sub-processes to standard output and error of the parent process. If output or error stream are set to null, any feedback
35 * from that stream will be lost.
36 */
37 public class PumpStreamHandler implements ExecuteStreamHandler {
38
39 /** Three seconds timeout. */
40 private static final Duration STOP_TIMEOUT_ADDITION = Duration.ofSeconds(2);
41
42 /**
43 * Output thread.
44 */
45 private Thread outputThread;
46
47 /**
48 * Error thread.
49 */
50 private Thread errorThread;
51
52 /** Input thread. */
53 private Thread inputThread;
54
55 /** Output stream. */
56 private final OutputStream outputStream;
57
58 /** Error output stream. */
59 private final OutputStream errorOutputStream;
60
61 /** Error input stream. */
62 private final InputStream inputStream;
63
64 /** Pumper input stream. */
65 private InputStreamPumper inputStreamPumper;
66
67 /** The timeout Duration the implementation waits when stopping the pumper threads. */
68 private Duration stopTimeout = Duration.ZERO;
69
70 /** The last exception being caught. */
71 private IOException caught;
72
73 /**
74 * The thread factory.
75 */
76 private final ThreadFactory threadFactory;
77
78 /**
79 * Constructs a new {@link PumpStreamHandler}.
80 */
81 public PumpStreamHandler() {
82 this(System.out, System.err);
83 }
84
85 /**
86 * Constructs a new {@link PumpStreamHandler}.
87 *
88 * @param allOutputStream the output/error {@link OutputStream}. The {@code OutputStream}
89 * implementation must be thread-safe because the output and error reader threads will
90 * concurrently write to it.
91 */
92 public PumpStreamHandler(final OutputStream allOutputStream) {
93 this(allOutputStream, allOutputStream);
94 }
95
96 /**
97 * Constructs a new {@link PumpStreamHandler}.
98 *
99 * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
100 * thread-safe because the output and error reader threads will concurrently write to it.
101 *
102 * @param outputStream the output {@link OutputStream}.
103 * @param errorOutputStream the error {@link OutputStream}.
104 */
105 public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream) {
106 this(outputStream, errorOutputStream, null);
107 }
108
109 /**
110 * Constructs a new {@link PumpStreamHandler}.
111 *
112 * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
113 * thread-safe because the output and error reader threads will concurrently write to it.
114 *
115 * @param outputStream the output {@link OutputStream}.
116 * @param errorOutputStream the error {@link OutputStream}.
117 * @param inputStream the input {@link InputStream}.
118 */
119 public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream, final InputStream inputStream) {
120 this(Executors.defaultThreadFactory(), outputStream, errorOutputStream, inputStream);
121 }
122
123 /**
124 * Constructs a new {@link PumpStreamHandler}.
125 *
126 * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
127 * thread-safe because the output and error reader threads will concurrently write to it.
128 *
129 * @param outputStream the output {@link OutputStream}.
130 * @param errorOutputStream the error {@link OutputStream}.
131 * @param inputStream the input {@link InputStream}.
132 */
133 private PumpStreamHandler(final ThreadFactory threadFactory, final OutputStream outputStream, final OutputStream errorOutputStream,
134 final InputStream inputStream) {
135 this.threadFactory = threadFactory;
136 this.outputStream = outputStream;
137 this.errorOutputStream = errorOutputStream;
138 this.inputStream = inputStream;
139 }
140
141 /**
142 * Create the pump to handle error output.
143 *
144 * @param is the {@link InputStream}.
145 * @param os the {@link OutputStream}.
146 */
147 protected void createProcessErrorPump(final InputStream is, final OutputStream os) {
148 errorThread = createPump(is, os);
149 }
150
151 /**
152 * Create the pump to handle process output.
153 *
154 * @param is the {@link InputStream}.
155 * @param os the {@link OutputStream}.
156 */
157 protected void createProcessOutputPump(final InputStream is, final OutputStream os) {
158 outputThread = createPump(is, os);
159 }
160
161 /**
162 * Creates a stream pumper to copy the given input stream to the given output stream. When the 'os' is an PipedOutputStream we are closing 'os' afterward
163 * to avoid an IOException ("Write end dead").
164 *
165 * @param is the input stream to copy from.
166 * @param os the output stream to copy into.
167 * @return the stream pumper thread.
168 */
169 protected Thread createPump(final InputStream is, final OutputStream os) {
170 return createPump(is, os, os instanceof PipedOutputStream);
171 }
172
173 /**
174 * Creates a stream pumper to copy the given input stream to the given output stream.
175 *
176 * @param is the input stream to copy from.
177 * @param os the output stream to copy into.
178 * @param closeWhenExhausted close the output stream when the input stream is exhausted.
179 * @return the stream pumper thread.
180 */
181 protected Thread createPump(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) {
182 return ThreadUtil.newThread(threadFactory, new StreamPumper(is, os, closeWhenExhausted), "CommonsExecStreamPumper-", true);
183 }
184
185 /**
186 * Creates a stream pumper to copy the given input stream to the given output stream.
187 *
188 * @param is the System.in input stream to copy from.
189 * @param os the output stream to copy into.
190 * @return the stream pumper thread.
191 */
192 private Thread createSystemInPump(final InputStream is, final OutputStream os) {
193 inputStreamPumper = new InputStreamPumper(is, os);
194 return ThreadUtil.newThread(threadFactory, inputStreamPumper, "CommonsExecStreamPumper-", true);
195 }
196
197 /**
198 * Gets the error stream.
199 *
200 * @return {@link OutputStream}.
201 */
202 protected OutputStream getErr() {
203 return errorOutputStream;
204 }
205
206 /**
207 * Gets the output stream.
208 *
209 * @return {@link OutputStream}.
210 */
211 protected OutputStream getOut() {
212 return outputStream;
213 }
214
215 Duration getStopTimeout() {
216 return stopTimeout;
217 }
218
219 /**
220 * Sets the {@link InputStream} from which to read the standard error of the process.
221 *
222 * @param is the {@link InputStream}.
223 */
224 @Override
225 public void setProcessErrorStream(final InputStream is) {
226 if (errorOutputStream != null) {
227 createProcessErrorPump(is, errorOutputStream);
228 }
229 }
230
231 /**
232 * Sets the {@link OutputStream} by means of which input can be sent to the process.
233 *
234 * @param os the {@link OutputStream}.
235 */
236 @Override
237 public void setProcessInputStream(final OutputStream os) {
238 if (inputStream != null) {
239 if (inputStream == System.in) {
240 inputThread = createSystemInPump(inputStream, os);
241 } else {
242 inputThread = createPump(inputStream, os, true);
243 }
244 } else {
245 try {
246 os.close();
247 } catch (final IOException e) {
248 final String msg = "Got exception while closing output stream";
249 DebugUtils.handleException(msg, e);
250 }
251 }
252 }
253
254 /**
255 * Sets the {@link InputStream} from which to read the standard output of the process.
256 *
257 * @param is the {@link InputStream}.
258 */
259 @Override
260 public void setProcessOutputStream(final InputStream is) {
261 if (outputStream != null) {
262 createProcessOutputPump(is, outputStream);
263 }
264 }
265
266 /**
267 * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
268 *
269 * @param timeout timeout or zero to wait forever (default).
270 * @since 1.4.0
271 */
272 public void setStopTimeout(final Duration timeout) {
273 this.stopTimeout = timeout != null ? timeout : Duration.ZERO;
274 }
275
276 /**
277 * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
278 *
279 * @param timeout timeout in milliseconds or zero to wait forever (default).
280 * @deprecated Use {@link #setStopTimeout(Duration)}.
281 */
282 @Deprecated
283 public void setStopTimeout(final long timeout) {
284 this.stopTimeout = Duration.ofMillis(timeout);
285 }
286
287 /**
288 * Starts the {@link Thread}s.
289 */
290 @Override
291 public void start() {
292 start(outputThread);
293 start(errorThread);
294 start(inputThread);
295 }
296
297 /**
298 * Starts the given {@link Thread}.
299 */
300 private void start(final Thread thread) {
301 if (thread != null) {
302 thread.start();
303 }
304 }
305
306 /**
307 * Stops pumping the streams. When a timeout is specified it is not guaranteed that the pumper threads are cleanly terminated.
308 */
309 @Override
310 public void stop() throws IOException {
311 if (inputStreamPumper != null) {
312 inputStreamPumper.stopProcessing();
313 }
314 stop(outputThread, stopTimeout);
315 stop(errorThread, stopTimeout);
316 stop(inputThread, stopTimeout);
317
318 if (errorOutputStream != null && errorOutputStream != outputStream) {
319 try {
320 errorOutputStream.flush();
321 } catch (final IOException e) {
322 final String msg = "Got exception while flushing the error stream : " + e.getMessage();
323 DebugUtils.handleException(msg, e);
324 }
325 }
326
327 if (outputStream != null) {
328 try {
329 outputStream.flush();
330 } catch (final IOException e) {
331 final String msg = "Got exception while flushing the output stream";
332 DebugUtils.handleException(msg, e);
333 }
334 }
335
336 if (caught != null) {
337 throw caught;
338 }
339 }
340
341 /**
342 * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
343 * was exceeded an IOException is created to be thrown to the caller.
344 *
345 * @param thread the thread to be stopped.
346 * @param timeout the time in ms to wait to join.
347 */
348 private void stop(final Thread thread, final Duration timeout) {
349 if (thread != null) {
350 try {
351 if (timeout.equals(Duration.ZERO)) {
352 thread.join();
353 } else {
354 final Duration timeToWait = timeout.plus(STOP_TIMEOUT_ADDITION);
355 final Instant startTime = Instant.now();
356 thread.join(timeToWait.toMillis());
357 if (Instant.now().isAfter(startTime.plus(timeToWait))) {
358 caught = new ExecuteException("The stop timeout of " + timeout + " ms was exceeded", Executor.INVALID_EXITVALUE);
359 }
360 }
361 } catch (final InterruptedException e) {
362 thread.interrupt();
363 }
364 }
365 }
366
367 /**
368 * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
369 * was exceeded an IOException is created to be thrown to the caller.
370 *
371 * @param thread the thread to be stopped.
372 * @param timeoutMillis the time in ms to wait to join.
373 */
374 protected void stopThread(final Thread thread, final long timeoutMillis) {
375 stop(thread, Duration.ofMillis(timeoutMillis));
376 }
377 }