1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.commons.exec;
19
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.io.OutputStream;
23 import java.io.PipedOutputStream;
24 import java.time.Duration;
25 import java.time.Instant;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.ThreadFactory;
28
29 import org.apache.commons.exec.util.DebugUtils;
30
31 /**
32 * Copies standard output and error of sub-processes to standard output and error of the parent process. If output or error stream are set to null, any feedback
33 * from that stream will be lost.
34 */
35 public class PumpStreamHandler implements ExecuteStreamHandler {
36
37 private static final Duration STOP_TIMEOUT_ADDITION = Duration.ofSeconds(2);
38
39 private Thread outputThread;
40
41 private Thread errorThread;
42
43 private Thread inputThread;
44
45 private final OutputStream outputStream;
46
47 private final OutputStream errorOutputStream;
48
49 private final InputStream inputStream;
50
51 private InputStreamPumper inputStreamPumper;
52
53 /** The timeout Duration the implementation waits when stopping the pumper threads. */
54 private Duration stopTimeout = Duration.ZERO;
55
56 /** The last exception being caught. */
57 private IOException caught;
58
59 /**
60 * The thread factory.
61 */
62 private final ThreadFactory threadFactory;
63
64 /**
65 * Constructs a new {@link PumpStreamHandler}.
66 */
67 public PumpStreamHandler() {
68 this(System.out, System.err);
69 }
70
71 /**
72 * Constructs a new {@link PumpStreamHandler}.
73 *
74 * @param allOutputStream the output/error {@link OutputStream}. The {@code OutputStream}
75 * implementation must be thread-safe because the output and error reader threads will
76 * concurrently write to it.
77 */
78 public PumpStreamHandler(final OutputStream allOutputStream) {
79 this(allOutputStream, allOutputStream);
80 }
81
82 /**
83 * Constructs a new {@link PumpStreamHandler}.
84 *
85 * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
86 * thread-safe because the output and error reader threads will concurrently write to it.
87 *
88 * @param outputStream the output {@link OutputStream}.
89 * @param errorOutputStream the error {@link OutputStream}.
90 */
91 public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream) {
92 this(outputStream, errorOutputStream, null);
93 }
94
95 /**
96 * Constructs a new {@link PumpStreamHandler}.
97 *
98 * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
99 * thread-safe because the output and error reader threads will concurrently write to it.
100 *
101 * @param outputStream the output {@link OutputStream}.
102 * @param errorOutputStream the error {@link OutputStream}.
103 * @param inputStream the input {@link InputStream}.
104 */
105 public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream, final InputStream inputStream) {
106 this(Executors.defaultThreadFactory(), outputStream, errorOutputStream, inputStream);
107 }
108
109 /**
110 * Constructs a new {@link PumpStreamHandler}.
111 *
112 * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
113 * thread-safe because the output and error reader threads will concurrently write to it.
114 *
115 * @param outputStream the output {@link OutputStream}.
116 * @param errorOutputStream the error {@link OutputStream}.
117 * @param inputStream the input {@link InputStream}.
118 */
119 private PumpStreamHandler(final ThreadFactory threadFactory, final OutputStream outputStream, final OutputStream errorOutputStream,
120 final InputStream inputStream) {
121 this.threadFactory = threadFactory;
122 this.outputStream = outputStream;
123 this.errorOutputStream = errorOutputStream;
124 this.inputStream = inputStream;
125 }
126
127 /**
128 * Create the pump to handle error output.
129 *
130 * @param is the {@link InputStream}.
131 * @param os the {@link OutputStream}.
132 */
133 protected void createProcessErrorPump(final InputStream is, final OutputStream os) {
134 errorThread = createPump(is, os);
135 }
136
137 /**
138 * Create the pump to handle process output.
139 *
140 * @param is the {@link InputStream}.
141 * @param os the {@link OutputStream}.
142 */
143 protected void createProcessOutputPump(final InputStream is, final OutputStream os) {
144 outputThread = createPump(is, os);
145 }
146
147 /**
148 * Creates a stream pumper to copy the given input stream to the given output stream. When the 'os' is an PipedOutputStream we are closing 'os' afterwards
149 * to avoid an IOException ("Write end dead").
150 *
151 * @param is the input stream to copy from.
152 * @param os the output stream to copy into.
153 * @return the stream pumper thread.
154 */
155 protected Thread createPump(final InputStream is, final OutputStream os) {
156 return createPump(is, os, os instanceof PipedOutputStream);
157 }
158
159 /**
160 * Creates a stream pumper to copy the given input stream to the given output stream.
161 *
162 * @param is the input stream to copy from.
163 * @param os the output stream to copy into.
164 * @param closeWhenExhausted close the output stream when the input stream is exhausted.
165 * @return the stream pumper thread.
166 */
167 protected Thread createPump(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) {
168 return ThreadUtil.newThread(threadFactory, new StreamPumper(is, os, closeWhenExhausted), "CommonsExecStreamPumper-", true);
169 }
170
171 /**
172 * Creates a stream pumper to copy the given input stream to the given output stream.
173 *
174 * @param is the System.in input stream to copy from.
175 * @param os the output stream to copy into.
176 * @return the stream pumper thread.
177 */
178 private Thread createSystemInPump(final InputStream is, final OutputStream os) {
179 inputStreamPumper = new InputStreamPumper(is, os);
180 return ThreadUtil.newThread(threadFactory, inputStreamPumper, "CommonsExecStreamPumper-", true);
181 }
182
183 /**
184 * Gets the error stream.
185 *
186 * @return {@link OutputStream}.
187 */
188 protected OutputStream getErr() {
189 return errorOutputStream;
190 }
191
192 /**
193 * Gets the output stream.
194 *
195 * @return {@link OutputStream}.
196 */
197 protected OutputStream getOut() {
198 return outputStream;
199 }
200
201 Duration getStopTimeout() {
202 return stopTimeout;
203 }
204
205 /**
206 * Sets the {@link InputStream} from which to read the standard error of the process.
207 *
208 * @param is the {@link InputStream}.
209 */
210 @Override
211 public void setProcessErrorStream(final InputStream is) {
212 if (errorOutputStream != null) {
213 createProcessErrorPump(is, errorOutputStream);
214 }
215 }
216
217 /**
218 * Sets the {@link OutputStream} by means of which input can be sent to the process.
219 *
220 * @param os the {@link OutputStream}.
221 */
222 @Override
223 public void setProcessInputStream(final OutputStream os) {
224 if (inputStream != null) {
225 if (inputStream == System.in) {
226 inputThread = createSystemInPump(inputStream, os);
227 } else {
228 inputThread = createPump(inputStream, os, true);
229 }
230 } else {
231 try {
232 os.close();
233 } catch (final IOException e) {
234 final String msg = "Got exception while closing output stream";
235 DebugUtils.handleException(msg, e);
236 }
237 }
238 }
239
240 /**
241 * Sets the {@link InputStream} from which to read the standard output of the process.
242 *
243 * @param is the {@link InputStream}.
244 */
245 @Override
246 public void setProcessOutputStream(final InputStream is) {
247 if (outputStream != null) {
248 createProcessOutputPump(is, outputStream);
249 }
250 }
251
252 /**
253 * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
254 *
255 * @param timeout timeout or zero to wait forever (default).
256 * @since 1.4.0
257 */
258 public void setStopTimeout(final Duration timeout) {
259 this.stopTimeout = timeout != null ? timeout : Duration.ZERO;
260 }
261
262 /**
263 * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
264 *
265 * @param timeout timeout in milliseconds or zero to wait forever (default).
266 * @deprecated Use {@link #setStopTimeout(Duration)}.
267 */
268 @Deprecated
269 public void setStopTimeout(final long timeout) {
270 this.stopTimeout = Duration.ofMillis(timeout);
271 }
272
273 /**
274 * Starts the {@link Thread}s.
275 */
276 @Override
277 public void start() {
278 start(outputThread);
279 start(errorThread);
280 start(inputThread);
281 }
282
283 /**
284 * Starts the given {@link Thread}.
285 */
286 private void start(final Thread thread) {
287 if (thread != null) {
288 thread.start();
289 }
290 }
291
292 /**
293 * Stops pumping the streams. When a timeout is specified it is not guaranteed that the pumper threads are cleanly terminated.
294 */
295 @Override
296 public void stop() throws IOException {
297 if (inputStreamPumper != null) {
298 inputStreamPumper.stopProcessing();
299 }
300 stop(outputThread, stopTimeout);
301 stop(errorThread, stopTimeout);
302 stop(inputThread, stopTimeout);
303
304 if (errorOutputStream != null && errorOutputStream != outputStream) {
305 try {
306 errorOutputStream.flush();
307 } catch (final IOException e) {
308 final String msg = "Got exception while flushing the error stream : " + e.getMessage();
309 DebugUtils.handleException(msg, e);
310 }
311 }
312
313 if (outputStream != null) {
314 try {
315 outputStream.flush();
316 } catch (final IOException e) {
317 final String msg = "Got exception while flushing the output stream";
318 DebugUtils.handleException(msg, e);
319 }
320 }
321
322 if (caught != null) {
323 throw caught;
324 }
325 }
326
327 /**
328 * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
329 * was exceeded an IOException is created to be thrown to the caller.
330 *
331 * @param thread the thread to be stopped.
332 * @param timeout the time in ms to wait to join.
333 */
334 private void stop(final Thread thread, final Duration timeout) {
335 if (thread != null) {
336 try {
337 if (timeout.equals(Duration.ZERO)) {
338 thread.join();
339 } else {
340 final Duration timeToWait = timeout.plus(STOP_TIMEOUT_ADDITION);
341 final Instant startTime = Instant.now();
342 thread.join(timeToWait.toMillis());
343 if (Instant.now().isAfter(startTime.plus(timeToWait))) {
344 caught = new ExecuteException("The stop timeout of " + timeout + " ms was exceeded", Executor.INVALID_EXITVALUE);
345 }
346 }
347 } catch (final InterruptedException e) {
348 thread.interrupt();
349 }
350 }
351 }
352
353 /**
354 * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
355 * was exceeded an IOException is created to be thrown to the caller.
356 *
357 * @param thread the thread to be stopped.
358 * @param timeoutMillis the time in ms to wait to join.
359 */
360 protected void stopThread(final Thread thread, final long timeoutMillis) {
361 stop(thread, Duration.ofMillis(timeoutMillis));
362 }
363 }