1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.commons.pool2.impl; 18 19 import java.io.IOException; 20 import java.io.ObjectInputStream; 21 import java.io.Serializable; 22 import java.time.Duration; 23 import java.util.AbstractQueue; 24 import java.util.Collection; 25 import java.util.Deque; 26 import java.util.Iterator; 27 import java.util.NoSuchElementException; 28 import java.util.Objects; 29 import java.util.concurrent.TimeUnit; 30 import java.util.concurrent.locks.Condition; 31 32 /** 33 * An optionally-bounded {@linkplain java.util.concurrent.BlockingDeque blocking 34 * deque} based on linked nodes. 35 * 36 * <p> The optional capacity bound constructor argument serves as a 37 * way to prevent excessive expansion. The capacity, if unspecified, 38 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are 39 * dynamically created upon each insertion unless this would bring the 40 * deque above capacity. 41 * </p> 42 * 43 * <p>Most operations run in constant time (ignoring time spent 44 * blocking). Exceptions include {@link #remove(Object) remove}, 45 * {@link #removeFirstOccurrence removeFirstOccurrence}, {@link 46 * #removeLastOccurrence removeLastOccurrence}, {@link #contains 47 * contains}, {@link #iterator iterator.remove()}, and the bulk 48 * operations, all of which run in linear time. 49 * </p> 50 * 51 * <p>This class and its iterator implement all of the 52 * <em>optional</em> methods of the {@link Collection} and {@link 53 * Iterator} interfaces. 54 * </p> 55 * 56 * <p>This class is a member of the 57 * <a href="{@docRoot}/../technotes/guides/collections/index.html"> 58 * Java Collections Framework</a>. 59 * </p> 60 * 61 * @param <E> the type of elements held in this collection 62 * 63 * Note: This was copied from Apache Harmony and modified to suit the needs of 64 * Commons Pool. 65 * 66 * @since 2.0 67 */ 68 class LinkedBlockingDeque<E> extends AbstractQueue<E> 69 implements Deque<E>, Serializable { 70 71 /* 72 * Implemented as a simple doubly-linked list protected by a 73 * single lock and using conditions to manage blocking. 74 * 75 * To implement weakly consistent iterators, it appears we need to 76 * keep all Nodes GC-reachable from a predecessor dequeued Node. 77 * That would cause two problems: 78 * - allow a rogue Iterator to cause unbounded memory retention 79 * - cause cross-generational linking of old Nodes to new Nodes if 80 * a Node was tenured while live, which generational GCs have a 81 * hard time dealing with, causing repeated major collections. 82 * However, only non-deleted Nodes need to be reachable from 83 * dequeued Nodes, and reachability does not necessarily have to 84 * be of the kind understood by the GC. We use the trick of 85 * linking a Node that has just been dequeued to itself. Such a 86 * self-link implicitly means to jump to "first" (for next links) 87 * or "last" (for prev links). 88 */ 89 90 /* 91 * We have "diamond" multiple interface/abstract class inheritance 92 * here, and that introduces ambiguities. Often we want the 93 * BlockingDeque javadoc combined with the AbstractQueue 94 * implementation, so a lot of method specs are duplicated here. 95 */ 96 97 /** 98 * Base class for Iterators for LinkedBlockingDeque 99 */ 100 private abstract class AbstractItr implements Iterator<E> { 101 /** 102 * The next node to return in next() 103 */ 104 Node<E> next; 105 106 /** 107 * nextItem holds on to item fields because once we claim that 108 * an element exists in hasNext(), we must return item read 109 * under lock (in advance()) even if it was in the process of 110 * being removed when hasNext() was called. 111 */ 112 E nextItem; 113 114 /** 115 * Node returned by most recent call to next. Needed by remove. 116 * Reset to null if this element is deleted by a call to remove. 117 */ 118 private Node<E> lastRet; 119 120 /** 121 * Constructs a new iterator. Sets the initial position. 122 */ 123 AbstractItr() { 124 // set to initial position 125 lock.lock(); 126 try { 127 next = firstNode(); 128 nextItem = next == null ? null : next.item; 129 } finally { 130 lock.unlock(); 131 } 132 } 133 134 /** 135 * Advances next. 136 */ 137 void advance() { 138 lock.lock(); 139 try { 140 // assert next != null; 141 next = succ(next); 142 nextItem = next == null ? null : next.item; 143 } finally { 144 lock.unlock(); 145 } 146 } 147 148 /** 149 * Obtain the first node to be returned by the iterator. 150 * 151 * @return first node 152 */ 153 abstract Node<E> firstNode(); 154 155 @Override 156 public boolean hasNext() { 157 return next != null; 158 } 159 160 @Override 161 public E next() { 162 if (next == null) { 163 throw new NoSuchElementException(); 164 } 165 lastRet = next; 166 final E x = nextItem; 167 advance(); 168 return x; 169 } 170 171 /** 172 * For a given node, obtain the next node to be returned by the 173 * iterator. 174 * 175 * @param n given node 176 * 177 * @return next node 178 */ 179 abstract Node<E> nextNode(Node<E> n); 180 181 @Override 182 public void remove() { 183 final Node<E> n = lastRet; 184 if (n == null) { 185 throw new IllegalStateException(); 186 } 187 lastRet = null; 188 lock.lock(); 189 try { 190 if (n.item != null) { 191 unlink(n); 192 } 193 } finally { 194 lock.unlock(); 195 } 196 } 197 198 /** 199 * Returns the successor node of the given non-null, but 200 * possibly previously deleted, node. 201 * 202 * @param n node whose successor is sought 203 * @return successor node 204 */ 205 private Node<E> succ(Node<E> n) { 206 // Chains of deleted nodes ending in null or self-links 207 // are possible if multiple interior nodes are removed. 208 for (;;) { 209 final Node<E> s = nextNode(n); 210 if (s == null) { 211 return null; 212 } 213 if (s.item != null) { 214 return s; 215 } 216 if (s == n) { 217 return firstNode(); 218 } 219 n = s; 220 } 221 } 222 } 223 224 /** Descending iterator */ 225 private class DescendingItr extends AbstractItr { 226 @Override 227 Node<E> firstNode() { return last; } 228 @Override 229 Node<E> nextNode(final Node<E> n) { return n.prev; } 230 } 231 232 /** Forward iterator */ 233 private class Itr extends AbstractItr { 234 @Override 235 Node<E> firstNode() { return first; } 236 @Override 237 Node<E> nextNode(final Node<E> n) { return n.next; } 238 } 239 240 /** 241 * Doubly-linked list node class. 242 * 243 * @param <E> node item type 244 */ 245 private static final class Node<E> { 246 /** 247 * The item, or null if this node has been removed. 248 */ 249 E item; 250 251 /** 252 * One of: 253 * - the real predecessor Node 254 * - this Node, meaning the predecessor is tail 255 * - null, meaning there is no predecessor 256 */ 257 Node<E> prev; 258 259 /** 260 * One of: 261 * - the real successor Node 262 * - this Node, meaning the successor is head 263 * - null, meaning there is no successor 264 */ 265 Node<E> next; 266 267 /** 268 * Constructs a new list node. 269 * 270 * @param x The list item 271 * @param p Previous item 272 * @param n Next item 273 */ 274 Node(final E x, final Node<E> p, final Node<E> n) { 275 item = x; 276 prev = p; 277 next = n; 278 } 279 } 280 281 private static final long serialVersionUID = -387911632671998426L; 282 283 /** 284 * Pointer to first node. 285 * Invariant: (first == null && last == null) || 286 * (first.prev == null && first.item != null) 287 */ 288 private transient Node<E> first; // @GuardedBy("lock") 289 290 /** 291 * Pointer to last node. 292 * Invariant: (first == null && last == null) || 293 * (last.next == null && last.item != null) 294 */ 295 private transient Node<E> last; // @GuardedBy("lock") 296 297 /** Number of items in the deque */ 298 private transient int count; // @GuardedBy("lock") 299 300 /** Maximum number of items in the deque */ 301 private final int capacity; 302 303 /** Main lock guarding all access */ 304 private final InterruptibleReentrantLock lock; 305 306 /** Condition for waiting takes */ 307 private final Condition notEmpty; 308 309 /** Condition for waiting puts */ 310 private final Condition notFull; 311 312 /** 313 * Creates a {@code LinkedBlockingDeque} with a capacity of 314 * {@link Integer#MAX_VALUE}. 315 */ 316 public LinkedBlockingDeque() { 317 this(Integer.MAX_VALUE); 318 } 319 320 /** 321 * Creates a {@code LinkedBlockingDeque} with a capacity of 322 * {@link Integer#MAX_VALUE} and the given fairness policy. 323 * @param fairness true means threads waiting on the deque should be served 324 * as if waiting in a FIFO request queue 325 */ 326 public LinkedBlockingDeque(final boolean fairness) { 327 this(Integer.MAX_VALUE, fairness); 328 } 329 330 331 // Basic linking and unlinking operations, called only while holding lock 332 333 /** 334 * Creates a {@code LinkedBlockingDeque} with a capacity of 335 * {@link Integer#MAX_VALUE}, initially containing the elements of 336 * the given collection, added in traversal order of the 337 * collection's iterator. 338 * 339 * @param c the collection of elements to initially contain 340 * @throws NullPointerException if the specified collection or any 341 * of its elements are null 342 */ 343 public LinkedBlockingDeque(final Collection<? extends E> c) { 344 this(Integer.MAX_VALUE); 345 lock.lock(); // Never contended, but necessary for visibility 346 try { 347 for (final E e : c) { 348 Objects.requireNonNull(e); 349 if (!linkLast(e)) { 350 throw new IllegalStateException("Deque full"); 351 } 352 } 353 } finally { 354 lock.unlock(); 355 } 356 } 357 358 /** 359 * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity. 360 * 361 * @param capacity the capacity of this deque 362 * @throws IllegalArgumentException if {@code capacity} is less than 1 363 */ 364 public LinkedBlockingDeque(final int capacity) { 365 this(capacity, false); 366 } 367 368 /** 369 * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity 370 * and fairness policy. 371 * 372 * @param capacity the capacity of this deque 373 * @param fairness true means threads waiting on the deque should be served 374 * as if waiting in a FIFO request queue 375 * @throws IllegalArgumentException if {@code capacity} is less than 1 376 */ 377 public LinkedBlockingDeque(final int capacity, final boolean fairness) { 378 if (capacity <= 0) { 379 throw new IllegalArgumentException(); 380 } 381 this.capacity = capacity; 382 lock = new InterruptibleReentrantLock(fairness); 383 notEmpty = lock.newCondition(); 384 notFull = lock.newCondition(); 385 } 386 387 /** 388 * {@inheritDoc} 389 */ 390 @Override 391 public boolean add(final E e) { 392 addLast(e); 393 return true; 394 } 395 396 /** 397 * {@inheritDoc} 398 */ 399 @Override 400 public void addFirst(final E e) { 401 if (!offerFirst(e)) { 402 throw new IllegalStateException("Deque full"); 403 } 404 } 405 406 // BlockingDeque methods 407 408 /** 409 * {@inheritDoc} 410 */ 411 @Override 412 public void addLast(final E e) { 413 if (!offerLast(e)) { 414 throw new IllegalStateException("Deque full"); 415 } 416 } 417 418 /** 419 * Atomically removes all of the elements from this deque. 420 * The deque will be empty after this call returns. 421 */ 422 @Override 423 public void clear() { 424 lock.lock(); 425 try { 426 for (Node<E> f = first; f != null;) { 427 f.item = null; 428 final Node<E> n = f.next; 429 f.prev = null; 430 f.next = null; 431 f = n; 432 } 433 first = last = null; 434 count = 0; 435 notFull.signalAll(); 436 } finally { 437 lock.unlock(); 438 } 439 } 440 441 /** 442 * Returns {@code true} if this deque contains the specified element. 443 * More formally, returns {@code true} if and only if this deque contains 444 * at least one element {@code e} such that {@code o.equals(e)}. 445 * 446 * @param o object to be checked for containment in this deque 447 * @return {@code true} if this deque contains the specified element 448 */ 449 @Override 450 public boolean contains(final Object o) { 451 if (o == null) { 452 return false; 453 } 454 lock.lock(); 455 try { 456 for (Node<E> p = first; p != null; p = p.next) { 457 if (o.equals(p.item)) { 458 return true; 459 } 460 } 461 return false; 462 } finally { 463 lock.unlock(); 464 } 465 } 466 467 /** 468 * {@inheritDoc} 469 */ 470 @Override 471 public Iterator<E> descendingIterator() { 472 return new DescendingItr(); 473 } 474 475 /** 476 * Drains the queue to the specified collection. 477 * 478 * @param c The collection to add the elements to 479 * 480 * @return number of elements added to the collection 481 * 482 * @throws UnsupportedOperationException if the add operation is not 483 * supported by the specified collection 484 * @throws ClassCastException if the class of the elements held by this 485 * collection prevents them from being added to the specified 486 * collection 487 * @throws NullPointerException if c is null 488 * @throws IllegalArgumentException if c is this instance 489 */ 490 public int drainTo(final Collection<? super E> c) { 491 return drainTo(c, Integer.MAX_VALUE); 492 } 493 494 /** 495 * Drains no more than the specified number of elements from the queue to the 496 * specified collection. 497 * 498 * @param c collection to add the elements to 499 * @param maxElements maximum number of elements to remove from the queue 500 * 501 * @return number of elements added to the collection 502 * @throws UnsupportedOperationException if the add operation is not 503 * supported by the specified collection 504 * @throws ClassCastException if the class of the elements held by this 505 * collection prevents them from being added to the specified 506 * collection 507 * @throws NullPointerException if c is null 508 * @throws IllegalArgumentException if c is this instance 509 */ 510 public int drainTo(final Collection<? super E> c, final int maxElements) { 511 Objects.requireNonNull(c, "c"); 512 if (c == this) { 513 throw new IllegalArgumentException(); 514 } 515 lock.lock(); 516 try { 517 final int n = Math.min(maxElements, count); 518 for (int i = 0; i < n; i++) { 519 c.add(first.item); // In this order, in case add() throws. 520 unlinkFirst(); 521 } 522 return n; 523 } finally { 524 lock.unlock(); 525 } 526 } 527 528 /** 529 * Retrieves, but does not remove, the head of the queue represented by 530 * this deque. This method differs from {@link #peek peek} only in that 531 * it throws an exception if this deque is empty. 532 * 533 * <p>This method is equivalent to {@link #getFirst() getFirst}. 534 * 535 * @return the head of the queue represented by this deque 536 * @throws NoSuchElementException if this deque is empty 537 */ 538 @Override 539 public E element() { 540 return getFirst(); 541 } 542 543 /** 544 * {@inheritDoc} 545 */ 546 @Override 547 public E getFirst() { 548 final E x = peekFirst(); 549 if (x == null) { 550 throw new NoSuchElementException(); 551 } 552 return x; 553 } 554 555 /** 556 * {@inheritDoc} 557 */ 558 @Override 559 public E getLast() { 560 final E x = peekLast(); 561 if (x == null) { 562 throw new NoSuchElementException(); 563 } 564 return x; 565 } 566 567 /** 568 * Gets the length of the queue of threads waiting to take instances from this deque. See disclaimer on accuracy 569 * in {@link java.util.concurrent.locks.ReentrantLock#getWaitQueueLength(Condition)}. 570 * 571 * @return number of threads waiting on this deque's notEmpty condition. 572 */ 573 public int getTakeQueueLength() { 574 lock.lock(); 575 try { 576 return lock.getWaitQueueLength(notEmpty); 577 } finally { 578 lock.unlock(); 579 } 580 } 581 582 /** 583 * Returns true if there are threads waiting to take instances from this deque. See disclaimer on accuracy in 584 * {@link java.util.concurrent.locks.ReentrantLock#hasWaiters(Condition)}. 585 * 586 * @return true if there is at least one thread waiting on this deque's notEmpty condition. 587 */ 588 public boolean hasTakeWaiters() { 589 lock.lock(); 590 try { 591 return lock.hasWaiters(notEmpty); 592 } finally { 593 lock.unlock(); 594 } 595 } 596 597 /** 598 * Interrupts the threads currently waiting to take an object from the pool. See disclaimer on accuracy in 599 * {@link java.util.concurrent.locks.ReentrantLock#getWaitingThreads(Condition)}. 600 */ 601 public void interuptTakeWaiters() { 602 lock.lock(); 603 try { 604 lock.interruptWaiters(notEmpty); 605 } finally { 606 lock.unlock(); 607 } 608 } 609 610 /** 611 * Returns an iterator over the elements in this deque in proper sequence. 612 * The elements will be returned in order from first (head) to last (tail). 613 * The returned {@code Iterator} is a "weakly consistent" iterator that 614 * will never throw {@link java.util.ConcurrentModificationException 615 * ConcurrentModificationException}, 616 * and guarantees to traverse elements as they existed upon 617 * construction of the iterator, and may (but is not guaranteed to) 618 * reflect any modifications subsequent to construction. 619 * 620 * @return an iterator over the elements in this deque in proper sequence 621 */ 622 @Override 623 public Iterator<E> iterator() { 624 return new Itr(); 625 } 626 627 /** 628 * Links provided element as first element, or returns false if full. 629 * 630 * @param e The element to link as the first element. 631 * 632 * @return {@code true} if successful, otherwise {@code false} 633 */ 634 private boolean linkFirst(final E e) { 635 // assert lock.isHeldByCurrentThread(); 636 if (count >= capacity) { 637 return false; 638 } 639 final Node<E> f = first; 640 final Node<E> x = new Node<>(e, null, f); 641 first = x; 642 if (last == null) { 643 last = x; 644 } else { 645 f.prev = x; 646 } 647 ++count; 648 notEmpty.signal(); 649 return true; 650 } 651 652 /** 653 * Links provided element as last element, or returns false if full. 654 * 655 * @param e The element to link as the last element. 656 * 657 * @return {@code true} if successful, otherwise {@code false} 658 */ 659 private boolean linkLast(final E e) { 660 // assert lock.isHeldByCurrentThread(); 661 if (count >= capacity) { 662 return false; 663 } 664 final Node<E> l = last; 665 final Node<E> x = new Node<>(e, l, null); 666 last = x; 667 if (first == null) { 668 first = x; 669 } else { 670 l.next = x; 671 } 672 ++count; 673 notEmpty.signal(); 674 return true; 675 } 676 677 /** 678 * {@inheritDoc} 679 */ 680 @Override 681 public boolean offer(final E e) { 682 return offerLast(e); 683 } 684 685 /** 686 * Links the provided element as the last in the queue, waiting up to the 687 * specified time to do so if the queue is full. 688 * <p> 689 * This method is equivalent to {@link #offerLast(Object, long, TimeUnit)} 690 * 691 * @param e element to link 692 * @param timeout length of time to wait 693 * 694 * @return {@code true} if successful, otherwise {@code false} 695 * 696 * @throws NullPointerException if e is null 697 * @throws InterruptedException if the thread is interrupted whilst waiting 698 * for space 699 */ 700 boolean offer(final E e, final Duration timeout) throws InterruptedException { 701 return offerLast(e, timeout); 702 } 703 704 /** 705 * Links the provided element as the last in the queue, waiting up to the 706 * specified time to do so if the queue is full. 707 * <p> 708 * This method is equivalent to {@link #offerLast(Object, long, TimeUnit)} 709 * 710 * @param e element to link 711 * @param timeout length of time to wait 712 * @param unit units that timeout is expressed in 713 * 714 * @return {@code true} if successful, otherwise {@code false} 715 * 716 * @throws NullPointerException if e is null 717 * @throws InterruptedException if the thread is interrupted whilst waiting 718 * for space 719 */ 720 public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException { 721 return offerLast(e, timeout, unit); 722 } 723 724 /** 725 * {@inheritDoc} 726 */ 727 @Override 728 public boolean offerFirst(final E e) { 729 Objects.requireNonNull(e, "e"); 730 lock.lock(); 731 try { 732 return linkFirst(e); 733 } finally { 734 lock.unlock(); 735 } 736 } 737 738 /** 739 * Links the provided element as the first in the queue, waiting up to the 740 * specified time to do so if the queue is full. 741 * 742 * @param e element to link 743 * @param timeout length of time to wait 744 * 745 * @return {@code true} if successful, otherwise {@code false} 746 * 747 * @throws NullPointerException if e is null 748 * @throws InterruptedException if the thread is interrupted whilst waiting 749 * for space 750 */ 751 public boolean offerFirst(final E e, final Duration timeout) throws InterruptedException { 752 Objects.requireNonNull(e, "e"); 753 long nanos = timeout.toNanos(); 754 lock.lockInterruptibly(); 755 try { 756 while (!linkFirst(e)) { 757 if (nanos <= 0) { 758 return false; 759 } 760 nanos = notFull.awaitNanos(nanos); 761 } 762 return true; 763 } finally { 764 lock.unlock(); 765 } 766 } 767 768 /** 769 * Links the provided element as the first in the queue, waiting up to the 770 * specified time to do so if the queue is full. 771 * 772 * @param e element to link 773 * @param timeout length of time to wait 774 * @param unit units that timeout is expressed in 775 * 776 * @return {@code true} if successful, otherwise {@code false} 777 * 778 * @throws NullPointerException if e is null 779 * @throws InterruptedException if the thread is interrupted whilst waiting 780 * for space 781 */ 782 public boolean offerFirst(final E e, final long timeout, final TimeUnit unit) throws InterruptedException { 783 return offerFirst(e, PoolImplUtils.toDuration(timeout, unit)); 784 } 785 786 /** 787 * {@inheritDoc} 788 */ 789 @Override 790 public boolean offerLast(final E e) { 791 Objects.requireNonNull(e, "e"); 792 lock.lock(); 793 try { 794 return linkLast(e); 795 } finally { 796 lock.unlock(); 797 } 798 } 799 800 /** 801 * Links the provided element as the last in the queue, waiting up to the 802 * specified time to do so if the queue is full. 803 * 804 * @param e element to link 805 * @param timeout length of time to wait 806 * 807 * @return {@code true} if successful, otherwise {@code false} 808 * 809 * @throws NullPointerException if e is null 810 * @throws InterruptedException if the thread is interrupted whist waiting 811 * for space 812 */ 813 boolean offerLast(final E e, final Duration timeout) throws InterruptedException { 814 Objects.requireNonNull(e, "e"); 815 long nanos = timeout.toNanos(); 816 lock.lockInterruptibly(); 817 try { 818 while (!linkLast(e)) { 819 if (nanos <= 0) { 820 return false; 821 } 822 nanos = notFull.awaitNanos(nanos); 823 } 824 return true; 825 } finally { 826 lock.unlock(); 827 } 828 } 829 830 /** 831 * Links the provided element as the last in the queue, waiting up to the 832 * specified time to do so if the queue is full. 833 * 834 * @param e element to link 835 * @param timeout length of time to wait 836 * @param unit units that timeout is expressed in 837 * 838 * @return {@code true} if successful, otherwise {@code false} 839 * 840 * @throws NullPointerException if e is null 841 * @throws InterruptedException if the thread is interrupted whist waiting 842 * for space 843 */ 844 public boolean offerLast(final E e, final long timeout, final TimeUnit unit) throws InterruptedException { 845 return offerLast(e, PoolImplUtils.toDuration(timeout, unit)); 846 } 847 848 @Override 849 public E peek() { 850 return peekFirst(); 851 } 852 853 // BlockingQueue methods 854 855 @Override 856 public E peekFirst() { 857 lock.lock(); 858 try { 859 return first == null ? null : first.item; 860 } finally { 861 lock.unlock(); 862 } 863 } 864 865 @Override 866 public E peekLast() { 867 lock.lock(); 868 try { 869 return last == null ? null : last.item; 870 } finally { 871 lock.unlock(); 872 } 873 } 874 875 @Override 876 public E poll() { 877 return pollFirst(); 878 } 879 880 /** 881 * Unlinks the first element in the queue, waiting up to the specified time 882 * to do so if the queue is empty. 883 * 884 * <p>This method is equivalent to {@link #pollFirst(long, TimeUnit)}. 885 * 886 * @param timeout length of time to wait 887 * 888 * @return the unlinked element 889 * @throws InterruptedException if the current thread is interrupted 890 */ 891 E poll(final Duration timeout) throws InterruptedException { 892 return pollFirst(timeout); 893 } 894 895 /** 896 * Unlinks the first element in the queue, waiting up to the specified time 897 * to do so if the queue is empty. 898 * 899 * <p>This method is equivalent to {@link #pollFirst(long, TimeUnit)}. 900 * 901 * @param timeout length of time to wait 902 * @param unit units that timeout is expressed in 903 * 904 * @return the unlinked element 905 * @throws InterruptedException if the current thread is interrupted 906 */ 907 public E poll(final long timeout, final TimeUnit unit) throws InterruptedException { 908 return pollFirst(timeout, unit); 909 } 910 911 @Override 912 public E pollFirst() { 913 lock.lock(); 914 try { 915 return unlinkFirst(); 916 } finally { 917 lock.unlock(); 918 } 919 } 920 921 /** 922 * Unlinks the first element in the queue, waiting up to the specified time 923 * to do so if the queue is empty. 924 * 925 * @param timeout length of time to wait 926 * 927 * @return the unlinked element 928 * @throws InterruptedException if the current thread is interrupted 929 */ 930 E pollFirst(final Duration timeout) throws InterruptedException { 931 long nanos = timeout.toNanos(); 932 lock.lockInterruptibly(); 933 try { 934 E x; 935 while ((x = unlinkFirst()) == null) { 936 if (nanos <= 0) { 937 return null; 938 } 939 nanos = notEmpty.awaitNanos(nanos); 940 } 941 return x; 942 } finally { 943 lock.unlock(); 944 } 945 } 946 947 /** 948 * Unlinks the first element in the queue, waiting up to the specified time 949 * to do so if the queue is empty. 950 * 951 * @param timeout length of time to wait 952 * @param unit units that timeout is expressed in 953 * 954 * @return the unlinked element 955 * @throws InterruptedException if the current thread is interrupted 956 */ 957 public E pollFirst(final long timeout, final TimeUnit unit) throws InterruptedException { 958 return pollFirst(PoolImplUtils.toDuration(timeout, unit)); 959 } 960 961 @Override 962 public E pollLast() { 963 lock.lock(); 964 try { 965 return unlinkLast(); 966 } finally { 967 lock.unlock(); 968 } 969 } 970 971 /** 972 * Unlinks the last element in the queue, waiting up to the specified time 973 * to do so if the queue is empty. 974 * 975 * @param timeout length of time to wait 976 * 977 * @return the unlinked element 978 * @throws InterruptedException if the current thread is interrupted 979 */ 980 public E pollLast(final Duration timeout) 981 throws InterruptedException { 982 long nanos = timeout.toNanos(); 983 lock.lockInterruptibly(); 984 try { 985 E x; 986 while ((x = unlinkLast()) == null) { 987 if (nanos <= 0) { 988 return null; 989 } 990 nanos = notEmpty.awaitNanos(nanos); 991 } 992 return x; 993 } finally { 994 lock.unlock(); 995 } 996 } 997 998 /** 999 * Unlinks the last element in the queue, waiting up to the specified time 1000 * to do so if the queue is empty. 1001 * 1002 * @param timeout length of time to wait 1003 * @param unit units that timeout is expressed in 1004 * 1005 * @return the unlinked element 1006 * @throws InterruptedException if the current thread is interrupted 1007 */ 1008 public E pollLast(final long timeout, final TimeUnit unit) 1009 throws InterruptedException { 1010 return pollLast(PoolImplUtils.toDuration(timeout, unit)); 1011 } 1012 1013 /** 1014 * {@inheritDoc} 1015 */ 1016 @Override 1017 public E pop() { 1018 return removeFirst(); 1019 } 1020 1021 /** 1022 * {@inheritDoc} 1023 */ 1024 @Override 1025 public void push(final E e) { 1026 addFirst(e); 1027 } 1028 1029 /** 1030 * Links the provided element as the last in the queue, waiting until there 1031 * is space to do so if the queue is full. 1032 * 1033 * <p> 1034 * This method is equivalent to {@link #putLast(Object)}. 1035 * </p> 1036 * 1037 * @param e element to link 1038 * 1039 * @throws NullPointerException if e is null 1040 * @throws InterruptedException if the thread is interrupted whilst waiting 1041 * for space 1042 */ 1043 public void put(final E e) throws InterruptedException { 1044 putLast(e); 1045 } 1046 1047 /** 1048 * Links the provided element as the first in the queue, waiting until there 1049 * is space to do so if the queue is full. 1050 * 1051 * @param e element to link 1052 * 1053 * @throws NullPointerException if e is null 1054 * @throws InterruptedException if the thread is interrupted whilst waiting 1055 * for space 1056 */ 1057 public void putFirst(final E e) throws InterruptedException { 1058 Objects.requireNonNull(e, "e"); 1059 lock.lock(); 1060 try { 1061 while (!linkFirst(e)) { 1062 notFull.await(); 1063 } 1064 } finally { 1065 lock.unlock(); 1066 } 1067 } 1068 1069 /** 1070 * Links the provided element as the last in the queue, waiting until there 1071 * is space to do so if the queue is full. 1072 * 1073 * @param e element to link 1074 * 1075 * @throws NullPointerException if e is null 1076 * @throws InterruptedException if the thread is interrupted whilst waiting 1077 * for space 1078 */ 1079 public void putLast(final E e) throws InterruptedException { 1080 Objects.requireNonNull(e, "e"); 1081 lock.lock(); 1082 try { 1083 while (!linkLast(e)) { 1084 notFull.await(); 1085 } 1086 } finally { 1087 lock.unlock(); 1088 } 1089 } 1090 1091 // Stack methods 1092 1093 /** 1094 * Reconstitutes this deque from a stream (that is, 1095 * deserialize it). 1096 * @param s the stream 1097 */ 1098 private void readObject(final ObjectInputStream s) 1099 throws IOException, ClassNotFoundException { 1100 s.defaultReadObject(); 1101 count = 0; 1102 first = null; 1103 last = null; 1104 // Read in all elements and place in queue 1105 for (;;) { 1106 @SuppressWarnings("unchecked") 1107 final E item = (E)s.readObject(); 1108 if (item == null) { 1109 break; 1110 } 1111 add(item); 1112 } 1113 } 1114 1115 /** 1116 * Returns the number of additional elements that this deque can ideally 1117 * (in the absence of memory or resource constraints) accept without 1118 * blocking. This is always equal to the initial capacity of this deque 1119 * less the current {@code size} of this deque. 1120 * 1121 * <p> 1122 * Note that you <em>cannot</em> always tell if an attempt to insert 1123 * an element will succeed by inspecting {@code remainingCapacity} 1124 * because it may be the case that another thread is about to 1125 * insert or remove an element. 1126 * </p> 1127 * 1128 * @return The number of additional elements the queue is able to accept 1129 */ 1130 public int remainingCapacity() { 1131 lock.lock(); 1132 try { 1133 return capacity - count; 1134 } finally { 1135 lock.unlock(); 1136 } 1137 } 1138 1139 // Collection methods 1140 1141 /** 1142 * Retrieves and removes the head of the queue represented by this deque. 1143 * This method differs from {@link #poll poll} only in that it throws an 1144 * exception if this deque is empty. 1145 * 1146 * <p> 1147 * This method is equivalent to {@link #removeFirst() removeFirst}. 1148 * </p> 1149 * 1150 * @return the head of the queue represented by this deque 1151 * @throws NoSuchElementException if this deque is empty 1152 */ 1153 @Override 1154 public E remove() { 1155 return removeFirst(); 1156 } 1157 1158 /** 1159 * Removes the first occurrence of the specified element from this deque. 1160 * If the deque does not contain the element, it is unchanged. 1161 * More formally, removes the first element {@code e} such that 1162 * {@code o.equals(e)} (if such an element exists). 1163 * Returns {@code true} if this deque contained the specified element 1164 * (or equivalently, if this deque changed as a result of the call). 1165 * 1166 * <p> 1167 * This method is equivalent to 1168 * {@link #removeFirstOccurrence(Object) removeFirstOccurrence}. 1169 * </p> 1170 * 1171 * @param o element to be removed from this deque, if present 1172 * @return {@code true} if this deque changed as a result of the call 1173 */ 1174 @Override 1175 public boolean remove(final Object o) { 1176 return removeFirstOccurrence(o); 1177 } 1178 1179 /** 1180 * {@inheritDoc} 1181 */ 1182 @Override 1183 public E removeFirst() { 1184 final E x = pollFirst(); 1185 if (x == null) { 1186 throw new NoSuchElementException(); 1187 } 1188 return x; 1189 } 1190 1191 /* 1192 * TODO: Add support for more efficient bulk operations. 1193 * 1194 * We don't want to acquire the lock for every iteration, but we 1195 * also want other threads a chance to interact with the 1196 * collection, especially when count is close to capacity. 1197 */ 1198 1199 // /** 1200 // * Adds all of the elements in the specified collection to this 1201 // * queue. Attempts to addAll of a queue to itself result in 1202 // * {@code IllegalArgumentException}. Further, the behavior of 1203 // * this operation is undefined if the specified collection is 1204 // * modified while the operation is in progress. 1205 // * 1206 // * @param c collection containing elements to be added to this queue 1207 // * @return {@code true} if this queue changed as a result of the call 1208 // * @throws ClassCastException 1209 // * @throws NullPointerException 1210 // * @throws IllegalArgumentException 1211 // * @throws IllegalStateException 1212 // * @see #add(Object) 1213 // */ 1214 // public boolean addAll(Collection<? extends E> c) { 1215 // if (c == null) 1216 // throw new NullPointerException(); 1217 // if (c == this) 1218 // throw new IllegalArgumentException(); 1219 // final ReentrantLock lock = this.lock; 1220 // lock.lock(); 1221 // try { 1222 // boolean modified = false; 1223 // for (E e : c) 1224 // if (linkLast(e)) 1225 // modified = true; 1226 // return modified; 1227 // } finally { 1228 // lock.unlock(); 1229 // } 1230 // } 1231 1232 @Override 1233 public boolean removeFirstOccurrence(final Object o) { 1234 if (o == null) { 1235 return false; 1236 } 1237 lock.lock(); 1238 try { 1239 for (Node<E> p = first; p != null; p = p.next) { 1240 if (o.equals(p.item)) { 1241 unlink(p); 1242 return true; 1243 } 1244 } 1245 return false; 1246 } finally { 1247 lock.unlock(); 1248 } 1249 } 1250 1251 /** 1252 * {@inheritDoc} 1253 */ 1254 @Override 1255 public E removeLast() { 1256 final E x = pollLast(); 1257 if (x == null) { 1258 throw new NoSuchElementException(); 1259 } 1260 return x; 1261 } 1262 1263 @Override 1264 public boolean removeLastOccurrence(final Object o) { 1265 if (o == null) { 1266 return false; 1267 } 1268 lock.lock(); 1269 try { 1270 for (Node<E> p = last; p != null; p = p.prev) { 1271 if (o.equals(p.item)) { 1272 unlink(p); 1273 return true; 1274 } 1275 } 1276 return false; 1277 } finally { 1278 lock.unlock(); 1279 } 1280 } 1281 1282 /** 1283 * Returns the number of elements in this deque. 1284 * 1285 * @return the number of elements in this deque 1286 */ 1287 @Override 1288 public int size() { 1289 lock.lock(); 1290 try { 1291 return count; 1292 } finally { 1293 lock.unlock(); 1294 } 1295 } 1296 1297 /** 1298 * Unlinks the first element in the queue, waiting until there is an element 1299 * to unlink if the queue is empty. 1300 * 1301 * <p> 1302 * This method is equivalent to {@link #takeFirst()}. 1303 * </p> 1304 * 1305 * @return the unlinked element 1306 * @throws InterruptedException if the current thread is interrupted 1307 */ 1308 public E take() throws InterruptedException { 1309 return takeFirst(); 1310 } 1311 1312 /** 1313 * Unlinks the first element in the queue, waiting until there is an element 1314 * to unlink if the queue is empty. 1315 * 1316 * @return the unlinked element 1317 * @throws InterruptedException if the current thread is interrupted 1318 */ 1319 public E takeFirst() throws InterruptedException { 1320 lock.lock(); 1321 try { 1322 E x; 1323 while ((x = unlinkFirst()) == null) { 1324 notEmpty.await(); 1325 } 1326 return x; 1327 } finally { 1328 lock.unlock(); 1329 } 1330 } 1331 1332 /** 1333 * Unlinks the last element in the queue, waiting until there is an element 1334 * to unlink if the queue is empty. 1335 * 1336 * @return the unlinked element 1337 * @throws InterruptedException if the current thread is interrupted 1338 */ 1339 public E takeLast() throws InterruptedException { 1340 lock.lock(); 1341 try { 1342 E x; 1343 while ((x = unlinkLast()) == null) { 1344 notEmpty.await(); 1345 } 1346 return x; 1347 } finally { 1348 lock.unlock(); 1349 } 1350 } 1351 1352 /** 1353 * Returns an array containing all of the elements in this deque, in 1354 * proper sequence (from first to last element). 1355 * 1356 * <p> 1357 * The returned array will be "safe" in that no references to it are 1358 * maintained by this deque. (In other words, this method must allocate 1359 * a new array). The caller is thus free to modify the returned array. 1360 * </p> 1361 * <p> 1362 * This method acts as bridge between array-based and collection-based 1363 * APIs. 1364 * </p> 1365 * 1366 * @return an array containing all of the elements in this deque 1367 */ 1368 @Override 1369 public Object[] toArray() { 1370 lock.lock(); 1371 try { 1372 final Object[] a = new Object[count]; 1373 int k = 0; 1374 for (Node<E> p = first; p != null; p = p.next) { 1375 a[k++] = p.item; 1376 } 1377 return a; 1378 } finally { 1379 lock.unlock(); 1380 } 1381 } 1382 1383 /** 1384 * {@inheritDoc} 1385 */ 1386 @SuppressWarnings("unchecked") 1387 @Override 1388 public <T> T[] toArray(T[] a) { 1389 lock.lock(); 1390 try { 1391 if (a.length < count) { 1392 a = (T[])java.lang.reflect.Array.newInstance 1393 (a.getClass().getComponentType(), count); 1394 } 1395 int k = 0; 1396 for (Node<E> p = first; p != null; p = p.next) { 1397 a[k++] = (T)p.item; 1398 } 1399 if (a.length > k) { 1400 a[k] = null; 1401 } 1402 return a; 1403 } finally { 1404 lock.unlock(); 1405 } 1406 } 1407 1408 @Override 1409 public String toString() { 1410 lock.lock(); 1411 try { 1412 return super.toString(); 1413 } finally { 1414 lock.unlock(); 1415 } 1416 } 1417 1418 /** 1419 * Unlinks the provided node. 1420 * 1421 * @param x The node to unlink 1422 */ 1423 private void unlink(final Node<E> x) { 1424 // assert lock.isHeldByCurrentThread(); 1425 final Node<E> p = x.prev; 1426 final Node<E> n = x.next; 1427 if (p == null) { 1428 unlinkFirst(); 1429 } else if (n == null) { 1430 unlinkLast(); 1431 } else { 1432 p.next = n; 1433 n.prev = p; 1434 x.item = null; 1435 // Don't mess with x's links. They may still be in use by 1436 // an iterator. 1437 --count; 1438 notFull.signal(); 1439 } 1440 } 1441 1442 // Monitoring methods 1443 1444 /** 1445 * Removes and returns the first element, or null if empty. 1446 * 1447 * @return The first element or {@code null} if empty 1448 */ 1449 private E unlinkFirst() { 1450 // assert lock.isHeldByCurrentThread(); 1451 final Node<E> f = first; 1452 if (f == null) { 1453 return null; 1454 } 1455 final Node<E> n = f.next; 1456 final E item = f.item; 1457 f.item = null; 1458 f.next = f; // help GC 1459 first = n; 1460 if (n == null) { 1461 last = null; 1462 } else { 1463 n.prev = null; 1464 } 1465 --count; 1466 notFull.signal(); 1467 return item; 1468 } 1469 1470 /** 1471 * Removes and returns the last element, or null if empty. 1472 * 1473 * @return The first element or {@code null} if empty 1474 */ 1475 private E unlinkLast() { 1476 // assert lock.isHeldByCurrentThread(); 1477 final Node<E> l = last; 1478 if (l == null) { 1479 return null; 1480 } 1481 final Node<E> p = l.prev; 1482 final E item = l.item; 1483 l.item = null; 1484 l.prev = l; // help GC 1485 last = p; 1486 if (p == null) { 1487 first = null; 1488 } else { 1489 p.next = null; 1490 } 1491 --count; 1492 notFull.signal(); 1493 return item; 1494 } 1495 1496 /** 1497 * Saves the state of this deque to a stream (that is, serialize it). 1498 * 1499 * @serialData The capacity (int), followed by elements (each an 1500 * {@code Object}) in the proper order, followed by a null 1501 * @param s the stream 1502 * @throws IOException if I/O errors occur while writing to the underlying {@code OutputStream} 1503 */ 1504 private void writeObject(final java.io.ObjectOutputStream s) throws IOException { 1505 lock.lock(); 1506 try { 1507 // Write out capacity and any hidden stuff 1508 s.defaultWriteObject(); 1509 // Write out all elements in the proper order. 1510 for (Node<E> p = first; p != null; p = p.next) { 1511 s.writeObject(p.item); 1512 } 1513 // Use trailing null as sentinel 1514 s.writeObject(null); 1515 } finally { 1516 lock.unlock(); 1517 } 1518 } 1519 }