1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.commons.collections.buffer;
18
19 import java.io.PrintWriter;
20 import java.io.StringWriter;
21 import java.util.Collection;
22
23 import org.apache.commons.collections.Buffer;
24 import org.apache.commons.collections.BufferUnderflowException;
25
26 /**
27 * Decorates another {@link Buffer} to make {@link #get()} and
28 * {@link #remove()} block when the <code>Buffer</code> is empty.
29 * <p>
30 * If either <code>get</code> or <code>remove</code> is called on an empty
31 * {@link Buffer}, the calling thread waits for notification that
32 * an <code>add</code> or <code>addAll</code> operation has completed.
33 * <p>
34 * When one or more entries are added to an empty {@link Buffer},
35 * all threads blocked in <code>get</code> or <code>remove</code> are notified.
36 * There is no guarantee that concurrent blocked <code>get</code> or
37 * <code>remove</code> requests will be "unblocked" and receive data in the
38 * order that they arrive.
39 * <p>
40 * This class is Serializable from Commons Collections 3.1.
41 * This class contains an extra field in 3.2, however the serialization
42 * specification will handle this gracefully.
43 *
44 * @param <E> the type of the elements in the buffer
45 * @version $Id: BlockingBuffer.java 1429905 2013-01-07 17:15:14Z ggregory $
46 * @since 3.0
47 */
48 public class BlockingBuffer<E> extends SynchronizedBuffer<E> {
49
50 /** Serialization version. */
51 private static final long serialVersionUID = 1719328905017860541L;
52 /** The timeout value in milliseconds. */
53 private final long timeout;
54
55 /**
56 * Factory method to create a blocking buffer.
57 *
58 * @param <E> the type of the elements in the buffer
59 * @param buffer the buffer to decorate, must not be null
60 * @return a new blocking Buffer
61 * @throws IllegalArgumentException if buffer is null
62 */
63 public static <E> BlockingBuffer<E> blockingBuffer(final Buffer<E> buffer) {
64 return new BlockingBuffer<E>(buffer);
65 }
66
67 /**
68 * Factory method to create a blocking buffer with a timeout value.
69 *
70 * @param <E> the type of the elements in the buffer
71 * @param buffer the buffer to decorate, must not be null
72 * @param timeoutMillis the timeout value in milliseconds, zero or less for no timeout
73 * @return a new blocking buffer
74 * @throws IllegalArgumentException if the buffer is null
75 * @since 3.2
76 */
77 public static <E> BlockingBuffer<E> blockingBuffer(final Buffer<E> buffer, final long timeoutMillis) {
78 return new BlockingBuffer<E>(buffer, timeoutMillis);
79 }
80
81 //-----------------------------------------------------------------------
82 /**
83 * Constructor that wraps (not copies).
84 *
85 * @param buffer the buffer to decorate, must not be null
86 * @throws IllegalArgumentException if the buffer is null
87 */
88 protected BlockingBuffer(final Buffer<E> buffer) {
89 super(buffer);
90 this.timeout = 0;
91 }
92
93 /**
94 * Constructor that wraps (not copies).
95 *
96 * @param buffer the buffer to decorate, must not be null
97 * @param timeoutMillis the timeout value in milliseconds, zero or less for no timeout
98 * @throws IllegalArgumentException if the buffer is null
99 * @since 3.2
100 */
101 protected BlockingBuffer(final Buffer<E> buffer, final long timeoutMillis) {
102 super(buffer);
103 this.timeout = timeoutMillis < 0 ? 0 : timeoutMillis;
104 }
105
106 //-----------------------------------------------------------------------
107 @Override
108 public boolean add(final E o) {
109 synchronized (lock) {
110 final boolean result = collection.add(o);
111 lock.notifyAll();
112 return result;
113 }
114 }
115
116 @Override
117 public boolean addAll(final Collection<? extends E> c) {
118 synchronized (lock) {
119 final boolean result = collection.addAll(c);
120 lock.notifyAll();
121 return result;
122 }
123 }
124
125 /**
126 * Gets the next value from the buffer, waiting until an object is
127 * added if the buffer is empty. This method uses the default timeout
128 * set in the constructor.
129 *
130 * @throws BufferUnderflowException if an interrupt is received
131 * {@inheritDoc}
132 */
133 @Override
134 public E get() {
135 synchronized (lock) {
136 while (collection.isEmpty()) {
137 try {
138 if (timeout <= 0) {
139 lock.wait();
140 } else {
141 return get(timeout);
142 }
143 } catch (final InterruptedException e) {
144 final PrintWriter out = new PrintWriter(new StringWriter());
145 e.printStackTrace(out);
146 throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString());
147 }
148 }
149 return decorated().get();
150 }
151 }
152
153 /**
154 * Gets the next value from the buffer, waiting until an object is
155 * added for up to the specified timeout value if the buffer is empty.
156 *
157 * @param timeout the timeout value in milliseconds
158 * @return the next object in the buffer
159 * @throws BufferUnderflowException if an interrupt is received
160 * @throws BufferUnderflowException if the timeout expires
161 * @since 3.2
162 */
163 public E get(final long timeout) {
164 synchronized (lock) {
165 final long expiration = System.currentTimeMillis() + timeout;
166 long timeLeft = expiration - System.currentTimeMillis();
167 while (timeLeft > 0 && collection.isEmpty()) {
168 try {
169 lock.wait(timeLeft);
170 timeLeft = expiration - System.currentTimeMillis();
171 } catch(final InterruptedException e) {
172 final PrintWriter out = new PrintWriter(new StringWriter());
173 e.printStackTrace(out);
174 throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString());
175 }
176 }
177 if (collection.isEmpty()) {
178 throw new BufferUnderflowException("Timeout expired");
179 }
180 return decorated().get();
181 }
182 }
183
184 /**
185 * Removes the next value from the buffer, waiting until an object is
186 * added if the buffer is empty. This method uses the default timeout
187 * set in the constructor.
188 *
189 * @throws BufferUnderflowException if an interrupt is received
190 * {@inheritDoc}
191 */
192 @Override
193 public E remove() {
194 synchronized (lock) {
195 while (collection.isEmpty()) {
196 try {
197 if (timeout <= 0) {
198 lock.wait();
199 } else {
200 return remove(timeout);
201 }
202 } catch (final InterruptedException e) {
203 final PrintWriter out = new PrintWriter(new StringWriter());
204 e.printStackTrace(out);
205 throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString());
206 }
207 }
208 return decorated().remove();
209 }
210 }
211
212 /**
213 * Removes the next value from the buffer, waiting until an object is
214 * added for up to the specified timeout value if the buffer is empty.
215 *
216 * @param timeout the timeout value in milliseconds
217 * @return the next object in the buffer, which is also removed
218 * @throws BufferUnderflowException if an interrupt is received
219 * @throws BufferUnderflowException if the timeout expires
220 * @since 3.2
221 */
222 public E remove(final long timeout) {
223 synchronized (lock) {
224 final long expiration = System.currentTimeMillis() + timeout;
225 long timeLeft = expiration - System.currentTimeMillis();
226 while (timeLeft > 0 && collection.isEmpty()) {
227 try {
228 lock.wait(timeLeft);
229 timeLeft = expiration - System.currentTimeMillis();
230 } catch(final InterruptedException e) {
231 final PrintWriter out = new PrintWriter(new StringWriter());
232 e.printStackTrace(out);
233 throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString());
234 }
235 }
236 if (collection.isEmpty()) {
237 throw new BufferUnderflowException("Timeout expired");
238 }
239 return decorated().remove();
240 }
241 }
242
243 }