View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      https://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.io.input;
18  
19  import static org.apache.commons.io.IOUtils.EOF;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.PipedInputStream;
24  import java.io.PipedOutputStream;
25  import java.time.Duration;
26  import java.util.ArrayList;
27  import java.util.List;
28  import java.util.Objects;
29  import java.util.concurrent.BlockingQueue;
30  import java.util.concurrent.LinkedBlockingQueue;
31  import java.util.concurrent.TimeUnit;
32  
33  import org.apache.commons.io.build.AbstractStreamBuilder;
34  import org.apache.commons.io.output.QueueOutputStream;
35  
36  /**
37   * Simple alternative to JDK {@link PipedInputStream}; queue input stream provides what's written in queue output stream.
38   * <p>
39   * To build an instance, use {@link Builder}.
40   * </p>
41   * <p>
42   * Example usage:
43   * </p>
44   * <pre>
45   * QueueInputStream inputStream = new QueueInputStream();
46   * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
47   *
48   * outputStream.write("hello world".getBytes(UTF_8));
49   * inputStream.read();
50   * </pre>
51   * <p>
52   * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads.
53   * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited.
54   * </p>
55   * <p>
56   * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an
57   * {@link IOException}.
58   * </p>
59   *
60   * @see Builder
61   * @see QueueOutputStream
62   * @since 2.9.0
63   */
64  public class QueueInputStream extends InputStream {
65  
66      // @formatter:off
67      /**
68       * Builds a new {@link QueueInputStream}.
69       *
70       * <p>
71       * For example:
72       * </p>
73       * <pre>{@code
74       * QueueInputStream s = QueueInputStream.builder()
75       *   .setBlockingQueue(new LinkedBlockingQueue<>())
76       *   .setTimeout(Duration.ZERO)
77       *   .get();}
78       * </pre>
79       *
80       * @see #get()
81       * @since 2.12.0
82       */
83      // @formatter:on
84      public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> {
85  
86          private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
87          private Duration timeout = Duration.ZERO;
88  
89          /**
90           * Constructs a new builder of {@link QueueInputStream}.
91           */
92          public Builder() {
93              // empty
94          }
95  
96          /**
97           * Builds a new {@link QueueInputStream}.
98           * <p>
99           * This builder uses the following aspects:
100          * </p>
101          * <ul>
102          * <li>{@link #setBlockingQueue(BlockingQueue)}</li>
103          * <li>timeout</li>
104          * </ul>
105          *
106          * @return a new instance.
107          * @see #getUnchecked()
108          */
109         @Override
110         public QueueInputStream get() {
111             return new QueueInputStream(this);
112         }
113 
114         /**
115          * Sets backing queue for the stream.
116          *
117          * @param blockingQueue backing queue for the stream, null resets to a new blocking queue instance.
118          * @return {@code this} instance.
119          */
120         public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) {
121             this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>();
122             return this;
123         }
124 
125         /**
126          * Sets the polling timeout.
127          *
128          * @param timeout the polling timeout.
129          * @return {@code this} instance.
130          */
131         public Builder setTimeout(final Duration timeout) {
132             if (timeout != null && timeout.toNanos() < 0) {
133                 throw new IllegalArgumentException("timeout must not be negative");
134             }
135             this.timeout = timeout != null ? timeout : Duration.ZERO;
136             return this;
137         }
138 
139     }
140 
141     /**
142      * Constructs a new {@link Builder}.
143      *
144      * @return a new {@link Builder}.
145      * @since 2.12.0
146      */
147     public static Builder builder() {
148         return new Builder();
149     }
150 
151     private final BlockingQueue<Integer> blockingQueue;
152 
153     private final long timeoutNanos;
154 
155     /**
156      * Constructs a new instance with no limit to its internal queue size and zero timeout.
157      */
158     public QueueInputStream() {
159         this(new LinkedBlockingQueue<>());
160     }
161 
162     /**
163      * Constructs a new instance with given queue and zero timeout.
164      *
165      * @param blockingQueue backing queue for the stream, null maps to a new blocking queue instance.
166      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}.
167      */
168     @Deprecated
169     public QueueInputStream(final BlockingQueue<Integer> blockingQueue) {
170         this(builder().setBlockingQueue(blockingQueue));
171     }
172 
173     /**
174      * Constructs a new instance.
175      *
176      * @param builder The builder.
177      */
178     private QueueInputStream(final Builder builder) {
179         this.blockingQueue = Objects.requireNonNull(builder.blockingQueue, "blockingQueue");
180         this.timeoutNanos = Objects.requireNonNull(builder.timeout, "timeout").toNanos();
181     }
182 
183     /**
184      * Gets the blocking queue.
185      *
186      * @return the blocking queue.
187      */
188     BlockingQueue<Integer> getBlockingQueue() {
189         return blockingQueue;
190     }
191 
192     /**
193      * Gets the timeout duration.
194      *
195      * @return the timeout duration.
196      */
197     Duration getTimeout() {
198         return Duration.ofNanos(timeoutNanos);
199     }
200 
201     /**
202      * Constructs a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream.
203      *
204      * @return QueueOutputStream connected to this stream.
205      */
206     public QueueOutputStream newQueueOutputStream() {
207         return new QueueOutputStream(blockingQueue);
208     }
209 
210     /**
211      * Reads and returns a single byte.
212      *
213      * @return the byte read, or {@code -1} if a timeout occurs before a queue element is available.
214      * @throws IllegalStateException if thread is interrupted while waiting.
215      */
216     @Override
217     public int read() {
218         try {
219             final Integer value = blockingQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS);
220             return value == null ? EOF : 0xFF & value;
221         } catch (final InterruptedException e) {
222             Thread.currentThread().interrupt();
223             // throw runtime unchecked exception to maintain signature backward-compatibility of
224             // this read method, which does not declare IOException
225             throw new IllegalStateException(e);
226         }
227     }
228 
229     /**
230      * Reads up to {@code length} bytes of data from the input stream into
231      * an array of bytes.  The first byte is read while honoring the timeout; the rest are read while <i>not</i> honoring
232      * the timeout. The number of bytes actually read is returned as an integer.
233      *
234      * @param b     the buffer into which the data is read.
235      * @param offset   the start offset in array {@code b} at which the data is written.
236      * @param length   the maximum number of bytes to read.
237      * @return     the total number of bytes read into the buffer, or {@code -1} if there is no more data because the
238      *              end of the stream has been reached.
239      * @throws NullPointerException If {@code b} is {@code null}.
240      * @throws IllegalStateException if thread is interrupted while waiting for the first byte.
241      * @throws IndexOutOfBoundsException if {@code offset} is negative, {@code length} is negative, or {@code length} is
242      *             greater than {@code b.length - offset}.
243      * @since 2.20.0
244      */
245     @Override
246     public int read(final byte[] b, final int offset, final int length) {
247         if (b == null) {
248             throw new NullPointerException();
249         }
250         if (offset < 0 || length < 0 || length > b.length - offset) {
251             throw new IndexOutOfBoundsException(
252                     String.format("Range [%d, %<d + %d) out of bounds for length %d", offset, length, b.length));
253         }
254         if (length == 0) {
255             return 0;
256         }
257         final List<Integer> drain = new ArrayList<>(Math.min(length, blockingQueue.size()));
258         blockingQueue.drainTo(drain, length);
259         if (drain.isEmpty()) {
260             // no data immediately available. wait for first byte
261             final int value = read();
262             if (value == EOF) {
263                 return EOF;
264             }
265             drain.add(value);
266             blockingQueue.drainTo(drain, length - 1);
267         }
268         int i = 0;
269         for (final Integer value : drain) {
270             b[offset + i] = (byte) (0xFF & value);
271             i++;
272         }
273         return i;
274     }
275 
276 }