1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.commons.io.input; 18 19 import static org.apache.commons.io.IOUtils.EOF; 20 21 import java.io.IOException; 22 import java.io.InputStream; 23 import java.io.PipedInputStream; 24 import java.io.PipedOutputStream; 25 import java.time.Duration; 26 import java.util.Objects; 27 import java.util.concurrent.BlockingQueue; 28 import java.util.concurrent.LinkedBlockingQueue; 29 import java.util.concurrent.TimeUnit; 30 31 import org.apache.commons.io.build.AbstractStreamBuilder; 32 import org.apache.commons.io.output.QueueOutputStream; 33 34 /** 35 * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input stream provides what's written in queue output stream. 36 * <p> 37 * To build an instance, use {@link Builder}. 38 * </p> 39 * <p> 40 * Example usage: 41 * </p> 42 * <pre> 43 * QueueInputStream inputStream = new QueueInputStream(); 44 * QueueOutputStream outputStream = inputStream.newQueueOutputStream(); 45 * 46 * outputStream.write("hello world".getBytes(UTF_8)); 47 * inputStream.read(); 48 * </pre> 49 * <p> 50 * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads. 51 * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited. 52 * </p> 53 * <p> 54 * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an 55 * {@link IOException}. 56 * </p> 57 * 58 * @see Builder 59 * @see QueueOutputStream 60 * @since 2.9.0 61 */ 62 public class QueueInputStream extends InputStream { 63 64 // @formatter:off 65 /** 66 * Builds a new {@link QueueInputStream}. 67 * 68 * <p> 69 * For example: 70 * </p> 71 * <pre>{@code 72 * QueueInputStream s = QueueInputStream.builder() 73 * .setBlockingQueue(new LinkedBlockingQueue<>()) 74 * .setTimeout(Duration.ZERO) 75 * .get();} 76 * </pre> 77 * 78 * @see #get() 79 * @since 2.12.0 80 */ 81 // @formatter:on 82 public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> { 83 84 private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(); 85 private Duration timeout = Duration.ZERO; 86 87 /** 88 * Builds a new {@link QueueInputStream}. 89 * <p> 90 * This builder use the following aspects: 91 * </p> 92 * <ul> 93 * <li>{@link #setBlockingQueue(BlockingQueue)}</li> 94 * <li>timeout</li> 95 * </ul> 96 * 97 * @return a new instance. 98 */ 99 @Override 100 public QueueInputStream get() { 101 return new QueueInputStream(blockingQueue, timeout); 102 } 103 104 /** 105 * Sets backing queue for the stream. 106 * 107 * @param blockingQueue backing queue for the stream. 108 * @return this 109 */ 110 public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) { 111 this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>(); 112 return this; 113 } 114 115 /** 116 * Sets the polling timeout. 117 * 118 * @param timeout the polling timeout. 119 * @return this. 120 */ 121 public Builder setTimeout(final Duration timeout) { 122 if (timeout != null && timeout.toNanos() < 0) { 123 throw new IllegalArgumentException("timeout must not be negative"); 124 } 125 this.timeout = timeout != null ? timeout : Duration.ZERO; 126 return this; 127 } 128 129 } 130 131 /** 132 * Constructs a new {@link Builder}. 133 * 134 * @return a new {@link Builder}. 135 * @since 2.12.0 136 */ 137 public static Builder builder() { 138 return new Builder(); 139 } 140 141 private final BlockingQueue<Integer> blockingQueue; 142 143 private final long timeoutNanos; 144 145 /** 146 * Constructs a new instance with no limit to its internal queue size and zero timeout. 147 */ 148 public QueueInputStream() { 149 this(new LinkedBlockingQueue<>()); 150 } 151 152 /** 153 * Constructs a new instance with given queue and zero timeout. 154 * 155 * @param blockingQueue backing queue for the stream. 156 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}. 157 */ 158 @Deprecated 159 public QueueInputStream(final BlockingQueue<Integer> blockingQueue) { 160 this(blockingQueue, Duration.ZERO); 161 } 162 163 /** 164 * Constructs a new instance with given queue and timeout. 165 * 166 * @param blockingQueue backing queue for the stream. 167 * @param timeout how long to wait before giving up when polling the queue. 168 */ 169 private QueueInputStream(final BlockingQueue<Integer> blockingQueue, final Duration timeout) { 170 this.blockingQueue = Objects.requireNonNull(blockingQueue, "blockingQueue"); 171 this.timeoutNanos = Objects.requireNonNull(timeout, "timeout").toNanos(); 172 } 173 174 /** 175 * Gets the blocking queue. 176 * 177 * @return the blocking queue. 178 */ 179 BlockingQueue<Integer> getBlockingQueue() { 180 return blockingQueue; 181 } 182 183 /** 184 * Gets the timeout duration. 185 * 186 * @return the timeout duration. 187 */ 188 Duration getTimeout() { 189 return Duration.ofNanos(timeoutNanos); 190 } 191 192 /** 193 * Constructs a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream. 194 * 195 * @return QueueOutputStream connected to this stream. 196 */ 197 public QueueOutputStream newQueueOutputStream() { 198 return new QueueOutputStream(blockingQueue); 199 } 200 201 /** 202 * Reads and returns a single byte. 203 * 204 * @return the byte read, or {@code -1} if a timeout occurs before a queue element is available. 205 * @throws IllegalStateException if thread is interrupted while waiting. 206 */ 207 @Override 208 public int read() { 209 try { 210 final Integer value = blockingQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS); 211 return value == null ? EOF : 0xFF & value; 212 } catch (final InterruptedException e) { 213 Thread.currentThread().interrupt(); 214 // throw runtime unchecked exception to maintain signature backward-compatibility of 215 // this read method, which does not declare IOException 216 throw new IllegalStateException(e); 217 } 218 } 219 220 }