View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.collections.buffer;
18  
19  import java.io.PrintWriter;
20  import java.io.StringWriter;
21  import java.util.Collection;
22  
23  import org.apache.commons.collections.Buffer;
24  import org.apache.commons.collections.BufferUnderflowException;
25  
26  /**
27   * Decorates another {@link Buffer} to make {@link #get()} and
28   * {@link #remove()} block when the <code>Buffer</code> is empty.
29   * <p>
30   * If either <code>get</code> or <code>remove</code> is called on an empty
31   * {@link Buffer}, the calling thread waits for notification that
32   * an <code>add</code> or <code>addAll</code> operation has completed.
33   * <p>
34   * When one or more entries are added to an empty {@link Buffer},
35   * all threads blocked in <code>get</code> or <code>remove</code> are notified.
36   * There is no guarantee that concurrent blocked <code>get</code> or
37   * <code>remove</code> requests will be "unblocked" and receive data in the
38   * order that they arrive.
39   * <p>
40   * This class is Serializable from Commons Collections 3.1.
41   * This class contains an extra field in 3.2, however the serialization
42   * specification will handle this gracefully.
43   *
44   * @param <E> the type of the elements in the buffer
45   * @version $Id: BlockingBuffer.java 1429905 2013-01-07 17:15:14Z ggregory $
46   * @since 3.0
47   */
48  public class BlockingBuffer<E> extends SynchronizedBuffer<E> {
49  
50      /** Serialization version. */
51      private static final long serialVersionUID = 1719328905017860541L;
52      /** The timeout value in milliseconds. */
53      private final long timeout;
54  
55      /**
56       * Factory method to create a blocking buffer.
57       *
58       * @param <E> the type of the elements in the buffer
59       * @param buffer the buffer to decorate, must not be null
60       * @return a new blocking Buffer
61       * @throws IllegalArgumentException if buffer is null
62       */
63      public static <E> BlockingBuffer<E> blockingBuffer(final Buffer<E> buffer) {
64          return new BlockingBuffer<E>(buffer);
65      }
66  
67      /**
68       * Factory method to create a blocking buffer with a timeout value.
69       *
70       * @param <E> the type of the elements in the buffer
71       * @param buffer  the buffer to decorate, must not be null
72       * @param timeoutMillis  the timeout value in milliseconds, zero or less for no timeout
73       * @return a new blocking buffer
74       * @throws IllegalArgumentException if the buffer is null
75       * @since 3.2
76       */
77      public static <E> BlockingBuffer<E> blockingBuffer(final Buffer<E> buffer, final long timeoutMillis) {
78          return new BlockingBuffer<E>(buffer, timeoutMillis);
79      }
80  
81      //-----------------------------------------------------------------------    
82      /**
83       * Constructor that wraps (not copies).
84       *
85       * @param buffer the buffer to decorate, must not be null
86       * @throws IllegalArgumentException if the buffer is null
87       */
88      protected BlockingBuffer(final Buffer<E> buffer) {
89          super(buffer);
90          this.timeout = 0;
91      }
92  
93      /**
94       * Constructor that wraps (not copies).
95       *
96       * @param buffer  the buffer to decorate, must not be null
97       * @param timeoutMillis  the timeout value in milliseconds, zero or less for no timeout
98       * @throws IllegalArgumentException if the buffer is null
99       * @since 3.2
100      */
101     protected BlockingBuffer(final Buffer<E> buffer, final long timeoutMillis) {
102         super(buffer);
103         this.timeout = timeoutMillis < 0 ? 0 : timeoutMillis;
104     }
105 
106     //-----------------------------------------------------------------------
107     @Override
108     public boolean add(final E o) {
109         synchronized (lock) {
110             final boolean result = collection.add(o);
111             lock.notifyAll();
112             return result;
113         }
114     }
115 
116     @Override
117     public boolean addAll(final Collection<? extends E> c) {
118         synchronized (lock) {
119             final boolean result = collection.addAll(c);
120             lock.notifyAll();
121             return result;
122         }
123     }
124 
125     /**
126      * Gets the next value from the buffer, waiting until an object is
127      * added if the buffer is empty. This method uses the default timeout
128      * set in the constructor.
129      *
130      * @throws BufferUnderflowException if an interrupt is received
131      * {@inheritDoc}
132      */
133     @Override
134     public E get() {
135         synchronized (lock) {
136             while (collection.isEmpty()) {
137                 try {
138                     if (timeout <= 0) {
139                         lock.wait();
140                     } else {
141                         return get(timeout);
142                     }
143                 } catch (final InterruptedException e) {
144                     final PrintWriter out = new PrintWriter(new StringWriter());
145                     e.printStackTrace(out);
146                     throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString());
147                 }
148             }
149             return decorated().get();
150         }
151     }
152 
153     /**
154      * Gets the next value from the buffer, waiting until an object is
155      * added for up to the specified timeout value if the buffer is empty.
156      *
157      * @param timeout  the timeout value in milliseconds
158      * @return the next object in the buffer
159      * @throws BufferUnderflowException if an interrupt is received
160      * @throws BufferUnderflowException if the timeout expires
161      * @since 3.2
162      */
163     public E get(final long timeout) {
164         synchronized (lock) {
165             final long expiration = System.currentTimeMillis() + timeout;
166             long timeLeft = expiration - System.currentTimeMillis();
167             while (timeLeft > 0 && collection.isEmpty()) {
168                 try {
169                     lock.wait(timeLeft);
170                     timeLeft = expiration - System.currentTimeMillis();
171                 } catch(final InterruptedException e) {
172                     final PrintWriter out = new PrintWriter(new StringWriter());
173                     e.printStackTrace(out);
174                     throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString());
175                 }
176             }
177             if (collection.isEmpty()) {
178                 throw new BufferUnderflowException("Timeout expired");
179             }
180             return decorated().get();
181         }
182     }
183 
184     /**
185      * Removes the next value from the buffer, waiting until an object is
186      * added if the buffer is empty. This method uses the default timeout
187      * set in the constructor.
188      *
189      * @throws BufferUnderflowException if an interrupt is received
190      * {@inheritDoc}
191      */
192     @Override
193     public E remove() {
194         synchronized (lock) {
195             while (collection.isEmpty()) {
196                 try {
197                     if (timeout <= 0) {
198                         lock.wait();
199                     } else {
200                         return remove(timeout);
201                     }
202                 } catch (final InterruptedException e) {
203                     final PrintWriter out = new PrintWriter(new StringWriter());
204                     e.printStackTrace(out);
205                     throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString());
206                 }
207             }
208             return decorated().remove();
209         }
210     }
211 
212     /**
213      * Removes the next value from the buffer, waiting until an object is
214      * added for up to the specified timeout value if the buffer is empty.
215      *
216      * @param timeout  the timeout value in milliseconds
217      * @return the next object in the buffer, which is also removed
218      * @throws BufferUnderflowException if an interrupt is received
219      * @throws BufferUnderflowException if the timeout expires
220      * @since 3.2
221      */
222     public E remove(final long timeout) {
223         synchronized (lock) {
224             final long expiration = System.currentTimeMillis() + timeout;
225             long timeLeft = expiration - System.currentTimeMillis();
226             while (timeLeft > 0 && collection.isEmpty()) {
227                 try {
228                     lock.wait(timeLeft);
229                     timeLeft = expiration - System.currentTimeMillis();
230                 } catch(final InterruptedException e) {
231                     final PrintWriter out = new PrintWriter(new StringWriter());
232                     e.printStackTrace(out);
233                     throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString());
234                 }
235             }
236             if (collection.isEmpty()) {
237                 throw new BufferUnderflowException("Timeout expired");
238             }
239             return decorated().remove();
240         }
241     }
242 
243 }