/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.collections4.queue;

import java.io.IOException;
import java.io.InvalidObjectException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.AbstractCollection;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;

import org.apache.commons.collections4.BoundedCollection;
32  
33  /**
34   * CircularFifoQueue is a first-in first-out queue with a fixed size that
35   * replaces its oldest element if full.
36   * <p>
37   * The removal order of a {@link CircularFifoQueue} is based on the
38   * insertion order; elements are removed in the same order in which they
39   * were added.  The iteration order is the same as the removal order.
40   * </p>
41   * <p>
42   * The {@link #add(Object)}, {@link #remove()}, {@link #peek()}, {@link #poll()},
43   * {@link #offer(Object)} operations all perform in constant time.
44   * All other operations perform in linear time or worse.
45   * </p>
46   * <p>
47   * This queue prevents null objects from being added.
48   * </p>
49   *
50   * @param <E> the type of elements in this collection
51   * @since 4.0
52   */
53  public class CircularFifoQueue<E> extends AbstractCollection<E>
54      implements Queue<E>, BoundedCollection<E>, Serializable {
55  
56      /** Serialization version. */
57      private static final long serialVersionUID = -8423413834657610406L;
58  
59      /** Underlying storage array. */
60      private transient E[] elements;
61  
62      /** Array index of first (oldest) queue element. */
63      private transient int start;
64  
65      /**
66       * Index mod maxElements of the array position following the last queue
67       * element.  Queue elements start at elements[start] and "wrap around"
68       * elements[maxElements-1], ending at elements[decrement(end)].
69       * For example, elements = {c,a,b}, start=1, end=1 corresponds to
70       * the queue [a,b,c].
71       */
72      private transient int end;
73  
74      /** Flag to indicate if the queue is currently full. */
75      private transient boolean full;
76  
77      /** Capacity of the queue. */
78      private final int maxElements;
79  
80      /**
81       * Constructor that creates a queue with the default size of 32.
82       */
83      public CircularFifoQueue() {
84          this(32);
85      }
86  
87      /**
88       * Constructor that creates a queue from the specified collection.
89       * The collection size also sets the queue size.
90       *
91       * @param coll  the collection to copy into the queue, may not be null
92       * @throws NullPointerException if the collection is null
93       */
94      public CircularFifoQueue(final Collection<? extends E> coll) {
95          this(coll.size());
96          addAll(coll);
97      }
98  
99      /**
100      * Constructor that creates a queue with the specified size.
101      *
102      * @param size  the size of the queue (cannot be changed)
103      * @throws IllegalArgumentException  if the size is &lt; 1
104      */
105     @SuppressWarnings("unchecked")
106     public CircularFifoQueue(final int size) {
107         if (size <= 0) {
108             throw new IllegalArgumentException("The size must be greater than 0");
109         }
110         elements = (E[]) new Object[size];
111         maxElements = elements.length;
112     }
113 
114     /**
115      * Adds the given element to this queue. If the queue is full, the least recently added
116      * element is discarded so that a new element can be inserted.
117      *
118      * @param element  the element to add
119      * @return true, always
120      * @throws NullPointerException  if the given element is null
121      */
122     @Override
123     public boolean add(final E element) {
124         Objects.requireNonNull(element, "element");
125 
126         if (isAtFullCapacity()) {
127             remove();
128         }
129 
130         elements[end++] = element;
131 
132         if (end >= maxElements) {
133             end = 0;
134         }
135 
136         if (end == start) {
137             full = true;
138         }
139 
140         return true;
141     }
142 
143     /**
144      * Clears this queue.
145      */
146     @Override
147     public void clear() {
148         full = false;
149         start = 0;
150         end = 0;
151         Arrays.fill(elements, null);
152     }
153 
154     /**
155      * Decrements the internal index.
156      *
157      * @param index  the index to decrement
158      * @return the updated index
159      */
160     private int decrement(int index) {
161         index--;
162         if (index < 0) {
163             index = maxElements - 1;
164         }
165         return index;
166     }
167 
168     @Override
169     public E element() {
170         if (isEmpty()) {
171             throw new NoSuchElementException("queue is empty");
172         }
173         return peek();
174     }
175 
176     /**
177      * Returns the element at the specified position in this queue.
178      *
179      * @param index the position of the element in the queue
180      * @return the element at position {@code index}
181      * @throws NoSuchElementException if the requested position is outside the range [0, size)
182      */
183     public E get(final int index) {
184         final int sz = size();
185         if (index < 0 || index >= sz) {
186             throw new NoSuchElementException(
187                     String.format("The specified index %1$d is outside the available range [0, %2$d)",
188                                   Integer.valueOf(index), Integer.valueOf(sz)));
189         }
190 
191         final int idx = (start + index) % maxElements;
192         return elements[idx];
193     }
194 
195     /**
196      * Increments the internal index.
197      *
198      * @param index  the index to increment
199      * @return the updated index
200      */
201     private int increment(int index) {
202         index++;
203         if (index >= maxElements) {
204             index = 0;
205         }
206         return index;
207     }
208 
209     /**
210      * Returns {@code true} if the capacity limit of this queue has been reached,
211      * i.e. the number of elements stored in the queue equals its maximum size.
212      *
213      * @return {@code true} if the capacity limit has been reached, {@code false} otherwise
214      * @since 4.1
215      */
216     public boolean isAtFullCapacity() {
217         return size() == maxElements;
218     }
219 
220     /**
221      * Returns true if this queue is empty; false otherwise.
222      *
223      * @return true if this queue is empty
224      */
225     @Override
226     public boolean isEmpty() {
227         return size() == 0;
228     }
229 
230     /**
231      * {@inheritDoc}
232      * <p>
233      * A {@code CircularFifoQueue} can never be full, thus this returns always
234      * {@code false}.
235      *
236      * @return always returns {@code false}
237      */
238     @Override
239     public boolean isFull() {
240         return false;
241     }
242 
243     /**
244      * Returns an iterator over this queue's elements.
245      *
246      * @return an iterator over this queue's elements
247      */
248     @Override
249     public Iterator<E> iterator() {
250         return new Iterator<E>() {
251 
252             private int index = start;
253             private int lastReturnedIndex = -1;
254             private boolean isFirst = full;
255 
256             @Override
257             public boolean hasNext() {
258                 return isFirst || index != end;
259             }
260 
261             @Override
262             public E next() {
263                 if (!hasNext()) {
264                     throw new NoSuchElementException();
265                 }
266                 isFirst = false;
267                 lastReturnedIndex = index;
268                 index = increment(index);
269                 return elements[lastReturnedIndex];
270             }
271 
272             @Override
273             public void remove() {
274                 if (lastReturnedIndex == -1) {
275                     throw new IllegalStateException();
276                 }
277 
278                 // First element can be removed quickly
279                 if (lastReturnedIndex == start) {
280                     CircularFifoQueue.this.remove();
281                     lastReturnedIndex = -1;
282                     return;
283                 }
284 
285                 int pos = lastReturnedIndex + 1;
286                 if (start < lastReturnedIndex && pos < end) {
287                     // shift in one part
288                     System.arraycopy(elements, pos, elements, lastReturnedIndex, end - pos);
289                 } else {
290                     // Other elements require us to shift the subsequent elements
291                     while (pos != end) {
292                         if (pos >= maxElements) {
293                             elements[pos - 1] = elements[0];
294                             pos = 0;
295                         } else {
296                             elements[decrement(pos)] = elements[pos];
297                             pos = increment(pos);
298                         }
299                     }
300                 }
301 
302                 lastReturnedIndex = -1;
303                 end = decrement(end);
304                 elements[end] = null;
305                 full = false;
306                 index = decrement(index);
307             }
308 
309         };
310     }
311 
312     /**
313      * Gets the maximum size of the collection (the bound).
314      *
315      * @return the maximum number of elements the collection can hold
316      */
317     @Override
318     public int maxSize() {
319         return maxElements;
320     }
321 
322     /**
323      * Adds the given element to this queue. If the queue is full, the least recently added
324      * element is discarded so that a new element can be inserted.
325      *
326      * @param element  the element to add
327      * @return true, always
328      * @throws NullPointerException  if the given element is null
329      */
330     @Override
331     public boolean offer(final E element) {
332         return add(element);
333     }
334 
335     @Override
336     public E peek() {
337         if (isEmpty()) {
338             return null;
339         }
340         return elements[start];
341     }
342 
343     @Override
344     public E poll() {
345         if (isEmpty()) {
346             return null;
347         }
348         return remove();
349     }
350 
351     /**
352      * Deserializes the queue in using a custom routine.
353      *
354      * @param in  the input stream
355      * @throws IOException if an I/O error occurs while writing to the output stream
356      * @throws ClassNotFoundException if the class of a serialized object cannot be found
357      */
358     @SuppressWarnings("unchecked")
359     private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
360         in.defaultReadObject();
361         elements = (E[]) new Object[maxElements];
362         final int size = in.readInt();
363         for (int i = 0; i < size; i++) {
364             elements[i] = (E) in.readObject();
365         }
366         start = 0;
367         full = size == maxElements;
368         if (full) {
369             end = 0;
370         } else {
371             end = size;
372         }
373     }
374 
375     @Override
376     public E remove() {
377         if (isEmpty()) {
378             throw new NoSuchElementException("queue is empty");
379         }
380 
381         final E element = elements[start];
382         if (null != element) {
383             elements[start++] = null;
384 
385             if (start >= maxElements) {
386                 start = 0;
387             }
388             full = false;
389         }
390         return element;
391     }
392 
393     /**
394      * Returns the number of elements stored in the queue.
395      *
396      * @return this queue's size
397      */
398     @Override
399     public int size() {
400         int size = 0;
401 
402         if (end < start) {
403             size = maxElements - start + end;
404         } else if (end == start) {
405             size = full ? maxElements : 0;
406         } else {
407             size = end - start;
408         }
409 
410         return size;
411     }
412 
413     /**
414      * Serializes this object to an ObjectOutputStream.
415      *
416      * @param out the target ObjectOutputStream.
417      * @throws IOException thrown when an I/O errors occur writing to the target stream.
418      */
419     private void writeObject(final ObjectOutputStream out) throws IOException {
420         out.defaultWriteObject();
421         out.writeInt(size());
422         for (final E e : this) {
423             out.writeObject(e);
424         }
425     }
426 
427 }