View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.io.input;
18  
19  import static org.apache.commons.io.IOUtils.EOF;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.PipedInputStream;
24  import java.io.PipedOutputStream;
25  import java.time.Duration;
26  import java.util.Objects;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.LinkedBlockingQueue;
29  import java.util.concurrent.TimeUnit;
30  
31  import org.apache.commons.io.build.AbstractStreamBuilder;
32  import org.apache.commons.io.output.QueueOutputStream;
33  
34  /**
35   * Simple alternative to JDK {@link PipedInputStream}; queue input stream provides what's written in queue output stream.
36   * <p>
37   * To build an instance, use {@link Builder}.
38   * </p>
39   * <p>
40   * Example usage:
41   * </p>
42   * <pre>
43   * QueueInputStream inputStream = new QueueInputStream();
44   * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
45   *
46   * outputStream.write("hello world".getBytes(UTF_8));
47   * inputStream.read();
48   * </pre>
49   * <p>
50   * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads.
51   * Also, unlike the JDK classes, no special meaning is attached to the initial or current thread. Instances can be used long after the initial threads have exited.
52   * </p>
53   * <p>
54   * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an
55   * {@link IOException}.
56   * </p>
57   *
58   * @see Builder
59   * @see QueueOutputStream
60   * @since 2.9.0
61   */
62  public class QueueInputStream extends InputStream {
63  
64      // @formatter:off
65      /**
66       * Builds a new {@link QueueInputStream}.
67       *
68       * <p>
69       * For example:
70       * </p>
71       * <pre>{@code
72       * QueueInputStream s = QueueInputStream.builder()
73       *   .setBlockingQueue(new LinkedBlockingQueue<>())
74       *   .setTimeout(Duration.ZERO)
75       *   .get();}
76       * </pre>
77       *
78       * @see #get()
79       * @since 2.12.0
80       */
81      // @formatter:on
82      public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> {
83  
84          private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
85          private Duration timeout = Duration.ZERO;
86  
87          /**
88           * Constructs a new builder of {@link QueueInputStream}.
89           */
90          public Builder() {
91              // empty
92          }
93  
94          /**
95           * Builds a new {@link QueueInputStream}.
96           * <p>
97           * This builder uses the following aspects:
98           * </p>
99           * <ul>
100          * <li>{@link #setBlockingQueue(BlockingQueue)}</li>
101          * <li>timeout</li>
102          * </ul>
103          *
104          * @return a new instance.
105          * @see #getUnchecked()
106          */
107         @Override
108         public QueueInputStream get() {
109             return new QueueInputStream(this);
110         }
111 
112         /**
113          * Sets backing queue for the stream.
114          *
115          * @param blockingQueue backing queue for the stream, null resets to a new blocking queue instance.
116          * @return {@code this} instance.
117          */
118         public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) {
119             this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>();
120             return this;
121         }
122 
123         /**
124          * Sets the polling timeout.
125          *
126          * @param timeout the polling timeout.
127          * @return {@code this} instance.
128          */
129         public Builder setTimeout(final Duration timeout) {
130             if (timeout != null && timeout.toNanos() < 0) {
131                 throw new IllegalArgumentException("timeout must not be negative");
132             }
133             this.timeout = timeout != null ? timeout : Duration.ZERO;
134             return this;
135         }
136 
137     }
138 
139     /**
140      * Constructs a new {@link Builder}.
141      *
142      * @return a new {@link Builder}.
143      * @since 2.12.0
144      */
145     public static Builder builder() {
146         return new Builder();
147     }
148 
149     private final BlockingQueue<Integer> blockingQueue;
150 
151     private final long timeoutNanos;
152 
153     /**
154      * Constructs a new instance with no limit to its internal queue size and zero timeout.
155      */
156     public QueueInputStream() {
157         this(new LinkedBlockingQueue<>());
158     }
159 
160     /**
161      * Constructs a new instance with given queue and zero timeout.
162      *
163      * @param blockingQueue backing queue for the stream, null maps to a new blocking queue instance.
164      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}.
165      */
166     @Deprecated
167     public QueueInputStream(final BlockingQueue<Integer> blockingQueue) {
168         this(builder().setBlockingQueue(blockingQueue));
169     }
170 
171     /**
172      * Constructs a new instance.
173      *
174      * @param builder The builder.
175      */
176     private QueueInputStream(final Builder builder) {
177         this.blockingQueue = Objects.requireNonNull(builder.blockingQueue, "blockingQueue");
178         this.timeoutNanos = Objects.requireNonNull(builder.timeout, "timeout").toNanos();
179     }
180 
181     /**
182      * Gets the blocking queue.
183      *
184      * @return the blocking queue.
185      */
186     BlockingQueue<Integer> getBlockingQueue() {
187         return blockingQueue;
188     }
189 
190     /**
191      * Gets the timeout duration.
192      *
193      * @return the timeout duration.
194      */
195     Duration getTimeout() {
196         return Duration.ofNanos(timeoutNanos);
197     }
198 
199     /**
200      * Constructs a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream.
201      *
202      * @return QueueOutputStream connected to this stream.
203      */
204     public QueueOutputStream newQueueOutputStream() {
205         return new QueueOutputStream(blockingQueue);
206     }
207 
208     /**
209      * Reads and returns a single byte.
210      *
211      * @return the byte read, or {@code -1} if a timeout occurs before a queue element is available.
212      * @throws IllegalStateException if thread is interrupted while waiting.
213      */
214     @Override
215     public int read() {
216         try {
217             final Integer value = blockingQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS);
218             return value == null ? EOF : 0xFF & value;
219         } catch (final InterruptedException e) {
220             Thread.currentThread().interrupt();
221             // throw runtime unchecked exception to maintain signature backward-compatibility of
222             // this read method, which does not declare IOException
223             throw new IllegalStateException(e);
224         }
225     }
226 
227 }