/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.commons.exec;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedOutputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import org.apache.commons.exec.util.DebugUtils;

/**
 * Copies standard output and error of sub-processes to standard output and error of the parent process. If output or error stream are set to null, any feedback
 * from that stream will be lost.
 */
public class PumpStreamHandler implements ExecuteStreamHandler {

    /**
     * Extra time (two seconds) added to the configured stop timeout when joining a pumper thread, so that an exceeded timeout can be detected.
     */
    private static final Duration STOP_TIMEOUT_ADDITION = Duration.ofSeconds(2);

    /**
     * Output thread.
     */
    private Thread outputThread;

    /**
     * Error thread.
     */
    private Thread errorThread;

    /** Input thread. */
    private Thread inputThread;

    /** Output stream. */
    private final OutputStream outputStream;

    /** Error output stream. */
    private final OutputStream errorOutputStream;

    /** Input stream whose content is pumped to the standard input of the process; may be {@code null}. */
    private final InputStream inputStream;

    /** Pumper used when the input stream is {@code System.in}; {@code null} otherwise. */
    private InputStreamPumper inputStreamPumper;

    /** The timeout Duration the implementation waits when stopping the pumper threads. */
    private Duration stopTimeout = Duration.ZERO;

    /** The last exception being caught. */
    private IOException caught;

    /**
     * The thread factory.
     */
    private final ThreadFactory threadFactory;

    /**
     * Constructs a new {@link PumpStreamHandler}.
     */
    public PumpStreamHandler() {
        this(System.out, System.err);
    }

    /**
     * Constructs a new {@link PumpStreamHandler}.
     *
     * @param allOutputStream the output/error {@link OutputStream}. The {@code OutputStream}
     *   implementation must be thread-safe because the output and error reader threads will
     *   concurrently write to it.
     */
    public PumpStreamHandler(final OutputStream allOutputStream) {
        this(allOutputStream, allOutputStream);
    }

    /**
     * Constructs a new {@link PumpStreamHandler}.
     *
     * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
     * thread-safe because the output and error reader threads will concurrently write to it.
     *
     * @param outputStream      the output {@link OutputStream}.
     * @param errorOutputStream the error {@link OutputStream}.
     */
    public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream) {
        this(outputStream, errorOutputStream, null);
    }

    /**
     * Constructs a new {@link PumpStreamHandler}.
     *
     * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
     * thread-safe because the output and error reader threads will concurrently write to it.
     *
     * @param outputStream      the output {@link OutputStream}.
     * @param errorOutputStream the error {@link OutputStream}.
     * @param inputStream       the input {@link InputStream}.
     */
    public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream, final InputStream inputStream) {
        this(Executors.defaultThreadFactory(), outputStream, errorOutputStream, inputStream);
    }

    /**
     * Constructs a new {@link PumpStreamHandler}.
     *
     * <p>If the same {@link OutputStream} instance is used for output and error, then it must be
     * thread-safe because the output and error reader threads will concurrently write to it.
     *
     * @param threadFactory     the {@link ThreadFactory} used to create the pumper threads.
     * @param outputStream      the output {@link OutputStream}.
     * @param errorOutputStream the error {@link OutputStream}.
     * @param inputStream       the input {@link InputStream}.
     */
    private PumpStreamHandler(final ThreadFactory threadFactory, final OutputStream outputStream, final OutputStream errorOutputStream,
            final InputStream inputStream) {
        this.threadFactory = threadFactory;
        this.outputStream = outputStream;
        this.errorOutputStream = errorOutputStream;
        this.inputStream = inputStream;
    }

    /**
     * Create the pump to handle error output.
     *
     * @param is the {@link InputStream}.
     * @param os the {@link OutputStream}.
     */
    protected void createProcessErrorPump(final InputStream is, final OutputStream os) {
        errorThread = createPump(is, os);
    }

    /**
     * Create the pump to handle process output.
     *
     * @param is the {@link InputStream}.
     * @param os the {@link OutputStream}.
     */
    protected void createProcessOutputPump(final InputStream is, final OutputStream os) {
        outputThread = createPump(is, os);
    }

    /**
     * Creates a stream pumper to copy the given input stream to the given output stream. When the 'os' is an PipedOutputStream we are closing 'os' afterward
     * to avoid an IOException ("Write end dead").
     *
     * @param is the input stream to copy from.
     * @param os the output stream to copy into.
     * @return the stream pumper thread.
     */
    protected Thread createPump(final InputStream is, final OutputStream os) {
        return createPump(is, os, os instanceof PipedOutputStream);
    }

    /**
     * Creates a stream pumper to copy the given input stream to the given output stream.
     *
     * @param is                 the input stream to copy from.
     * @param os                 the output stream to copy into.
     * @param closeWhenExhausted close the output stream when the input stream is exhausted.
     * @return the stream pumper thread.
     */
    protected Thread createPump(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) {
        return ThreadUtil.newThread(threadFactory, new StreamPumper(is, os, closeWhenExhausted), "CommonsExecStreamPumper-", true);
    }

    /**
     * Creates a stream pumper to copy the given input stream to the given output stream.
     * The created {@link InputStreamPumper} is remembered so that {@link #stop()} can ask it to stop processing.
     *
     * @param is the System.in input stream to copy from.
     * @param os the output stream to copy into.
     * @return the stream pumper thread.
     */
    private Thread createSystemInPump(final InputStream is, final OutputStream os) {
        inputStreamPumper = new InputStreamPumper(is, os);
        return ThreadUtil.newThread(threadFactory, inputStreamPumper, "CommonsExecStreamPumper-", true);
    }

    /**
     * Gets the error stream.
     *
     * @return {@link OutputStream}.
     */
    protected OutputStream getErr() {
        return errorOutputStream;
    }

    /**
     * Gets the output stream.
     *
     * @return {@link OutputStream}.
     */
    protected OutputStream getOut() {
        return outputStream;
    }

    /**
     * Gets the time to wait for the pumper threads when {@link #stop()} is called.
     *
     * @return the stop timeout, never {@code null}; {@link Duration#ZERO} means wait forever.
     */
    Duration getStopTimeout() {
        return stopTimeout;
    }

    /**
     * Sets the {@link InputStream} from which to read the standard error of the process.
     * No pump is created when this handler has no error output stream.
     *
     * @param is the {@link InputStream}.
     */
    @Override
    public void setProcessErrorStream(final InputStream is) {
        if (errorOutputStream != null) {
            createProcessErrorPump(is, errorOutputStream);
        }
    }

    /**
     * Sets the {@link OutputStream} by means of which input can be sent to the process.
     * When this handler has no input stream, {@code os} is closed right away.
     *
     * @param os the {@link OutputStream}.
     */
    @Override
    public void setProcessInputStream(final OutputStream os) {
        if (inputStream != null) {
            if (inputStream == System.in) {
                // System.in gets a dedicated pumper that stop() can ask to stop processing.
                inputThread = createSystemInPump(inputStream, os);
            } else {
                inputThread = createPump(inputStream, os, true);
            }
        } else {
            try {
                os.close();
            } catch (final IOException e) {
                final String msg = "Got exception while closing output stream";
                DebugUtils.handleException(msg, e);
            }
        }
    }

    /**
     * Sets the {@link InputStream} from which to read the standard output of the process.
     * No pump is created when this handler has no output stream.
     *
     * @param is the {@link InputStream}.
     */
    @Override
    public void setProcessOutputStream(final InputStream is) {
        if (outputStream != null) {
            createProcessOutputPump(is, outputStream);
        }
    }

    /**
     * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
     *
     * @param timeout timeout or zero to wait forever (default); {@code null} is treated as zero.
     * @since 1.4.0
     */
    public void setStopTimeout(final Duration timeout) {
        this.stopTimeout = timeout != null ? timeout : Duration.ZERO;
    }

    /**
     * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
     *
     * @param timeout timeout in milliseconds or zero to wait forever (default).
     * @deprecated Use {@link #setStopTimeout(Duration)}.
     */
    @Deprecated
    public void setStopTimeout(final long timeout) {
        this.stopTimeout = Duration.ofMillis(timeout);
    }

    /**
     * Starts the {@link Thread}s.
     */
    @Override
    public void start() {
        start(outputThread);
        start(errorThread);
        start(inputThread);
    }

    /**
     * Starts the given {@link Thread}.
     *
     * @param thread the thread to start; ignored when {@code null}.
     */
    private void start(final Thread thread) {
        if (thread != null) {
            thread.start();
        }
    }

    /**
     * Stops pumping the streams. When a timeout is specified it is not guaranteed that the pumper threads are cleanly terminated.
     *
     * @throws IOException if the stop timeout was exceeded while waiting for a pumper thread.
     */
    @Override
    public void stop() throws IOException {
        if (inputStreamPumper != null) {
            inputStreamPumper.stopProcessing();
        }
        stop(outputThread, stopTimeout);
        stop(errorThread, stopTimeout);
        stop(inputThread, stopTimeout);

        // Skip the error stream when it is the same instance as the output stream; it is flushed below.
        if (errorOutputStream != null && errorOutputStream != outputStream) {
            try {
                errorOutputStream.flush();
            } catch (final IOException e) {
                final String msg = "Got exception while flushing the error stream : " + e.getMessage();
                DebugUtils.handleException(msg, e);
            }
        }

        if (outputStream != null) {
            try {
                outputStream.flush();
            } catch (final IOException e) {
                final String msg = "Got exception while flushing the output stream";
                DebugUtils.handleException(msg, e);
            }
        }

        if (caught != null) {
            throw caught;
        }
    }

    /**
     * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
     * was exceeded an IOException is created to be thrown to the caller.
     *
     * @param thread  the thread to be stopped; ignored when {@code null}.
     * @param timeout the time to wait to join, or zero to wait forever.
     */
    private void stop(final Thread thread, final Duration timeout) {
        if (thread == null) {
            return;
        }
        try {
            if (timeout.isZero()) {
                thread.join();
            } else {
                final Duration timeToWait = timeout.plus(STOP_TIMEOUT_ADDITION);
                final Instant startTime = Instant.now();
                thread.join(timeToWait.toMillis());
                if (Instant.now().isAfter(startTime.plus(timeToWait))) {
                    // Report the timeout in milliseconds; Duration.toString() would yield ISO-8601 text such as "PT2S".
                    caught = new ExecuteException("The stop timeout of " + timeout.toMillis() + " ms was exceeded", Executor.INVALID_EXITVALUE);
                }
            }
        } catch (final InterruptedException e) {
            // NOTE(review): this interrupts the pumper thread, not the calling thread; the caller's interrupt status is not restored.
            thread.interrupt();
        }
    }

    /**
     * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
     * was exceeded an IOException is created to be thrown to the caller.
     *
     * @param thread        the thread to be stopped.
     * @param timeoutMillis the time in ms to wait to join.
     */
    protected void stopThread(final Thread thread, final long timeoutMillis) {
        stop(thread, Duration.ofMillis(timeoutMillis));
    }
}