001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * https://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.io.input; 018 019import static org.apache.commons.io.IOUtils.EOF; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.PipedInputStream; 024import java.io.PipedOutputStream; 025import java.time.Duration; 026import java.util.ArrayList; 027import java.util.List; 028import java.util.Objects; 029import java.util.concurrent.BlockingQueue; 030import java.util.concurrent.LinkedBlockingQueue; 031import java.util.concurrent.TimeUnit; 032 033import org.apache.commons.io.IOUtils; 034import org.apache.commons.io.build.AbstractStreamBuilder; 035import org.apache.commons.io.output.QueueOutputStream; 036 037/** 038 * Simple alternative to JDK {@link PipedInputStream}; queue input stream provides what's written in queue output stream. 039 * <p> 040 * To build an instance, use {@link Builder}. 041 * </p> 042 * <p> 043 * Example usage: 044 * </p> 045 * <pre> 046 * QueueInputStream inputStream = new QueueInputStream(); 047 * QueueOutputStream outputStream = inputStream.newQueueOutputStream(); 048 * 049 * outputStream.write("hello world".getBytes(UTF_8)); 050 * inputStream.read(); 051 * </pre> 052 * <p> 053 * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads. 054 * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited. 055 * </p> 056 * <p> 057 * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an 058 * {@link IOException}. 059 * </p> 060 * 061 * @see Builder 062 * @see QueueOutputStream 063 * @since 2.9.0 064 */ 065public class QueueInputStream extends InputStream { 066 067 // @formatter:off 068 /** 069 * Builds a new {@link QueueInputStream}. 070 * 071 * <p> 072 * For example: 073 * </p> 074 * <pre>{@code 075 * QueueInputStream s = QueueInputStream.builder() 076 * .setBlockingQueue(new LinkedBlockingQueue<>()) 077 * .setTimeout(Duration.ZERO) 078 * .get();} 079 * </pre> 080 * 081 * @see #get() 082 * @since 2.12.0 083 */ 084 // @formatter:on 085 public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> { 086 087 private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(); 088 private Duration timeout = Duration.ZERO; 089 090 /** 091 * Constructs a new builder of {@link QueueInputStream}. 092 */ 093 public Builder() { 094 // empty 095 } 096 097 /** 098 * Builds a new {@link QueueInputStream}. 099 * <p> 100 * This builder uses the following aspects: 101 * </p> 102 * <ul> 103 * <li>{@link #setBlockingQueue(BlockingQueue)}</li> 104 * <li>timeout</li> 105 * </ul> 106 * 107 * @return a new instance. 108 * @see #getUnchecked() 109 */ 110 @Override 111 public QueueInputStream get() { 112 return new QueueInputStream(this); 113 } 114 115 /** 116 * Sets backing queue for the stream. 117 * 118 * @param blockingQueue backing queue for the stream, null resets to a new blocking queue instance. 119 * @return {@code this} instance. 120 */ 121 public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) { 122 this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>(); 123 return this; 124 } 125 126 /** 127 * Sets the polling timeout. 128 * 129 * @param timeout the polling timeout. 130 * @return {@code this} instance. 131 */ 132 public Builder setTimeout(final Duration timeout) { 133 if (timeout != null && timeout.toNanos() < 0) { 134 throw new IllegalArgumentException("timeout must not be negative"); 135 } 136 this.timeout = timeout != null ? timeout : Duration.ZERO; 137 return this; 138 } 139 140 } 141 142 /** 143 * Constructs a new {@link Builder}. 144 * 145 * @return a new {@link Builder}. 146 * @since 2.12.0 147 */ 148 public static Builder builder() { 149 return new Builder(); 150 } 151 152 private final BlockingQueue<Integer> blockingQueue; 153 154 private final long timeoutNanos; 155 156 /** 157 * Constructs a new instance with no limit to its internal queue size and zero timeout. 158 */ 159 public QueueInputStream() { 160 this(new LinkedBlockingQueue<>()); 161 } 162 163 /** 164 * Constructs a new instance with given queue and zero timeout. 165 * 166 * @param blockingQueue backing queue for the stream, null maps to a new blocking queue instance. 167 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}. 168 */ 169 @Deprecated 170 public QueueInputStream(final BlockingQueue<Integer> blockingQueue) { 171 this(builder().setBlockingQueue(blockingQueue)); 172 } 173 174 /** 175 * Constructs a new instance. 176 * 177 * @param builder The builder. 178 */ 179 private QueueInputStream(final Builder builder) { 180 this.blockingQueue = Objects.requireNonNull(builder.blockingQueue, "blockingQueue"); 181 this.timeoutNanos = Objects.requireNonNull(builder.timeout, "timeout").toNanos(); 182 } 183 184 /** 185 * Gets the blocking queue. 186 * 187 * @return the blocking queue. 188 */ 189 BlockingQueue<Integer> getBlockingQueue() { 190 return blockingQueue; 191 } 192 193 /** 194 * Gets the timeout duration. 195 * 196 * @return the timeout duration. 197 */ 198 Duration getTimeout() { 199 return Duration.ofNanos(timeoutNanos); 200 } 201 202 /** 203 * Constructs a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream. 204 * 205 * @return QueueOutputStream connected to this stream. 206 */ 207 public QueueOutputStream newQueueOutputStream() { 208 return new QueueOutputStream(blockingQueue); 209 } 210 211 /** 212 * Reads and returns a single byte. 213 * 214 * @return the byte read, or {@code -1} if a timeout occurs before a queue element is available. 215 * @throws IllegalStateException if thread is interrupted while waiting. 216 */ 217 @Override 218 public int read() { 219 try { 220 final Integer value = blockingQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS); 221 return value == null ? EOF : 0xFF & value; 222 } catch (final InterruptedException e) { 223 Thread.currentThread().interrupt(); 224 // throw runtime unchecked exception to maintain signature backward-compatibility of 225 // this read method, which does not declare IOException 226 throw new IllegalStateException(e); 227 } 228 } 229 230 /** 231 * Reads up to {@code length} bytes of data from the input stream into 232 * an array of bytes. The first byte is read while honoring the timeout; the rest are read while <i>not</i> honoring 233 * the timeout. The number of bytes actually read is returned as an integer. 234 * 235 * @param b the buffer into which the data is read. 236 * @param offset the start offset in array {@code b} at which the data is written. 237 * @param length the maximum number of bytes to read. 238 * @return the total number of bytes read into the buffer, or {@code -1} if there is no more data because the 239 * end of the stream has been reached. 240 * @throws NullPointerException If {@code b} is {@code null}. 241 * @throws IllegalStateException if thread is interrupted while waiting for the first byte. 242 * @throws IndexOutOfBoundsException if {@code offset} is negative, {@code length} is negative, or {@code length} is 243 * greater than {@code b.length - offset}. 244 * @since 2.20.0 245 */ 246 @Override 247 public int read(final byte[] b, final int offset, final int length) { 248 IOUtils.checkFromIndexSize(b, offset, length); 249 if (length == 0) { 250 return 0; 251 } 252 final List<Integer> drain = new ArrayList<>(Math.min(length, blockingQueue.size())); 253 blockingQueue.drainTo(drain, length); 254 if (drain.isEmpty()) { 255 // no data immediately available. wait for first byte 256 final int value = read(); 257 if (value == EOF) { 258 return EOF; 259 } 260 drain.add(value); 261 blockingQueue.drainTo(drain, length - 1); 262 } 263 int i = 0; 264 for (final Integer value : drain) { 265 b[offset + i] = (byte) (0xFF & value); 266 i++; 267 } 268 return i; 269 } 270 271}