/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.collections4.queue;

import java.io.IOException;
import java.io.InvalidObjectException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.AbstractCollection;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;

import org.apache.commons.collections4.BoundedCollection;

/**
 * CircularFifoQueue is a first-in first-out queue with a fixed size that
 * replaces its oldest element if full.
 * <p>
 * The removal order of a {@link CircularFifoQueue} is based on the
 * insertion order; elements are removed in the same order in which they
 * were added. The iteration order is the same as the removal order.
 * <p>
 * The {@link #add(Object)}, {@link #remove()}, {@link #peek()}, {@link #poll},
 * {@link #offer(Object)} operations all perform in constant time.
 * All other operations perform in linear time or worse.
 * <p>
 * This queue prevents null objects from being added.
 *
 * @since 4.0
 * @version $Id: CircularFifoQueue.html 972421 2015-11-14 20:00:04Z tn $
 */
public class CircularFifoQueue<E> extends AbstractCollection<E>
    implements Queue<E>, BoundedCollection<E>, Serializable {

    /** Serialization version. */
    private static final long serialVersionUID = -8423413834657610406L;

    /** Underlying storage array. */
    private transient E[] elements;

    /** Array index of first (oldest) queue element. */
    private transient int start = 0;

    /**
     * Index mod maxElements of the array position following the last queue
     * element. Queue elements start at elements[start] and "wrap around"
     * elements[maxElements-1], ending at elements[decrement(end)].
     * For example, elements = {c,a,b}, start=1, end=1 corresponds to
     * the queue [a,b,c].
     */
    private transient int end = 0;

    /**
     * Flag to indicate if the queue is currently full. Needed because
     * {@code start == end} describes both the empty and the full queue.
     */
    private transient boolean full = false;

    /** Capacity of the queue. */
    private final int maxElements;

    /**
     * Constructor that creates a queue with the default size of 32.
     */
    public CircularFifoQueue() {
        this(32);
    }

    /**
     * Constructor that creates a queue with the specified size.
     *
     * @param size  the size of the queue (cannot be changed)
     * @throws IllegalArgumentException  if the size is &lt; 1
     */
    @SuppressWarnings("unchecked")
    public CircularFifoQueue(final int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("The size must be greater than 0");
        }
        elements = (E[]) new Object[size];
        maxElements = elements.length;
    }

    /**
     * Constructor that creates a queue from the specified collection.
     * The collection size also sets the queue size.
     *
     * @param coll  the collection to copy into the queue, may not be null
     * @throws NullPointerException if the collection is null, or contains a null element
     * @throws IllegalArgumentException if the collection is empty, because the
     *  resulting queue size would be &lt; 1
     */
    public CircularFifoQueue(final Collection<? extends E> coll) {
        this(coll.size());
        addAll(coll);
    }

    //-----------------------------------------------------------------------
    /**
     * Write the queue out using a custom routine.
     * <p>
     * The stream layout is: default fields (the capacity), the current size,
     * then the elements in queue (oldest first) order.
     *
     * @param out  the output stream
     * @throws IOException if an I/O error occurs while writing to the output stream
     */
    private void writeObject(final ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeInt(size());
        for (final E e : this) {
            out.writeObject(e);
        }
    }

    /**
     * Read the queue in using a custom routine.
     * <p>
     * The stream contents are validated before they are used: the capacity must
     * be positive, the element count must lie in {@code [0, capacity]} and no
     * element may be null. A stream violating any of these is rejected with an
     * {@link InvalidObjectException} instead of producing a queue whose internal
     * indices are inconsistent.
     *
     * @param in  the input stream
     * @throws IOException if an I/O error occurs while reading from the input stream,
     *  or if the stream describes an invalid queue
     * @throws ClassNotFoundException if the class of a serialized object can not be found
     */
    @SuppressWarnings("unchecked")
    private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        if (maxElements <= 0) {
            throw new InvalidObjectException("Invalid queue capacity: " + maxElements);
        }

        final int size = in.readInt();
        if (size < 0 || size > maxElements) {
            throw new InvalidObjectException(
                    "Invalid queue size " + size + " for capacity " + maxElements);
        }

        elements = (E[]) new Object[maxElements];
        for (int i = 0; i < size; i++) {
            final E element = (E) in.readObject();
            if (element == null) {
                // add(Object) never stores null, so a null here means a corrupt stream
                throw new InvalidObjectException("Serialized queue contains a null element");
            }
            elements[i] = element;
        }

        // Elements were written oldest-first, so the restored queue is never wrapped
        start = 0;
        full = size == maxElements;
        end = full ? 0 : size;
    }

    //-----------------------------------------------------------------------
    /**
     * Returns the number of elements stored in the queue.
     *
     * @return this queue's size
     */
    @Override
    public int size() {
        if (end < start) {
            // wrapped: [start, maxElements) plus [0, end)
            return maxElements - start + end;
        }
        if (end == start) {
            // ambiguous position, resolved by the full flag
            return full ? maxElements : 0;
        }
        return end - start;
    }

    /**
     * Returns true if this queue is empty; false otherwise.
     *
     * @return true if this queue is empty
     */
    @Override
    public boolean isEmpty() {
        return size() == 0;
    }

    /**
     * {@inheritDoc}
     * <p>
     * A {@code CircularFifoQueue} can never be full, thus this returns always
     * {@code false}.
     *
     * @return always returns {@code false}
     */
    public boolean isFull() {
        return false;
    }

    /**
     * Checks whether the number of stored elements equals the capacity, i.e.
     * whether the next {@link #add(Object)} has to evict the oldest element.
     *
     * @return true if the queue holds {@code maxElements} elements
     */
    private boolean isAtFullCapacity() {
        return size() == maxElements;
    }

    /**
     * Gets the maximum size of the collection (the bound).
     *
     * @return the maximum number of elements the collection can hold
     */
    public int maxSize() {
        return maxElements;
    }

    /**
     * Clears this queue.
     */
    @Override
    public void clear() {
        full = false;
        start = 0;
        end = 0;
        // drop the references so the removed elements can be garbage collected
        Arrays.fill(elements, null);
    }

    /**
     * Adds the given element to this queue. If the queue is full, the least recently added
     * element is discarded so that a new element can be inserted.
     *
     * @param element  the element to add
     * @return true, always
     * @throws NullPointerException  if the given element is null
     */
    @Override
    public boolean add(final E element) {
        if (null == element) {
            throw new NullPointerException("Attempted to add null object to queue");
        }

        if (isAtFullCapacity()) {
            // make room by evicting the oldest element
            remove();
        }

        elements[end] = element;
        end = increment(end);

        if (end == start) {
            full = true;
        }

        return true;
    }

    /**
     * Returns the element at the specified position in this queue.
     *
     * @param index  the position of the element in the queue
     * @return the element at position {@code index}
     * @throws NoSuchElementException if the requested position is outside the range [0, size)
     */
    public E get(final int index) {
        final int sz = size();
        if (index < 0 || index >= sz) {
            throw new NoSuchElementException(
                    String.format("The specified index (%1$d) is outside the available range [0, %2$d)",
                                  Integer.valueOf(index), Integer.valueOf(sz)));
        }

        // Wrap around the end of the array without computing start + index,
        // which could overflow an int for very large capacities.
        final int slotsToArrayEnd = maxElements - start;
        final int idx = index < slotsToArrayEnd ? start + index : index - slotsToArrayEnd;
        return elements[idx];
    }

    //-----------------------------------------------------------------------

    /**
     * Adds the given element to this queue. If the queue is full, the least recently added
     * element is discarded so that a new element can be inserted.
     *
     * @param element  the element to add
     * @return true, always
     * @throws NullPointerException  if the given element is null
     */
    public boolean offer(final E element) {
        return add(element);
    }

    /**
     * Retrieves and removes the oldest element of this queue.
     *
     * @return the oldest element, or {@code null} if this queue is empty
     */
    public E poll() {
        if (isEmpty()) {
            return null;
        }
        return remove();
    }

    /**
     * Retrieves, but does not remove, the oldest element of this queue.
     *
     * @return the oldest element
     * @throws NoSuchElementException if this queue is empty
     */
    public E element() {
        if (isEmpty()) {
            throw new NoSuchElementException("queue is empty");
        }
        return peek();
    }

    /**
     * Retrieves, but does not remove, the oldest element of this queue.
     *
     * @return the oldest element, or {@code null} if this queue is empty
     */
    public E peek() {
        if (isEmpty()) {
            return null;
        }
        return elements[start];
    }

    /**
     * Retrieves and removes the oldest element of this queue.
     *
     * @return the oldest element
     * @throws NoSuchElementException if this queue is empty
     */
    public E remove() {
        if (isEmpty()) {
            throw new NoSuchElementException("queue is empty");
        }

        final E element = elements[start];
        if (null != element) {
            // clear the slot so the element can be garbage collected
            elements[start] = null;
            start = increment(start);
            full = false;
        }
        return element;
    }

    //-----------------------------------------------------------------------
    /**
     * Increments the internal index.
     *
     * @param index  the index to increment
     * @return the updated index
     */
    private int increment(int index) {
        index++;
        if (index >= maxElements) {
            index = 0;
        }
        return index;
    }

    /**
     * Decrements the internal index.
     *
     * @param index  the index to decrement
     * @return the updated index
     */
    private int decrement(int index) {
        index--;
        if (index < 0) {
            index = maxElements - 1;
        }
        return index;
    }

    /**
     * Returns an iterator over this queue's elements.
     * <p>
     * Elements are returned oldest first. The iterator supports
     * {@link Iterator#remove()}.
     *
     * @return an iterator over this queue's elements
     */
    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {

            /** Array index of the element returned by the next call to next(). */
            private int index = start;
            /** Array index of the element last returned, or -1 if none is removable. */
            private int lastReturnedIndex = -1;
            /**
             * True until the first element of a full queue has been returned;
             * for a full queue index == end holds before iteration has begun.
             */
            private boolean isFirst = full;

            public boolean hasNext() {
                return isFirst || index != end;
            }

            public E next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                isFirst = false;
                lastReturnedIndex = index;
                index = increment(index);
                return elements[lastReturnedIndex];
            }

            public void remove() {
                if (lastReturnedIndex == -1) {
                    throw new IllegalStateException();
                }

                // First element can be removed quickly
                if (lastReturnedIndex == start) {
                    CircularFifoQueue.this.remove();
                    lastReturnedIndex = -1;
                    return;
                }

                int pos = lastReturnedIndex + 1;
                if (start < lastReturnedIndex && pos < end) {
                    // the following elements are contiguous: shift in one part
                    System.arraycopy(elements, pos, elements, lastReturnedIndex, end - pos);
                } else {
                    // Other elements require us to shift the subsequent elements
                    // one slot at a time, following the wrap-around
                    while (pos != end) {
                        if (pos >= maxElements) {
                            // pos ran off the array: carry slot 0 into the last slot
                            elements[pos - 1] = elements[0];
                            pos = 0;
                        } else {
                            elements[decrement(pos)] = elements[pos];
                            pos = increment(pos);
                        }
                    }
                }

                lastReturnedIndex = -1;
                end = decrement(end);
                elements[end] = null;
                full = false;
                // the element that followed the removed one moved back by one slot
                index = decrement(index);
            }

        };
    }

}