QueueInputStream.java

  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one or more
  3.  * contributor license agreements.  See the NOTICE file distributed with
  4.  * this work for additional information regarding copyright ownership.
  5.  * The ASF licenses this file to You under the Apache License, Version 2.0
  6.  * (the "License"); you may not use this file except in compliance with
  7.  * the License.  You may obtain a copy of the License at
  8.  *
  9.  *      http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package org.apache.commons.io.input;

  18. import static org.apache.commons.io.IOUtils.EOF;

  19. import java.io.IOException;
  20. import java.io.InputStream;
  21. import java.io.PipedInputStream;
  22. import java.io.PipedOutputStream;
  23. import java.time.Duration;
  24. import java.util.Objects;
  25. import java.util.concurrent.BlockingQueue;
  26. import java.util.concurrent.LinkedBlockingQueue;
  27. import java.util.concurrent.TimeUnit;

  28. import org.apache.commons.io.build.AbstractStreamBuilder;
  29. import org.apache.commons.io.output.QueueOutputStream;

  30. /**
  31.  * Simple alternative to JDK {@link PipedInputStream}; queue input stream provides what's written in queue output stream.
  32.  * <p>
  33.  * To build an instance, use {@link Builder}.
  34.  * </p>
  35.  * <p>
  36.  * Example usage:
  37.  * </p>
  38.  * <pre>
  39.  * QueueInputStream inputStream = new QueueInputStream();
  40.  * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
  41.  *
  42.  * outputStream.write("hello world".getBytes(UTF_8));
  43.  * inputStream.read();
  44.  * </pre>
  45.  * <p>
  46.  * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads.
  47.  * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited.
  48.  * </p>
  49.  * <p>
  50.  * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an
  51.  * {@link IOException}.
  52.  * </p>
  53.  *
  54.  * @see Builder
  55.  * @see QueueOutputStream
  56.  * @since 2.9.0
  57.  */
  58. public class QueueInputStream extends InputStream {

  59.     // @formatter:off
  60.     /**
  61.      * Builds a new {@link QueueInputStream}.
  62.      *
  63.      * <p>
  64.      * For example:
  65.      * </p>
  66.      * <pre>{@code
  67.      * QueueInputStream s = QueueInputStream.builder()
  68.      *   .setBlockingQueue(new LinkedBlockingQueue<>())
  69.      *   .setTimeout(Duration.ZERO)
  70.      *   .get();}
  71.      * </pre>
  72.      *
  73.      * @see #get()
  74.      * @since 2.12.0
  75.      */
  76.     // @formatter:on
  77.     public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> {

  78.         private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
  79.         private Duration timeout = Duration.ZERO;

  80.         /**
  81.          * Constructs a new builder of {@link QueueInputStream}.
  82.          */
  83.         public Builder() {
  84.             // empty
  85.         }

  86.         /**
  87.          * Builds a new {@link QueueInputStream}.
  88.          * <p>
  89.          * This builder uses the following aspects:
  90.          * </p>
  91.          * <ul>
  92.          * <li>{@link #setBlockingQueue(BlockingQueue)}</li>
  93.          * <li>timeout</li>
  94.          * </ul>
  95.          *
  96.          * @return a new instance.
  97.          * @see #getUnchecked()
  98.          */
  99.         @Override
  100.         public QueueInputStream get() {
  101.             return new QueueInputStream(this);
  102.         }

  103.         /**
  104.          * Sets backing queue for the stream.
  105.          *
  106.          * @param blockingQueue backing queue for the stream, null resets to a new blocking queue instance.
  107.          * @return {@code this} instance.
  108.          */
  109.         public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) {
  110.             this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>();
  111.             return this;
  112.         }

  113.         /**
  114.          * Sets the polling timeout.
  115.          *
  116.          * @param timeout the polling timeout.
  117.          * @return {@code this} instance.
  118.          */
  119.         public Builder setTimeout(final Duration timeout) {
  120.             if (timeout != null && timeout.toNanos() < 0) {
  121.                 throw new IllegalArgumentException("timeout must not be negative");
  122.             }
  123.             this.timeout = timeout != null ? timeout : Duration.ZERO;
  124.             return this;
  125.         }

  126.     }

  127.     /**
  128.      * Constructs a new {@link Builder}.
  129.      *
  130.      * @return a new {@link Builder}.
  131.      * @since 2.12.0
  132.      */
  133.     public static Builder builder() {
  134.         return new Builder();
  135.     }

  136.     private final BlockingQueue<Integer> blockingQueue;

  137.     private final long timeoutNanos;

  138.     /**
  139.      * Constructs a new instance with no limit to its internal queue size and zero timeout.
  140.      */
  141.     public QueueInputStream() {
  142.         this(new LinkedBlockingQueue<>());
  143.     }

  144.     /**
  145.      * Constructs a new instance with given queue and zero timeout.
  146.      *
  147.      * @param blockingQueue backing queue for the stream, null maps to a new blocking queue instance.
  148.      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}.
  149.      */
  150.     @Deprecated
  151.     public QueueInputStream(final BlockingQueue<Integer> blockingQueue) {
  152.         this(builder().setBlockingQueue(blockingQueue));
  153.     }

  154.     /**
  155.      * Constructs a new instance.
  156.      *
  157.      * @param builder The builder.
  158.      */
  159.     private QueueInputStream(final Builder builder) {
  160.         this.blockingQueue = Objects.requireNonNull(builder.blockingQueue, "blockingQueue");
  161.         this.timeoutNanos = Objects.requireNonNull(builder.timeout, "timeout").toNanos();
  162.     }

  163.     /**
  164.      * Gets the blocking queue.
  165.      *
  166.      * @return the blocking queue.
  167.      */
  168.     BlockingQueue<Integer> getBlockingQueue() {
  169.         return blockingQueue;
  170.     }

  171.     /**
  172.      * Gets the timeout duration.
  173.      *
  174.      * @return the timeout duration.
  175.      */
  176.     Duration getTimeout() {
  177.         return Duration.ofNanos(timeoutNanos);
  178.     }

  179.     /**
  180.      * Constructs a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream.
  181.      *
  182.      * @return QueueOutputStream connected to this stream.
  183.      */
  184.     public QueueOutputStream newQueueOutputStream() {
  185.         return new QueueOutputStream(blockingQueue);
  186.     }

  187.     /**
  188.      * Reads and returns a single byte.
  189.      *
  190.      * @return the byte read, or {@code -1} if a timeout occurs before a queue element is available.
  191.      * @throws IllegalStateException if thread is interrupted while waiting.
  192.      */
  193.     @Override
  194.     public int read() {
  195.         try {
  196.             final Integer value = blockingQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS);
  197.             return value == null ? EOF : 0xFF & value;
  198.         } catch (final InterruptedException e) {
  199.             Thread.currentThread().interrupt();
  200.             // throw runtime unchecked exception to maintain signature backward-compatibility of
  201.             // this read method, which does not declare IOException
  202.             throw new IllegalStateException(e);
  203.         }
  204.     }

  205. }