1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.commons.io.input;
18
19 import static org.apache.commons.io.IOUtils.EOF;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.PipedInputStream;
24 import java.io.PipedOutputStream;
25 import java.time.Duration;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.Objects;
29 import java.util.concurrent.BlockingQueue;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.TimeUnit;
32
33 import org.apache.commons.io.IOUtils;
34 import org.apache.commons.io.build.AbstractStreamBuilder;
35 import org.apache.commons.io.output.QueueOutputStream;
36
37 /**
38 * Simple alternative to JDK {@link PipedInputStream}; queue input stream provides what's written in queue output stream.
39 * <p>
40 * To build an instance, use {@link Builder}.
41 * </p>
42 * <p>
43 * Example usage:
44 * </p>
45 * <pre>
46 * QueueInputStream inputStream = new QueueInputStream();
47 * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
48 *
49 * outputStream.write("hello world".getBytes(UTF_8));
50 * inputStream.read();
51 * </pre>
52 * <p>
53 * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads.
54 * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited.
55 * </p>
56 * <p>
57 * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an
58 * {@link IOException}.
59 * </p>
60 *
61 * @see Builder
62 * @see QueueOutputStream
63 * @since 2.9.0
64 */
65 public class QueueInputStream extends InputStream {
66
67 // @formatter:off
68 /**
69 * Builds a new {@link QueueInputStream}.
70 *
71 * <p>
72 * For example:
73 * </p>
74 * <pre>{@code
75 * QueueInputStream s = QueueInputStream.builder()
76 * .setBlockingQueue(new LinkedBlockingQueue<>())
77 * .setTimeout(Duration.ZERO)
78 * .get();}
79 * </pre>
80 *
81 * @see #get()
82 * @since 2.12.0
83 */
84 // @formatter:on
85 public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> {
86
87 private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
88 private Duration timeout = Duration.ZERO;
89
90 /**
91 * Constructs a new builder of {@link QueueInputStream}.
92 */
93 public Builder() {
94 // empty
95 }
96
97 /**
98 * Builds a new {@link QueueInputStream}.
99 * <p>
100 * This builder uses the following aspects:
101 * </p>
102 * <ul>
103 * <li>{@link #setBlockingQueue(BlockingQueue)}</li>
104 * <li>timeout</li>
105 * </ul>
106 *
107 * @return a new instance.
108 * @see #getUnchecked()
109 */
110 @Override
111 public QueueInputStream get() {
112 return new QueueInputStream(this);
113 }
114
115 /**
116 * Sets backing queue for the stream.
117 *
118 * @param blockingQueue backing queue for the stream, null resets to a new blocking queue instance.
119 * @return {@code this} instance.
120 */
121 public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) {
122 this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>();
123 return this;
124 }
125
126 /**
127 * Sets the polling timeout.
128 *
129 * @param timeout the polling timeout.
130 * @return {@code this} instance.
131 */
132 public Builder setTimeout(final Duration timeout) {
133 if (timeout != null && timeout.toNanos() < 0) {
134 throw new IllegalArgumentException("timeout must not be negative");
135 }
136 this.timeout = timeout != null ? timeout : Duration.ZERO;
137 return this;
138 }
139
140 }
141
142 /**
143 * Constructs a new {@link Builder}.
144 *
145 * @return a new {@link Builder}.
146 * @since 2.12.0
147 */
148 public static Builder builder() {
149 return new Builder();
150 }
151
152 private final BlockingQueue<Integer> blockingQueue;
153
154 private final long timeoutNanos;
155
156 /**
157 * Constructs a new instance with no limit to its internal queue size and zero timeout.
158 */
159 public QueueInputStream() {
160 this(new LinkedBlockingQueue<>());
161 }
162
163 /**
164 * Constructs a new instance with given queue and zero timeout.
165 *
166 * @param blockingQueue backing queue for the stream, null maps to a new blocking queue instance.
167 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}.
168 */
169 @Deprecated
170 public QueueInputStream(final BlockingQueue<Integer> blockingQueue) {
171 this(builder().setBlockingQueue(blockingQueue));
172 }
173
174 /**
175 * Constructs a new instance.
176 *
177 * @param builder The builder.
178 */
179 private QueueInputStream(final Builder builder) {
180 this.blockingQueue = Objects.requireNonNull(builder.blockingQueue, "blockingQueue");
181 this.timeoutNanos = Objects.requireNonNull(builder.timeout, "timeout").toNanos();
182 }
183
184 /**
185 * Gets the blocking queue.
186 *
187 * @return the blocking queue.
188 */
189 BlockingQueue<Integer> getBlockingQueue() {
190 return blockingQueue;
191 }
192
193 /**
194 * Gets the timeout duration.
195 *
196 * @return the timeout duration.
197 */
198 Duration getTimeout() {
199 return Duration.ofNanos(timeoutNanos);
200 }
201
202 /**
203 * Constructs a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream.
204 *
205 * @return QueueOutputStream connected to this stream.
206 */
207 public QueueOutputStream newQueueOutputStream() {
208 return new QueueOutputStream(blockingQueue);
209 }
210
211 /**
212 * Reads and returns a single byte.
213 *
214 * @return the byte read, or {@code -1} if a timeout occurs before a queue element is available.
215 * @throws IllegalStateException if thread is interrupted while waiting.
216 */
217 @Override
218 public int read() {
219 try {
220 final Integer value = blockingQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS);
221 return value == null ? EOF : 0xFF & value;
222 } catch (final InterruptedException e) {
223 Thread.currentThread().interrupt();
224 // throw runtime unchecked exception to maintain signature backward-compatibility of
225 // this read method, which does not declare IOException
226 throw new IllegalStateException(e);
227 }
228 }
229
230 /**
231 * Reads up to {@code length} bytes of data from the input stream into
232 * an array of bytes. The first byte is read while honoring the timeout; the rest are read while <i>not</i> honoring
233 * the timeout. The number of bytes actually read is returned as an integer.
234 *
235 * @param b the buffer into which the data is read.
236 * @param offset the start offset in array {@code b} at which the data is written.
237 * @param length the maximum number of bytes to read.
238 * @return the total number of bytes read into the buffer, or {@code -1} if there is no more data because the
239 * end of the stream has been reached.
240 * @throws NullPointerException If {@code b} is {@code null}.
241 * @throws IllegalStateException if thread is interrupted while waiting for the first byte.
242 * @throws IndexOutOfBoundsException if {@code offset} is negative, {@code length} is negative, or {@code length} is
243 * greater than {@code b.length - offset}.
244 * @since 2.20.0
245 */
246 @Override
247 public int read(final byte[] b, final int offset, final int length) {
248 IOUtils.checkFromIndexSize(b, offset, length);
249 if (length == 0) {
250 return 0;
251 }
252 final List<Integer> drain = new ArrayList<>(Math.min(length, blockingQueue.size()));
253 blockingQueue.drainTo(drain, length);
254 if (drain.isEmpty()) {
255 // no data immediately available. wait for first byte
256 final int value = read();
257 if (value == EOF) {
258 return EOF;
259 }
260 drain.add(value);
261 blockingQueue.drainTo(drain, length - 1);
262 }
263 int i = 0;
264 for (final Integer value : drain) {
265 b[offset + i] = (byte) (0xFF & value);
266 i++;
267 }
268 return i;
269 }
270
271 }