001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      https://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import static org.apache.commons.io.IOUtils.EOF;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.PipedInputStream;
024import java.io.PipedOutputStream;
025import java.time.Duration;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.Objects;
029import java.util.concurrent.BlockingQueue;
030import java.util.concurrent.LinkedBlockingQueue;
031import java.util.concurrent.TimeUnit;
032
033import org.apache.commons.io.build.AbstractStreamBuilder;
034import org.apache.commons.io.output.QueueOutputStream;
035
036/**
037 * Simple alternative to JDK {@link PipedInputStream}; queue input stream provides what's written in queue output stream.
038 * <p>
039 * To build an instance, use {@link Builder}.
040 * </p>
041 * <p>
042 * Example usage:
043 * </p>
044 * <pre>
045 * QueueInputStream inputStream = new QueueInputStream();
046 * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
047 *
048 * outputStream.write("hello world".getBytes(UTF_8));
049 * inputStream.read();
050 * </pre>
051 * <p>
052 * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads.
053 * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited.
054 * </p>
055 * <p>
056 * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an
057 * {@link IOException}.
058 * </p>
059 *
060 * @see Builder
061 * @see QueueOutputStream
062 * @since 2.9.0
063 */
064public class QueueInputStream extends InputStream {
065
066    // @formatter:off
067    /**
068     * Builds a new {@link QueueInputStream}.
069     *
070     * <p>
071     * For example:
072     * </p>
073     * <pre>{@code
074     * QueueInputStream s = QueueInputStream.builder()
075     *   .setBlockingQueue(new LinkedBlockingQueue<>())
076     *   .setTimeout(Duration.ZERO)
077     *   .get();}
078     * </pre>
079     *
080     * @see #get()
081     * @since 2.12.0
082     */
083    // @formatter:on
084    public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> {
085
086        private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
087        private Duration timeout = Duration.ZERO;
088
089        /**
090         * Constructs a new builder of {@link QueueInputStream}.
091         */
092        public Builder() {
093            // empty
094        }
095
096        /**
097         * Builds a new {@link QueueInputStream}.
098         * <p>
099         * This builder uses the following aspects:
100         * </p>
101         * <ul>
102         * <li>{@link #setBlockingQueue(BlockingQueue)}</li>
103         * <li>timeout</li>
104         * </ul>
105         *
106         * @return a new instance.
107         * @see #getUnchecked()
108         */
109        @Override
110        public QueueInputStream get() {
111            return new QueueInputStream(this);
112        }
113
114        /**
115         * Sets backing queue for the stream.
116         *
117         * @param blockingQueue backing queue for the stream, null resets to a new blocking queue instance.
118         * @return {@code this} instance.
119         */
120        public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) {
121            this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>();
122            return this;
123        }
124
125        /**
126         * Sets the polling timeout.
127         *
128         * @param timeout the polling timeout.
129         * @return {@code this} instance.
130         */
131        public Builder setTimeout(final Duration timeout) {
132            if (timeout != null && timeout.toNanos() < 0) {
133                throw new IllegalArgumentException("timeout must not be negative");
134            }
135            this.timeout = timeout != null ? timeout : Duration.ZERO;
136            return this;
137        }
138
139    }
140
141    /**
142     * Constructs a new {@link Builder}.
143     *
144     * @return a new {@link Builder}.
145     * @since 2.12.0
146     */
147    public static Builder builder() {
148        return new Builder();
149    }
150
151    private final BlockingQueue<Integer> blockingQueue;
152
153    private final long timeoutNanos;
154
155    /**
156     * Constructs a new instance with no limit to its internal queue size and zero timeout.
157     */
158    public QueueInputStream() {
159        this(new LinkedBlockingQueue<>());
160    }
161
162    /**
163     * Constructs a new instance with given queue and zero timeout.
164     *
165     * @param blockingQueue backing queue for the stream, null maps to a new blocking queue instance.
166     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}.
167     */
168    @Deprecated
169    public QueueInputStream(final BlockingQueue<Integer> blockingQueue) {
170        this(builder().setBlockingQueue(blockingQueue));
171    }
172
173    /**
174     * Constructs a new instance.
175     *
176     * @param builder The builder.
177     */
178    private QueueInputStream(final Builder builder) {
179        this.blockingQueue = Objects.requireNonNull(builder.blockingQueue, "blockingQueue");
180        this.timeoutNanos = Objects.requireNonNull(builder.timeout, "timeout").toNanos();
181    }
182
183    /**
184     * Gets the blocking queue.
185     *
186     * @return the blocking queue.
187     */
188    BlockingQueue<Integer> getBlockingQueue() {
189        return blockingQueue;
190    }
191
192    /**
193     * Gets the timeout duration.
194     *
195     * @return the timeout duration.
196     */
197    Duration getTimeout() {
198        return Duration.ofNanos(timeoutNanos);
199    }
200
201    /**
202     * Constructs a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream.
203     *
204     * @return QueueOutputStream connected to this stream.
205     */
206    public QueueOutputStream newQueueOutputStream() {
207        return new QueueOutputStream(blockingQueue);
208    }
209
210    /**
211     * Reads and returns a single byte.
212     *
213     * @return the byte read, or {@code -1} if a timeout occurs before a queue element is available.
214     * @throws IllegalStateException if thread is interrupted while waiting.
215     */
216    @Override
217    public int read() {
218        try {
219            final Integer value = blockingQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS);
220            return value == null ? EOF : 0xFF & value;
221        } catch (final InterruptedException e) {
222            Thread.currentThread().interrupt();
223            // throw runtime unchecked exception to maintain signature backward-compatibility of
224            // this read method, which does not declare IOException
225            throw new IllegalStateException(e);
226        }
227    }
228
229    /**
230     * Reads up to {@code length} bytes of data from the input stream into
231     * an array of bytes.  The first byte is read while honoring the timeout; the rest are read while <i>not</i> honoring
232     * the timeout. The number of bytes actually read is returned as an integer.
233     *
234     * @param b     the buffer into which the data is read.
235     * @param offset   the start offset in array {@code b} at which the data is written.
236     * @param length   the maximum number of bytes to read.
237     * @return     the total number of bytes read into the buffer, or {@code -1} if there is no more data because the
238     *              end of the stream has been reached.
239     * @throws NullPointerException If {@code b} is {@code null}.
240     * @throws IllegalStateException if thread is interrupted while waiting for the first byte.
241     * @throws IndexOutOfBoundsException if {@code offset} is negative, {@code length} is negative, or {@code length} is
242     *             greater than {@code b.length - offset}.
243     * @since 2.20.0
244     */
245    @Override
246    public int read(final byte[] b, final int offset, final int length) {
247        if (b == null) {
248            throw new NullPointerException();
249        }
250        if (offset < 0 || length < 0 || length > b.length - offset) {
251            throw new IndexOutOfBoundsException(
252                    String.format("Range [%d, %<d + %d) out of bounds for length %d", offset, length, b.length));
253        }
254        if (length == 0) {
255            return 0;
256        }
257        final List<Integer> drain = new ArrayList<>(Math.min(length, blockingQueue.size()));
258        blockingQueue.drainTo(drain, length);
259        if (drain.isEmpty()) {
260            // no data immediately available. wait for first byte
261            final int value = read();
262            if (value == EOF) {
263                return EOF;
264            }
265            drain.add(value);
266            blockingQueue.drainTo(drain, length - 1);
267        }
268        int i = 0;
269        for (final Integer value : drain) {
270            b[offset + i] = (byte) (0xFF & value);
271            i++;
272        }
273        return i;
274    }
275
276}