1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.commons.collections4.queue;
18
19 import java.io.IOException;
20 import java.io.ObjectInputStream;
21 import java.io.ObjectOutputStream;
22 import java.io.Serializable;
23 import java.util.AbstractCollection;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.Iterator;
27 import java.util.NoSuchElementException;
28 import java.util.Objects;
29 import java.util.Queue;
30
31 import org.apache.commons.collections4.BoundedCollection;
32
33 /**
34 * CircularFifoQueue is a first-in first-out queue with a fixed size that
35 * replaces its oldest element if full.
36 * <p>
37 * The removal order of a {@link CircularFifoQueue} is based on the
38 * insertion order; elements are removed in the same order in which they
39 * were added. The iteration order is the same as the removal order.
40 * </p>
41 * <p>
42 * The {@link #add(Object)}, {@link #remove()}, {@link #peek()}, {@link #poll()},
43 * {@link #offer(Object)} operations all perform in constant time.
44 * All other operations perform in linear time or worse.
45 * </p>
46 * <p>
47 * This queue prevents null objects from being added.
48 * </p>
49 *
50 * @param <E> the type of elements in this collection
51 * @since 4.0
52 */
53 public class CircularFifoQueue<E> extends AbstractCollection<E>
54 implements Queue<E>, BoundedCollection<E>, Serializable {
55
56 /** Serialization version. */
57 private static final long serialVersionUID = -8423413834657610406L;
58
59 /** Underlying storage array. */
60 private transient E[] elements;
61
62 /** Array index of first (oldest) queue element. */
63 private transient int start;
64
65 /**
66 * Index mod maxElements of the array position following the last queue
67 * element. Queue elements start at elements[start] and "wrap around"
68 * elements[maxElements-1], ending at elements[decrement(end)].
69 * For example, elements = {c,a,b}, start=1, end=1 corresponds to
70 * the queue [a,b,c].
71 */
72 private transient int end;
73
74 /** Flag to indicate if the queue is currently full. */
75 private transient boolean full;
76
77 /** Capacity of the queue. */
78 private final int maxElements;
79
80 /**
81 * Constructor that creates a queue with the default size of 32.
82 */
83 public CircularFifoQueue() {
84 this(32);
85 }
86
87 /**
88 * Constructor that creates a queue from the specified collection.
89 * The collection size also sets the queue size.
90 *
91 * @param coll the collection to copy into the queue, may not be null
92 * @throws NullPointerException if the collection is null
93 */
94 public CircularFifoQueue(final Collection<? extends E> coll) {
95 this(coll.size());
96 addAll(coll);
97 }
98
99 /**
100 * Constructor that creates a queue with the specified size.
101 *
102 * @param size the size of the queue (cannot be changed)
103 * @throws IllegalArgumentException if the size is < 1
104 */
105 @SuppressWarnings("unchecked")
106 public CircularFifoQueue(final int size) {
107 if (size <= 0) {
108 throw new IllegalArgumentException("The size must be greater than 0");
109 }
110 elements = (E[]) new Object[size];
111 maxElements = elements.length;
112 }
113
114 /**
115 * Adds the given element to this queue. If the queue is full, the least recently added
116 * element is discarded so that a new element can be inserted.
117 *
118 * @param element the element to add
119 * @return true, always
120 * @throws NullPointerException if the given element is null
121 */
122 @Override
123 public boolean add(final E element) {
124 Objects.requireNonNull(element, "element");
125
126 if (isAtFullCapacity()) {
127 remove();
128 }
129
130 elements[end++] = element;
131
132 if (end >= maxElements) {
133 end = 0;
134 }
135
136 if (end == start) {
137 full = true;
138 }
139
140 return true;
141 }
142
143 /**
144 * Clears this queue.
145 */
146 @Override
147 public void clear() {
148 full = false;
149 start = 0;
150 end = 0;
151 Arrays.fill(elements, null);
152 }
153
154 /**
155 * Decrements the internal index.
156 *
157 * @param index the index to decrement
158 * @return the updated index
159 */
160 private int decrement(int index) {
161 index--;
162 if (index < 0) {
163 index = maxElements - 1;
164 }
165 return index;
166 }
167
168 @Override
169 public E element() {
170 if (isEmpty()) {
171 throw new NoSuchElementException("queue is empty");
172 }
173 return peek();
174 }
175
176 /**
177 * Gets the element at the specified position in this queue.
178 *
179 * @param index the position of the element in the queue
180 * @return the element at position {@code index}
181 * @throws NoSuchElementException if the requested position is outside the range [0, size)
182 */
183 public E get(final int index) {
184 final int sz = size();
185 if (index < 0 || index >= sz) {
186 throw new NoSuchElementException(
187 String.format("The specified index %1$d is outside the available range [0, %2$d)",
188 Integer.valueOf(index), Integer.valueOf(sz)));
189 }
190
191 final int idx = (start + index) % maxElements;
192 return elements[idx];
193 }
194
195 /**
196 * Increments the internal index.
197 *
198 * @param index the index to increment
199 * @return the updated index
200 */
201 private int increment(int index) {
202 index++;
203 if (index >= maxElements) {
204 index = 0;
205 }
206 return index;
207 }
208
209 /**
210 * Returns {@code true} if the capacity limit of this queue has been reached,
211 * i.e. the number of elements stored in the queue equals its maximum size.
212 *
213 * @return {@code true} if the capacity limit has been reached, {@code false} otherwise
214 * @since 4.1
215 */
216 public boolean isAtFullCapacity() {
217 return size() == maxElements;
218 }
219
220 /**
221 * Returns true if this queue is empty; false otherwise.
222 *
223 * @return true if this queue is empty
224 */
225 @Override
226 public boolean isEmpty() {
227 return size() == 0;
228 }
229
230 /**
231 * {@inheritDoc}
232 * <p>
233 * A {@code CircularFifoQueue} can never be full, thus this returns always
234 * {@code false}.
235 *
236 * @return always returns {@code false}
237 */
238 @Override
239 public boolean isFull() {
240 return false;
241 }
242
243 /**
244 * Returns an iterator over this queue's elements.
245 *
246 * @return an iterator over this queue's elements
247 */
248 @Override
249 public Iterator<E> iterator() {
250 return new Iterator<E>() {
251
252 private int index = start;
253 private int lastReturnedIndex = -1;
254 private boolean isFirst = full;
255
256 @Override
257 public boolean hasNext() {
258 return isFirst || index != end;
259 }
260
261 @Override
262 public E next() {
263 if (!hasNext()) {
264 throw new NoSuchElementException();
265 }
266 isFirst = false;
267 lastReturnedIndex = index;
268 index = increment(index);
269 return elements[lastReturnedIndex];
270 }
271
272 @Override
273 public void remove() {
274 if (lastReturnedIndex == -1) {
275 throw new IllegalStateException();
276 }
277
278 // First element can be removed quickly
279 if (lastReturnedIndex == start) {
280 CircularFifoQueue.this.remove();
281 lastReturnedIndex = -1;
282 return;
283 }
284
285 int pos = lastReturnedIndex + 1;
286 if (start < lastReturnedIndex && pos < end) {
287 // shift in one part
288 System.arraycopy(elements, pos, elements, lastReturnedIndex, end - pos);
289 } else {
290 // Other elements require us to shift the subsequent elements
291 while (pos != end) {
292 if (pos >= maxElements) {
293 elements[pos - 1] = elements[0];
294 pos = 0;
295 } else {
296 elements[decrement(pos)] = elements[pos];
297 pos = increment(pos);
298 }
299 }
300 }
301
302 lastReturnedIndex = -1;
303 end = decrement(end);
304 elements[end] = null;
305 full = false;
306 index = decrement(index);
307 }
308
309 };
310 }
311
312 /**
313 * Gets the maximum size of the collection (the bound).
314 *
315 * @return the maximum number of elements the collection can hold
316 */
317 @Override
318 public int maxSize() {
319 return maxElements;
320 }
321
322 /**
323 * Adds the given element to this queue. If the queue is full, the least recently added
324 * element is discarded so that a new element can be inserted.
325 *
326 * @param element the element to add
327 * @return true, always
328 * @throws NullPointerException if the given element is null
329 */
330 @Override
331 public boolean offer(final E element) {
332 return add(element);
333 }
334
335 @Override
336 public E peek() {
337 if (isEmpty()) {
338 return null;
339 }
340 return elements[start];
341 }
342
343 @Override
344 public E poll() {
345 if (isEmpty()) {
346 return null;
347 }
348 return remove();
349 }
350
351 /**
352 * Deserializes the queue in using a custom routine.
353 *
354 * @param in the input stream
355 * @throws IOException if an I/O error occurs while writing to the output stream
356 * @throws ClassNotFoundException if the class of a serialized object cannot be found
357 */
358 @SuppressWarnings("unchecked")
359 private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
360 in.defaultReadObject();
361 elements = (E[]) new Object[maxElements];
362 final int size = in.readInt();
363 for (int i = 0; i < size; i++) {
364 elements[i] = (E) in.readObject();
365 }
366 start = 0;
367 full = size == maxElements;
368 if (full) {
369 end = 0;
370 } else {
371 end = size;
372 }
373 }
374
375 @Override
376 public E remove() {
377 if (isEmpty()) {
378 throw new NoSuchElementException("queue is empty");
379 }
380
381 final E element = elements[start];
382 if (null != element) {
383 elements[start++] = null;
384
385 if (start >= maxElements) {
386 start = 0;
387 }
388 full = false;
389 }
390 return element;
391 }
392
393 /**
394 * Returns the number of elements stored in the queue.
395 *
396 * @return this queue's size
397 */
398 @Override
399 public int size() {
400 int size = 0;
401
402 if (end < start) {
403 size = maxElements - start + end;
404 } else if (end == start) {
405 size = full ? maxElements : 0;
406 } else {
407 size = end - start;
408 }
409
410 return size;
411 }
412
413 /**
414 * Serializes this object to an ObjectOutputStream.
415 *
416 * @param out the target ObjectOutputStream.
417 * @throws IOException thrown when an I/O errors occur writing to the target stream.
418 */
419 private void writeObject(final ObjectOutputStream out) throws IOException {
420 out.defaultWriteObject();
421 out.writeInt(size());
422 for (final E e : this) {
423 out.writeObject(e);
424 }
425 }
426
427 }