1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * https://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.commons.io.input; 18 19 import static org.apache.commons.io.IOUtils.EOF; 20 21 import java.io.IOException; 22 import java.io.InputStream; 23 import java.io.PipedInputStream; 24 import java.io.PipedOutputStream; 25 import java.time.Duration; 26 import java.util.ArrayList; 27 import java.util.List; 28 import java.util.Objects; 29 import java.util.concurrent.BlockingQueue; 30 import java.util.concurrent.LinkedBlockingQueue; 31 import java.util.concurrent.TimeUnit; 32 33 import org.apache.commons.io.build.AbstractStreamBuilder; 34 import org.apache.commons.io.output.QueueOutputStream; 35 36 /** 37 * Simple alternative to JDK {@link PipedInputStream}; queue input stream provides what's written in queue output stream. 38 * <p> 39 * To build an instance, use {@link Builder}. 40 * </p> 41 * <p> 42 * Example usage: 43 * </p> 44 * <pre> 45 * QueueInputStream inputStream = new QueueInputStream(); 46 * QueueOutputStream outputStream = inputStream.newQueueOutputStream(); 47 * 48 * outputStream.write("hello world".getBytes(UTF_8)); 49 * inputStream.read(); 50 * </pre> 51 * <p> 52 * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads. 53 * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited. 54 * </p> 55 * <p> 56 * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an 57 * {@link IOException}. 58 * </p> 59 * 60 * @see Builder 61 * @see QueueOutputStream 62 * @since 2.9.0 63 */ 64 public class QueueInputStream extends InputStream { 65 66 // @formatter:off 67 /** 68 * Builds a new {@link QueueInputStream}. 69 * 70 * <p> 71 * For example: 72 * </p> 73 * <pre>{@code 74 * QueueInputStream s = QueueInputStream.builder() 75 * .setBlockingQueue(new LinkedBlockingQueue<>()) 76 * .setTimeout(Duration.ZERO) 77 * .get();} 78 * </pre> 79 * 80 * @see #get() 81 * @since 2.12.0 82 */ 83 // @formatter:on 84 public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> { 85 86 private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(); 87 private Duration timeout = Duration.ZERO; 88 89 /** 90 * Constructs a new builder of {@link QueueInputStream}. 91 */ 92 public Builder() { 93 // empty 94 } 95 96 /** 97 * Builds a new {@link QueueInputStream}. 98 * <p> 99 * This builder uses the following aspects: 100 * </p> 101 * <ul> 102 * <li>{@link #setBlockingQueue(BlockingQueue)}</li> 103 * <li>timeout</li> 104 * </ul> 105 * 106 * @return a new instance. 107 * @see #getUnchecked() 108 */ 109 @Override 110 public QueueInputStream get() { 111 return new QueueInputStream(this); 112 } 113 114 /** 115 * Sets backing queue for the stream. 116 * 117 * @param blockingQueue backing queue for the stream, null resets to a new blocking queue instance. 118 * @return {@code this} instance. 119 */ 120 public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) { 121 this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>(); 122 return this; 123 } 124 125 /** 126 * Sets the polling timeout. 127 * 128 * @param timeout the polling timeout. 129 * @return {@code this} instance. 130 */ 131 public Builder setTimeout(final Duration timeout) { 132 if (timeout != null && timeout.toNanos() < 0) { 133 throw new IllegalArgumentException("timeout must not be negative"); 134 } 135 this.timeout = timeout != null ? timeout : Duration.ZERO; 136 return this; 137 } 138 139 } 140 141 /** 142 * Constructs a new {@link Builder}. 143 * 144 * @return a new {@link Builder}. 145 * @since 2.12.0 146 */ 147 public static Builder builder() { 148 return new Builder(); 149 } 150 151 private final BlockingQueue<Integer> blockingQueue; 152 153 private final long timeoutNanos; 154 155 /** 156 * Constructs a new instance with no limit to its internal queue size and zero timeout. 157 */ 158 public QueueInputStream() { 159 this(new LinkedBlockingQueue<>()); 160 } 161 162 /** 163 * Constructs a new instance with given queue and zero timeout. 164 * 165 * @param blockingQueue backing queue for the stream, null maps to a new blocking queue instance. 166 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}. 167 */ 168 @Deprecated 169 public QueueInputStream(final BlockingQueue<Integer> blockingQueue) { 170 this(builder().setBlockingQueue(blockingQueue)); 171 } 172 173 /** 174 * Constructs a new instance. 175 * 176 * @param builder The builder. 177 */ 178 private QueueInputStream(final Builder builder) { 179 this.blockingQueue = Objects.requireNonNull(builder.blockingQueue, "blockingQueue"); 180 this.timeoutNanos = Objects.requireNonNull(builder.timeout, "timeout").toNanos(); 181 } 182 183 /** 184 * Gets the blocking queue. 185 * 186 * @return the blocking queue. 187 */ 188 BlockingQueue<Integer> getBlockingQueue() { 189 return blockingQueue; 190 } 191 192 /** 193 * Gets the timeout duration. 194 * 195 * @return the timeout duration. 196 */ 197 Duration getTimeout() { 198 return Duration.ofNanos(timeoutNanos); 199 } 200 201 /** 202 * Constructs a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream. 203 * 204 * @return QueueOutputStream connected to this stream. 205 */ 206 public QueueOutputStream newQueueOutputStream() { 207 return new QueueOutputStream(blockingQueue); 208 } 209 210 /** 211 * Reads and returns a single byte. 212 * 213 * @return the byte read, or {@code -1} if a timeout occurs before a queue element is available. 214 * @throws IllegalStateException if thread is interrupted while waiting. 215 */ 216 @Override 217 public int read() { 218 try { 219 final Integer value = blockingQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS); 220 return value == null ? EOF : 0xFF & value; 221 } catch (final InterruptedException e) { 222 Thread.currentThread().interrupt(); 223 // throw runtime unchecked exception to maintain signature backward-compatibility of 224 // this read method, which does not declare IOException 225 throw new IllegalStateException(e); 226 } 227 } 228 229 /** 230 * Reads up to {@code length} bytes of data from the input stream into 231 * an array of bytes. The first byte is read while honoring the timeout; the rest are read while <i>not</i> honoring 232 * the timeout. The number of bytes actually read is returned as an integer. 233 * 234 * @param b the buffer into which the data is read. 235 * @param offset the start offset in array {@code b} at which the data is written. 236 * @param length the maximum number of bytes to read. 237 * @return the total number of bytes read into the buffer, or {@code -1} if there is no more data because the 238 * end of the stream has been reached. 239 * @throws NullPointerException If {@code b} is {@code null}. 240 * @throws IllegalStateException if thread is interrupted while waiting for the first byte. 241 * @throws IndexOutOfBoundsException if {@code offset} is negative, {@code length} is negative, or {@code length} is 242 * greater than {@code b.length - offset}. 243 * @since 2.20.0 244 */ 245 @Override 246 public int read(final byte[] b, final int offset, final int length) { 247 if (b == null) { 248 throw new NullPointerException(); 249 } 250 if (offset < 0 || length < 0 || length > b.length - offset) { 251 throw new IndexOutOfBoundsException( 252 String.format("Range [%d, %<d + %d) out of bounds for length %d", offset, length, b.length)); 253 } 254 if (length == 0) { 255 return 0; 256 } 257 final List<Integer> drain = new ArrayList<>(Math.min(length, blockingQueue.size())); 258 blockingQueue.drainTo(drain, length); 259 if (drain.isEmpty()) { 260 // no data immediately available. wait for first byte 261 final int value = read(); 262 if (value == EOF) { 263 return EOF; 264 } 265 drain.add(value); 266 blockingQueue.drainTo(drain, length - 1); 267 } 268 int i = 0; 269 for (final Integer value : drain) { 270 b[offset + i] = (byte) (0xFF & value); 271 i++; 272 } 273 return i; 274 } 275 276 }