/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17  package org.apache.commons.io.input;
18  
19  import static org.apache.commons.io.IOUtils.EOF;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.PipedInputStream;
24  import java.io.PipedOutputStream;
25  import java.time.Duration;
26  import java.util.ArrayList;
27  import java.util.List;
28  import java.util.Objects;
29  import java.util.concurrent.BlockingQueue;
30  import java.util.concurrent.LinkedBlockingQueue;
31  import java.util.concurrent.TimeUnit;
32  
33  import org.apache.commons.io.IOUtils;
34  import org.apache.commons.io.build.AbstractStreamBuilder;
35  import org.apache.commons.io.output.QueueOutputStream;
36  
37  /**
38   * Simple alternative to JDK {@link PipedInputStream}; queue input stream provides what's written in queue output stream.
39   * <p>
40   * To build an instance, use {@link Builder}.
41   * </p>
42   * <p>
43   * Example usage:
44   * </p>
45   * <pre>
46   * QueueInputStream inputStream = new QueueInputStream();
47   * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
48   *
49   * outputStream.write("hello world".getBytes(UTF_8));
50   * inputStream.read();
51   * </pre>
52   * <p>
53   * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads.
54   * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited.
55   * </p>
56   * <p>
57   * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an
58   * {@link IOException}.
59   * </p>
60   *
61   * @see Builder
62   * @see QueueOutputStream
63   * @since 2.9.0
64   */
65  public class QueueInputStream extends InputStream {
66  
67      // @formatter:off
68      /**
69       * Builds a new {@link QueueInputStream}.
70       *
71       * <p>
72       * For example:
73       * </p>
74       * <pre>{@code
75       * QueueInputStream s = QueueInputStream.builder()
76       *   .setBlockingQueue(new LinkedBlockingQueue<>())
77       *   .setTimeout(Duration.ZERO)
78       *   .get();}
79       * </pre>
80       *
81       * @see #get()
82       * @since 2.12.0
83       */
84      // @formatter:on
85      public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> {
86  
87          private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
88          private Duration timeout = Duration.ZERO;
89  
90          /**
91           * Constructs a new builder of {@link QueueInputStream}.
92           */
93          public Builder() {
94              // empty
95          }
96  
97          /**
98           * Builds a new {@link QueueInputStream}.
99           * <p>
100          * This builder uses the following aspects:
101          * </p>
102          * <ul>
103          * <li>{@link #setBlockingQueue(BlockingQueue)}</li>
104          * <li>timeout</li>
105          * </ul>
106          *
107          * @return a new instance.
108          * @see #getUnchecked()
109          */
110         @Override
111         public QueueInputStream get() {
112             return new QueueInputStream(this);
113         }
114 
115         /**
116          * Sets backing queue for the stream.
117          *
118          * @param blockingQueue backing queue for the stream, null resets to a new blocking queue instance.
119          * @return {@code this} instance.
120          */
121         public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) {
122             this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>();
123             return this;
124         }
125 
126         /**
127          * Sets the polling timeout.
128          *
129          * @param timeout the polling timeout.
130          * @return {@code this} instance.
131          */
132         public Builder setTimeout(final Duration timeout) {
133             if (timeout != null && timeout.toNanos() < 0) {
134                 throw new IllegalArgumentException("timeout must not be negative");
135             }
136             this.timeout = timeout != null ? timeout : Duration.ZERO;
137             return this;
138         }
139 
140     }
141 
142     /**
143      * Constructs a new {@link Builder}.
144      *
145      * @return a new {@link Builder}.
146      * @since 2.12.0
147      */
148     public static Builder builder() {
149         return new Builder();
150     }
151 
152     private final BlockingQueue<Integer> blockingQueue;
153 
154     private final long timeoutNanos;
155 
156     /**
157      * Constructs a new instance with no limit to its internal queue size and zero timeout.
158      */
159     public QueueInputStream() {
160         this(new LinkedBlockingQueue<>());
161     }
162 
163     /**
164      * Constructs a new instance with given queue and zero timeout.
165      *
166      * @param blockingQueue backing queue for the stream, null maps to a new blocking queue instance.
167      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}.
168      */
169     @Deprecated
170     public QueueInputStream(final BlockingQueue<Integer> blockingQueue) {
171         this(builder().setBlockingQueue(blockingQueue));
172     }
173 
174     /**
175      * Constructs a new instance.
176      *
177      * @param builder The builder.
178      */
179     private QueueInputStream(final Builder builder) {
180         this.blockingQueue = Objects.requireNonNull(builder.blockingQueue, "blockingQueue");
181         this.timeoutNanos = Objects.requireNonNull(builder.timeout, "timeout").toNanos();
182     }
183 
184     /**
185      * Gets the blocking queue.
186      *
187      * @return the blocking queue.
188      */
189     BlockingQueue<Integer> getBlockingQueue() {
190         return blockingQueue;
191     }
192 
193     /**
194      * Gets the timeout duration.
195      *
196      * @return the timeout duration.
197      */
198     Duration getTimeout() {
199         return Duration.ofNanos(timeoutNanos);
200     }
201 
202     /**
203      * Constructs a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream.
204      *
205      * @return QueueOutputStream connected to this stream.
206      */
207     public QueueOutputStream newQueueOutputStream() {
208         return new QueueOutputStream(blockingQueue);
209     }
210 
211     /**
212      * Reads and returns a single byte.
213      *
214      * @return the byte read, or {@code -1} if a timeout occurs before a queue element is available.
215      * @throws IllegalStateException if thread is interrupted while waiting.
216      */
217     @Override
218     public int read() {
219         try {
220             final Integer value = blockingQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS);
221             return value == null ? EOF : 0xFF & value;
222         } catch (final InterruptedException e) {
223             Thread.currentThread().interrupt();
224             // throw runtime unchecked exception to maintain signature backward-compatibility of
225             // this read method, which does not declare IOException
226             throw new IllegalStateException(e);
227         }
228     }
229 
230     /**
231      * Reads up to {@code length} bytes of data from the input stream into
232      * an array of bytes.  The first byte is read while honoring the timeout; the rest are read while <i>not</i> honoring
233      * the timeout. The number of bytes actually read is returned as an integer.
234      *
235      * @param b     the buffer into which the data is read.
236      * @param offset   the start offset in array {@code b} at which the data is written.
237      * @param length   the maximum number of bytes to read.
238      * @return     the total number of bytes read into the buffer, or {@code -1} if there is no more data because the
239      *              end of the stream has been reached.
240      * @throws NullPointerException If {@code b} is {@code null}.
241      * @throws IllegalStateException if thread is interrupted while waiting for the first byte.
242      * @throws IndexOutOfBoundsException if {@code offset} is negative, {@code length} is negative, or {@code length} is
243      *             greater than {@code b.length - offset}.
244      * @since 2.20.0
245      */
246     @Override
247     public int read(final byte[] b, final int offset, final int length) {
248         IOUtils.checkFromIndexSize(b, offset, length);
249         if (length == 0) {
250             return 0;
251         }
252         final List<Integer> drain = new ArrayList<>(Math.min(length, blockingQueue.size()));
253         blockingQueue.drainTo(drain, length);
254         if (drain.isEmpty()) {
255             // no data immediately available. wait for first byte
256             final int value = read();
257             if (value == EOF) {
258                 return EOF;
259             }
260             drain.add(value);
261             blockingQueue.drainTo(drain, length - 1);
262         }
263         int i = 0;
264         for (final Integer value : drain) {
265             b[offset + i] = (byte) (0xFF & value);
266             i++;
267         }
268         return i;
269     }
270 
271 }