001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      https://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import static org.apache.commons.io.IOUtils.EOF;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.PipedInputStream;
024import java.io.PipedOutputStream;
025import java.time.Duration;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.Objects;
029import java.util.concurrent.BlockingQueue;
030import java.util.concurrent.LinkedBlockingQueue;
031import java.util.concurrent.TimeUnit;
032
033import org.apache.commons.io.IOUtils;
034import org.apache.commons.io.build.AbstractStreamBuilder;
035import org.apache.commons.io.output.QueueOutputStream;
036
037/**
038 * Simple alternative to JDK {@link PipedInputStream}; queue input stream provides what's written in queue output stream.
039 * <p>
040 * To build an instance, use {@link Builder}.
041 * </p>
042 * <p>
043 * Example usage:
044 * </p>
045 * <pre>
046 * QueueInputStream inputStream = new QueueInputStream();
047 * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
048 *
049 * outputStream.write("hello world".getBytes(UTF_8));
050 * inputStream.read();
051 * </pre>
052 * <p>
053 * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads.
054 * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited.
055 * </p>
056 * <p>
057 * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an
058 * {@link IOException}.
059 * </p>
060 *
061 * @see Builder
062 * @see QueueOutputStream
063 * @since 2.9.0
064 */
065public class QueueInputStream extends InputStream {
066
067    // @formatter:off
068    /**
069     * Builds a new {@link QueueInputStream}.
070     *
071     * <p>
072     * For example:
073     * </p>
074     * <pre>{@code
075     * QueueInputStream s = QueueInputStream.builder()
076     *   .setBlockingQueue(new LinkedBlockingQueue<>())
077     *   .setTimeout(Duration.ZERO)
078     *   .get();}
079     * </pre>
080     *
081     * @see #get()
082     * @since 2.12.0
083     */
084    // @formatter:on
085    public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> {
086
087        private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
088        private Duration timeout = Duration.ZERO;
089
090        /**
091         * Constructs a new builder of {@link QueueInputStream}.
092         */
093        public Builder() {
094            // empty
095        }
096
097        /**
098         * Builds a new {@link QueueInputStream}.
099         * <p>
100         * This builder uses the following aspects:
101         * </p>
102         * <ul>
103         * <li>{@link #setBlockingQueue(BlockingQueue)}</li>
104         * <li>timeout</li>
105         * </ul>
106         *
107         * @return a new instance.
108         * @see #getUnchecked()
109         */
110        @Override
111        public QueueInputStream get() {
112            return new QueueInputStream(this);
113        }
114
115        /**
116         * Sets backing queue for the stream.
117         *
118         * @param blockingQueue backing queue for the stream, null resets to a new blocking queue instance.
119         * @return {@code this} instance.
120         */
121        public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) {
122            this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>();
123            return this;
124        }
125
126        /**
127         * Sets the polling timeout.
128         *
129         * @param timeout the polling timeout.
130         * @return {@code this} instance.
131         */
132        public Builder setTimeout(final Duration timeout) {
133            if (timeout != null && timeout.toNanos() < 0) {
134                throw new IllegalArgumentException("timeout must not be negative");
135            }
136            this.timeout = timeout != null ? timeout : Duration.ZERO;
137            return this;
138        }
139
140    }
141
142    /**
143     * Constructs a new {@link Builder}.
144     *
145     * @return a new {@link Builder}.
146     * @since 2.12.0
147     */
148    public static Builder builder() {
149        return new Builder();
150    }
151
152    private final BlockingQueue<Integer> blockingQueue;
153
154    private final long timeoutNanos;
155
156    /**
157     * Constructs a new instance with no limit to its internal queue size and zero timeout.
158     */
159    public QueueInputStream() {
160        this(new LinkedBlockingQueue<>());
161    }
162
163    /**
164     * Constructs a new instance with given queue and zero timeout.
165     *
166     * @param blockingQueue backing queue for the stream, null maps to a new blocking queue instance.
167     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}.
168     */
169    @Deprecated
170    public QueueInputStream(final BlockingQueue<Integer> blockingQueue) {
171        this(builder().setBlockingQueue(blockingQueue));
172    }
173
174    /**
175     * Constructs a new instance.
176     *
177     * @param builder The builder.
178     */
179    private QueueInputStream(final Builder builder) {
180        this.blockingQueue = Objects.requireNonNull(builder.blockingQueue, "blockingQueue");
181        this.timeoutNanos = Objects.requireNonNull(builder.timeout, "timeout").toNanos();
182    }
183
184    /**
185     * Gets the blocking queue.
186     *
187     * @return the blocking queue.
188     */
189    BlockingQueue<Integer> getBlockingQueue() {
190        return blockingQueue;
191    }
192
193    /**
194     * Gets the timeout duration.
195     *
196     * @return the timeout duration.
197     */
198    Duration getTimeout() {
199        return Duration.ofNanos(timeoutNanos);
200    }
201
202    /**
203     * Constructs a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream.
204     *
205     * @return QueueOutputStream connected to this stream.
206     */
207    public QueueOutputStream newQueueOutputStream() {
208        return new QueueOutputStream(blockingQueue);
209    }
210
211    /**
212     * Reads and returns a single byte.
213     *
214     * @return the byte read, or {@code -1} if a timeout occurs before a queue element is available.
215     * @throws IllegalStateException if thread is interrupted while waiting.
216     */
217    @Override
218    public int read() {
219        try {
220            final Integer value = blockingQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS);
221            return value == null ? EOF : 0xFF & value;
222        } catch (final InterruptedException e) {
223            Thread.currentThread().interrupt();
224            // throw runtime unchecked exception to maintain signature backward-compatibility of
225            // this read method, which does not declare IOException
226            throw new IllegalStateException(e);
227        }
228    }
229
230    /**
231     * Reads up to {@code length} bytes of data from the input stream into
232     * an array of bytes.  The first byte is read while honoring the timeout; the rest are read while <i>not</i> honoring
233     * the timeout. The number of bytes actually read is returned as an integer.
234     *
235     * @param b     the buffer into which the data is read.
236     * @param offset   the start offset in array {@code b} at which the data is written.
237     * @param length   the maximum number of bytes to read.
238     * @return     the total number of bytes read into the buffer, or {@code -1} if there is no more data because the
239     *              end of the stream has been reached.
240     * @throws NullPointerException If {@code b} is {@code null}.
241     * @throws IllegalStateException if thread is interrupted while waiting for the first byte.
242     * @throws IndexOutOfBoundsException if {@code offset} is negative, {@code length} is negative, or {@code length} is
243     *             greater than {@code b.length - offset}.
244     * @since 2.20.0
245     */
246    @Override
247    public int read(final byte[] b, final int offset, final int length) {
248        IOUtils.checkFromIndexSize(b, offset, length);
249        if (length == 0) {
250            return 0;
251        }
252        final List<Integer> drain = new ArrayList<>(Math.min(length, blockingQueue.size()));
253        blockingQueue.drainTo(drain, length);
254        if (drain.isEmpty()) {
255            // no data immediately available. wait for first byte
256            final int value = read();
257            if (value == EOF) {
258                return EOF;
259            }
260            drain.add(value);
261            blockingQueue.drainTo(drain, length - 1);
262        }
263        int i = 0;
264        for (final Integer value : drain) {
265            b[offset + i] = (byte) (0xFF & value);
266            i++;
267        }
268        return i;
269    }
270
271}