1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.commons.io.input;
18
19 import static org.apache.commons.io.IOUtils.EOF;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.PipedInputStream;
24 import java.io.PipedOutputStream;
25 import java.time.Duration;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.Objects;
29 import java.util.concurrent.BlockingQueue;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.TimeUnit;
32
33 import org.apache.commons.io.build.AbstractStreamBuilder;
34 import org.apache.commons.io.output.QueueOutputStream;
35
36 /**
37 * Simple alternative to JDK {@link PipedInputStream}; queue input stream provides what's written in queue output stream.
38 * <p>
39 * To build an instance, use {@link Builder}.
40 * </p>
41 * <p>
42 * Example usage:
43 * </p>
44 * <pre>
45 * QueueInputStream inputStream = new QueueInputStream();
46 * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
47 *
48 * outputStream.write("hello world".getBytes(UTF_8));
49 * inputStream.read();
50 * </pre>
51 * <p>
52 * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads.
53 * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited.
54 * </p>
55 * <p>
56 * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an
57 * {@link IOException}.
58 * </p>
59 *
60 * @see Builder
61 * @see QueueOutputStream
62 * @since 2.9.0
63 */
64 public class QueueInputStream extends InputStream {
65
66 // @formatter:off
67 /**
68 * Builds a new {@link QueueInputStream}.
69 *
70 * <p>
71 * For example:
72 * </p>
73 * <pre>{@code
74 * QueueInputStream s = QueueInputStream.builder()
75 * .setBlockingQueue(new LinkedBlockingQueue<>())
76 * .setTimeout(Duration.ZERO)
77 * .get();}
78 * </pre>
79 *
80 * @see #get()
81 * @since 2.12.0
82 */
83 // @formatter:on
84 public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> {
85
86 private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
87 private Duration timeout = Duration.ZERO;
88
89 /**
90 * Constructs a new builder of {@link QueueInputStream}.
91 */
92 public Builder() {
93 // empty
94 }
95
96 /**
97 * Builds a new {@link QueueInputStream}.
98 * <p>
99 * This builder uses the following aspects:
100 * </p>
101 * <ul>
102 * <li>{@link #setBlockingQueue(BlockingQueue)}</li>
103 * <li>timeout</li>
104 * </ul>
105 *
106 * @return a new instance.
107 * @see #getUnchecked()
108 */
109 @Override
110 public QueueInputStream get() {
111 return new QueueInputStream(this);
112 }
113
114 /**
115 * Sets backing queue for the stream.
116 *
117 * @param blockingQueue backing queue for the stream, null resets to a new blocking queue instance.
118 * @return {@code this} instance.
119 */
120 public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) {
121 this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>();
122 return this;
123 }
124
125 /**
126 * Sets the polling timeout.
127 *
128 * @param timeout the polling timeout.
129 * @return {@code this} instance.
130 */
131 public Builder setTimeout(final Duration timeout) {
132 if (timeout != null && timeout.toNanos() < 0) {
133 throw new IllegalArgumentException("timeout must not be negative");
134 }
135 this.timeout = timeout != null ? timeout : Duration.ZERO;
136 return this;
137 }
138
139 }
140
141 /**
142 * Constructs a new {@link Builder}.
143 *
144 * @return a new {@link Builder}.
145 * @since 2.12.0
146 */
147 public static Builder builder() {
148 return new Builder();
149 }
150
151 private final BlockingQueue<Integer> blockingQueue;
152
153 private final long timeoutNanos;
154
155 /**
156 * Constructs a new instance with no limit to its internal queue size and zero timeout.
157 */
158 public QueueInputStream() {
159 this(new LinkedBlockingQueue<>());
160 }
161
162 /**
163 * Constructs a new instance with given queue and zero timeout.
164 *
165 * @param blockingQueue backing queue for the stream, null maps to a new blocking queue instance.
166 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}.
167 */
168 @Deprecated
169 public QueueInputStream(final BlockingQueue<Integer> blockingQueue) {
170 this(builder().setBlockingQueue(blockingQueue));
171 }
172
173 /**
174 * Constructs a new instance.
175 *
176 * @param builder The builder.
177 */
178 private QueueInputStream(final Builder builder) {
179 this.blockingQueue = Objects.requireNonNull(builder.blockingQueue, "blockingQueue");
180 this.timeoutNanos = Objects.requireNonNull(builder.timeout, "timeout").toNanos();
181 }
182
183 /**
184 * Gets the blocking queue.
185 *
186 * @return the blocking queue.
187 */
188 BlockingQueue<Integer> getBlockingQueue() {
189 return blockingQueue;
190 }
191
192 /**
193 * Gets the timeout duration.
194 *
195 * @return the timeout duration.
196 */
197 Duration getTimeout() {
198 return Duration.ofNanos(timeoutNanos);
199 }
200
201 /**
202 * Constructs a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream.
203 *
204 * @return QueueOutputStream connected to this stream.
205 */
206 public QueueOutputStream newQueueOutputStream() {
207 return new QueueOutputStream(blockingQueue);
208 }
209
210 /**
211 * Reads and returns a single byte.
212 *
213 * @return the byte read, or {@code -1} if a timeout occurs before a queue element is available.
214 * @throws IllegalStateException if thread is interrupted while waiting.
215 */
216 @Override
217 public int read() {
218 try {
219 final Integer value = blockingQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS);
220 return value == null ? EOF : 0xFF & value;
221 } catch (final InterruptedException e) {
222 Thread.currentThread().interrupt();
223 // throw runtime unchecked exception to maintain signature backward-compatibility of
224 // this read method, which does not declare IOException
225 throw new IllegalStateException(e);
226 }
227 }
228
229 /**
230 * Reads up to {@code length} bytes of data from the input stream into
231 * an array of bytes. The first byte is read while honoring the timeout; the rest are read while <i>not</i> honoring
232 * the timeout. The number of bytes actually read is returned as an integer.
233 *
234 * @param b the buffer into which the data is read.
235 * @param offset the start offset in array {@code b} at which the data is written.
236 * @param length the maximum number of bytes to read.
237 * @return the total number of bytes read into the buffer, or {@code -1} if there is no more data because the
238 * end of the stream has been reached.
239 * @throws NullPointerException If {@code b} is {@code null}.
240 * @throws IllegalStateException if thread is interrupted while waiting for the first byte.
241 * @throws IndexOutOfBoundsException if {@code offset} is negative, {@code length} is negative, or {@code length} is
242 * greater than {@code b.length - offset}.
243 * @since 2.20.0
244 */
245 @Override
246 public int read(final byte[] b, final int offset, final int length) {
247 if (b == null) {
248 throw new NullPointerException();
249 }
250 if (offset < 0 || length < 0 || length > b.length - offset) {
251 throw new IndexOutOfBoundsException(
252 String.format("Range [%d, %<d + %d) out of bounds for length %d", offset, length, b.length));
253 }
254 if (length == 0) {
255 return 0;
256 }
257 final List<Integer> drain = new ArrayList<>(Math.min(length, blockingQueue.size()));
258 blockingQueue.drainTo(drain, length);
259 if (drain.isEmpty()) {
260 // no data immediately available. wait for first byte
261 final int value = read();
262 if (value == EOF) {
263 return EOF;
264 }
265 drain.add(value);
266 blockingQueue.drainTo(drain, length - 1);
267 }
268 int i = 0;
269 for (final Integer value : drain) {
270 b[offset + i] = (byte) (0xFF & value);
271 i++;
272 }
273 return i;
274 }
275
276 }