001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * https://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.commons.exec; 019 020import java.io.IOException; 021import java.io.InputStream; 022import java.io.OutputStream; 023import java.io.PipedOutputStream; 024import java.time.Duration; 025import java.time.Instant; 026import java.util.concurrent.Executors; 027import java.util.concurrent.ThreadFactory; 028 029import org.apache.commons.exec.util.DebugUtils; 030 031/** 032 * Copies standard output and error of sub-processes to standard output and error of the parent process. If output or error stream are set to null, any feedback 033 * from that stream will be lost. 034 */ 035public class PumpStreamHandler implements ExecuteStreamHandler { 036 037 private static final Duration STOP_TIMEOUT_ADDITION = Duration.ofSeconds(2); 038 039 private Thread outputThread; 040 041 private Thread errorThread; 042 043 private Thread inputThread; 044 045 private final OutputStream outputStream; 046 047 private final OutputStream errorOutputStream; 048 049 private final InputStream inputStream; 050 051 private InputStreamPumper inputStreamPumper; 052 053 /** The timeout Duration the implementation waits when stopping the pumper threads. */ 054 private Duration stopTimeout = Duration.ZERO; 055 056 /** The last exception being caught. */ 057 private IOException caught; 058 059 /** 060 * The thread factory. 061 */ 062 private final ThreadFactory threadFactory; 063 064 /** 065 * Constructs a new {@link PumpStreamHandler}. 066 */ 067 public PumpStreamHandler() { 068 this(System.out, System.err); 069 } 070 071 /** 072 * Constructs a new {@link PumpStreamHandler}. 073 * 074 * @param allOutputStream the output/error {@link OutputStream}. The {@code OutputStream} 075 * implementation must be thread-safe because the output and error reader threads will 076 * concurrently write to it. 077 */ 078 public PumpStreamHandler(final OutputStream allOutputStream) { 079 this(allOutputStream, allOutputStream); 080 } 081 082 /** 083 * Constructs a new {@link PumpStreamHandler}. 084 * 085 * <p>If the same {@link OutputStream} instance is used for output and error, then it must be 086 * thread-safe because the output and error reader threads will concurrently write to it. 087 * 088 * @param outputStream the output {@link OutputStream}. 089 * @param errorOutputStream the error {@link OutputStream}. 090 */ 091 public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream) { 092 this(outputStream, errorOutputStream, null); 093 } 094 095 /** 096 * Constructs a new {@link PumpStreamHandler}. 097 * 098 * <p>If the same {@link OutputStream} instance is used for output and error, then it must be 099 * thread-safe because the output and error reader threads will concurrently write to it. 100 * 101 * @param outputStream the output {@link OutputStream}. 102 * @param errorOutputStream the error {@link OutputStream}. 103 * @param inputStream the input {@link InputStream}. 104 */ 105 public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream, final InputStream inputStream) { 106 this(Executors.defaultThreadFactory(), outputStream, errorOutputStream, inputStream); 107 } 108 109 /** 110 * Constructs a new {@link PumpStreamHandler}. 111 * 112 * <p>If the same {@link OutputStream} instance is used for output and error, then it must be 113 * thread-safe because the output and error reader threads will concurrently write to it. 114 * 115 * @param outputStream the output {@link OutputStream}. 116 * @param errorOutputStream the error {@link OutputStream}. 117 * @param inputStream the input {@link InputStream}. 118 */ 119 private PumpStreamHandler(final ThreadFactory threadFactory, final OutputStream outputStream, final OutputStream errorOutputStream, 120 final InputStream inputStream) { 121 this.threadFactory = threadFactory; 122 this.outputStream = outputStream; 123 this.errorOutputStream = errorOutputStream; 124 this.inputStream = inputStream; 125 } 126 127 /** 128 * Create the pump to handle error output. 129 * 130 * @param is the {@link InputStream}. 131 * @param os the {@link OutputStream}. 132 */ 133 protected void createProcessErrorPump(final InputStream is, final OutputStream os) { 134 errorThread = createPump(is, os); 135 } 136 137 /** 138 * Create the pump to handle process output. 139 * 140 * @param is the {@link InputStream}. 141 * @param os the {@link OutputStream}. 142 */ 143 protected void createProcessOutputPump(final InputStream is, final OutputStream os) { 144 outputThread = createPump(is, os); 145 } 146 147 /** 148 * Creates a stream pumper to copy the given input stream to the given output stream. When the 'os' is an PipedOutputStream we are closing 'os' afterwards 149 * to avoid an IOException ("Write end dead"). 150 * 151 * @param is the input stream to copy from. 152 * @param os the output stream to copy into. 153 * @return the stream pumper thread. 154 */ 155 protected Thread createPump(final InputStream is, final OutputStream os) { 156 return createPump(is, os, os instanceof PipedOutputStream); 157 } 158 159 /** 160 * Creates a stream pumper to copy the given input stream to the given output stream. 161 * 162 * @param is the input stream to copy from. 163 * @param os the output stream to copy into. 164 * @param closeWhenExhausted close the output stream when the input stream is exhausted. 165 * @return the stream pumper thread. 166 */ 167 protected Thread createPump(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) { 168 return ThreadUtil.newThread(threadFactory, new StreamPumper(is, os, closeWhenExhausted), "CommonsExecStreamPumper-", true); 169 } 170 171 /** 172 * Creates a stream pumper to copy the given input stream to the given output stream. 173 * 174 * @param is the System.in input stream to copy from. 175 * @param os the output stream to copy into. 176 * @return the stream pumper thread. 177 */ 178 private Thread createSystemInPump(final InputStream is, final OutputStream os) { 179 inputStreamPumper = new InputStreamPumper(is, os); 180 return ThreadUtil.newThread(threadFactory, inputStreamPumper, "CommonsExecStreamPumper-", true); 181 } 182 183 /** 184 * Gets the error stream. 185 * 186 * @return {@link OutputStream}. 187 */ 188 protected OutputStream getErr() { 189 return errorOutputStream; 190 } 191 192 /** 193 * Gets the output stream. 194 * 195 * @return {@link OutputStream}. 196 */ 197 protected OutputStream getOut() { 198 return outputStream; 199 } 200 201 Duration getStopTimeout() { 202 return stopTimeout; 203 } 204 205 /** 206 * Sets the {@link InputStream} from which to read the standard error of the process. 207 * 208 * @param is the {@link InputStream}. 209 */ 210 @Override 211 public void setProcessErrorStream(final InputStream is) { 212 if (errorOutputStream != null) { 213 createProcessErrorPump(is, errorOutputStream); 214 } 215 } 216 217 /** 218 * Sets the {@link OutputStream} by means of which input can be sent to the process. 219 * 220 * @param os the {@link OutputStream}. 221 */ 222 @Override 223 public void setProcessInputStream(final OutputStream os) { 224 if (inputStream != null) { 225 if (inputStream == System.in) { 226 inputThread = createSystemInPump(inputStream, os); 227 } else { 228 inputThread = createPump(inputStream, os, true); 229 } 230 } else { 231 try { 232 os.close(); 233 } catch (final IOException e) { 234 final String msg = "Got exception while closing output stream"; 235 DebugUtils.handleException(msg, e); 236 } 237 } 238 } 239 240 /** 241 * Sets the {@link InputStream} from which to read the standard output of the process. 242 * 243 * @param is the {@link InputStream}. 244 */ 245 @Override 246 public void setProcessOutputStream(final InputStream is) { 247 if (outputStream != null) { 248 createProcessOutputPump(is, outputStream); 249 } 250 } 251 252 /** 253 * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called. 254 * 255 * @param timeout timeout or zero to wait forever (default). 256 * @since 1.4.0 257 */ 258 public void setStopTimeout(final Duration timeout) { 259 this.stopTimeout = timeout != null ? timeout : Duration.ZERO; 260 } 261 262 /** 263 * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called. 264 * 265 * @param timeout timeout in milliseconds or zero to wait forever (default). 266 * @deprecated Use {@link #setStopTimeout(Duration)}. 267 */ 268 @Deprecated 269 public void setStopTimeout(final long timeout) { 270 this.stopTimeout = Duration.ofMillis(timeout); 271 } 272 273 /** 274 * Starts the {@link Thread}s. 275 */ 276 @Override 277 public void start() { 278 start(outputThread); 279 start(errorThread); 280 start(inputThread); 281 } 282 283 /** 284 * Starts the given {@link Thread}. 285 */ 286 private void start(final Thread thread) { 287 if (thread != null) { 288 thread.start(); 289 } 290 } 291 292 /** 293 * Stops pumping the streams. When a timeout is specified it is not guaranteed that the pumper threads are cleanly terminated. 294 */ 295 @Override 296 public void stop() throws IOException { 297 if (inputStreamPumper != null) { 298 inputStreamPumper.stopProcessing(); 299 } 300 stop(outputThread, stopTimeout); 301 stop(errorThread, stopTimeout); 302 stop(inputThread, stopTimeout); 303 304 if (errorOutputStream != null && errorOutputStream != outputStream) { 305 try { 306 errorOutputStream.flush(); 307 } catch (final IOException e) { 308 final String msg = "Got exception while flushing the error stream : " + e.getMessage(); 309 DebugUtils.handleException(msg, e); 310 } 311 } 312 313 if (outputStream != null) { 314 try { 315 outputStream.flush(); 316 } catch (final IOException e) { 317 final String msg = "Got exception while flushing the output stream"; 318 DebugUtils.handleException(msg, e); 319 } 320 } 321 322 if (caught != null) { 323 throw caught; 324 } 325 } 326 327 /** 328 * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout 329 * was exceeded an IOException is created to be thrown to the caller. 330 * 331 * @param thread the thread to be stopped. 332 * @param timeout the time in ms to wait to join. 333 */ 334 private void stop(final Thread thread, final Duration timeout) { 335 if (thread != null) { 336 try { 337 if (timeout.equals(Duration.ZERO)) { 338 thread.join(); 339 } else { 340 final Duration timeToWait = timeout.plus(STOP_TIMEOUT_ADDITION); 341 final Instant startTime = Instant.now(); 342 thread.join(timeToWait.toMillis()); 343 if (Instant.now().isAfter(startTime.plus(timeToWait))) { 344 caught = new ExecuteException("The stop timeout of " + timeout + " ms was exceeded", Executor.INVALID_EXITVALUE); 345 } 346 } 347 } catch (final InterruptedException e) { 348 thread.interrupt(); 349 } 350 } 351 } 352 353 /** 354 * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout 355 * was exceeded an IOException is created to be thrown to the caller. 356 * 357 * @param thread the thread to be stopped. 358 * @param timeoutMillis the time in ms to wait to join. 359 */ 360 protected void stopThread(final Thread thread, final long timeoutMillis) { 361 stop(thread, Duration.ofMillis(timeoutMillis)); 362 } 363}