1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.commons.pool2.impl;
18
19 import java.io.IOException;
20 import java.io.ObjectInputStream;
21 import java.io.Serializable;
22 import java.time.Duration;
23 import java.util.AbstractQueue;
24 import java.util.Collection;
25 import java.util.Iterator;
26 import java.util.NoSuchElementException;
27 import java.util.Objects;
28 import java.util.concurrent.BlockingDeque;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.locks.Condition;
31
32 /**
33 * An optionally-bounded {@linkplain java.util.concurrent.BlockingDeque blocking
34 * deque} based on linked nodes.
35 *
36 * <p> The optional capacity bound constructor argument serves as a
37 * way to prevent excessive expansion. The capacity, if unspecified,
38 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
39 * dynamically created upon each insertion unless this would bring the
40 * deque above capacity.
41 * </p>
42 *
43 * <p>Most operations run in constant time (ignoring time spent
44 * blocking). Exceptions include {@link #remove(Object) remove},
45 * {@link #removeFirstOccurrence removeFirstOccurrence}, {@link
46 * #removeLastOccurrence removeLastOccurrence}, {@link #contains
47 * contains}, {@link #iterator iterator.remove()}, and the bulk
48 * operations, all of which run in linear time.
49 * </p>
50 *
51 * <p>This class and its iterator implement all of the
52 * <em>optional</em> methods of the {@link Collection} and {@link
53 * Iterator} interfaces.
54 * </p>
55 *
56 * <p>This class is a member of the
57 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
58 * Java Collections Framework</a>.
59 * </p>
60 *
 * <p>Note: This was copied from Apache Harmony and modified to suit the needs of
 * Commons Pool.
 * </p>
 *
 * @param <E> the type of elements held in this collection
65 *
66 * @since 2.0
67 */
68 final class LinkedBlockingDeque<E> extends AbstractQueue<E>
69 implements BlockingDeque<E>, Serializable {
70
71 /*
72 * Implemented as a simple doubly-linked list protected by a
73 * single lock and using conditions to manage blocking.
74 *
75 * To implement weakly consistent iterators, it appears we need to
76 * keep all Nodes GC-reachable from a predecessor dequeued Node.
77 * That would cause two problems:
78 * - allow a rogue Iterator to cause unbounded memory retention
79 * - cause cross-generational linking of old Nodes to new Nodes if
80 * a Node was tenured while live, which generational GCs have a
81 * hard time dealing with, causing repeated major collections.
82 * However, only non-deleted Nodes need to be reachable from
83 * dequeued Nodes, and reachability does not necessarily have to
84 * be of the kind understood by the GC. We use the trick of
85 * linking a Node that has just been dequeued to itself. Such a
86 * self-link implicitly means to jump to "first" (for next links)
87 * or "last" (for prev links).
88 */
89
90 /*
91 * We have "diamond" multiple interface/abstract class inheritance
92 * here, and that introduces ambiguities. Often we want the
93 * BlockingDeque javadoc combined with the AbstractQueue
94 * implementation, so a lot of method specs are duplicated here.
95 */
96
97 /**
98 * Base class for Iterators for LinkedBlockingDeque
99 */
100 private abstract class AbstractItr implements Iterator<E> {
101 /**
102 * The next node to return in next()
103 */
104 Node<E> next;
105
106 /**
107 * nextItem holds on to item fields because once we claim that
108 * an element exists in hasNext(), we must return item read
109 * under lock (in advance()) even if it was in the process of
110 * being removed when hasNext() was called.
111 */
112 E nextItem;
113
114 /**
115 * Node returned by most recent call to next. Needed by remove.
116 * Reset to null if this element is deleted by a call to remove.
117 */
118 private Node<E> lastRet;
119
120 /**
121 * Constructs a new iterator. Sets the initial position.
122 */
123 AbstractItr() {
124 // set to initial position
125 lock.lock();
126 try {
127 next = firstNode();
128 nextItem = next == null ? null : next.item;
129 } finally {
130 lock.unlock();
131 }
132 }
133
134 /**
135 * Advances next.
136 */
137 void advance() {
138 lock.lock();
139 try {
140 // assert next != null;
141 next = succ(next);
142 nextItem = next == null ? null : next.item;
143 } finally {
144 lock.unlock();
145 }
146 }
147
148 /**
149 * Obtain the first node to be returned by the iterator.
150 *
151 * @return first node
152 */
153 abstract Node<E> firstNode();
154
155 @Override
156 public boolean hasNext() {
157 return next != null;
158 }
159
160 @Override
161 public E next() {
162 if (next == null) {
163 throw new NoSuchElementException();
164 }
165 lastRet = next;
166 final E x = nextItem;
167 advance();
168 return x;
169 }
170
171 /**
172 * For a given node, obtain the next node to be returned by the
173 * iterator.
174 *
175 * @param n given node
176 * @return next node
177 */
178 abstract Node<E> nextNode(Node<E> n);
179
180 @Override
181 public void remove() {
182 final Node<E> n = lastRet;
183 if (n == null) {
184 throw new IllegalStateException();
185 }
186 lastRet = null;
187 lock.lock();
188 try {
189 if (n.item != null) {
190 unlink(n);
191 }
192 } finally {
193 lock.unlock();
194 }
195 }
196
197 /**
198 * Returns the successor node of the given non-null, but
199 * possibly previously deleted, node.
200 *
201 * @param n node whose successor is sought
202 * @return successor node
203 */
204 private Node<E> succ(Node<E> n) {
205 // Chains of deleted nodes ending in null or self-links
206 // are possible if multiple interior nodes are removed.
207 for (;;) {
208 final Node<E> s = nextNode(n);
209 if (s == null) {
210 return null;
211 }
212 if (s.item != null) {
213 return s;
214 }
215 if (s == n) {
216 return firstNode();
217 }
218 n = s;
219 }
220 }
221 }
222
223 /** Descending iterator */
224 private final class DescendingItr extends AbstractItr {
225 @Override
226 Node<E> firstNode() {
227 return last;
228 }
229
230 @Override
231 Node<E> nextNode(final Node<E> n) {
232 return n.prev;
233 }
234 }
235
236 /** Forward iterator */
237 private final class Itr extends AbstractItr {
238 @Override
239 Node<E> firstNode() {
240 return first;
241 }
242
243 @Override
244 Node<E> nextNode(final Node<E> n) {
245 return n.next;
246 }
247 }
248
249 /**
250 * Doubly-linked list node class.
251 *
252 * @param <E> node item type
253 */
254 private static final class Node<E> {
255 /**
256 * The item, or null if this node has been removed.
257 */
258 E item;
259
260 /**
261 * One of:
262 * - the real predecessor Node
263 * - this Node, meaning the predecessor is tail
264 * - null, meaning there is no predecessor
265 */
266 Node<E> prev;
267
268 /**
269 * One of:
270 * - the real successor Node
271 * - this Node, meaning the successor is head
272 * - null, meaning there is no successor
273 */
274 Node<E> next;
275
276 /**
277 * Constructs a new list node.
278 *
279 * @param x The list item
280 * @param p Previous item
281 * @param n Next item
282 */
283 Node(final E x, final Node<E> p, final Node<E> n) {
284 item = x;
285 prev = p;
286 next = n;
287 }
288 }
289
290 private static final long serialVersionUID = -387911632671998426L;
291
292 /**
293 * Pointer to first node.
294 * Invariant: (first == null && last == null) ||
295 * (first.prev == null && first.item != null)
296 */
297 private transient Node<E> first; // @GuardedBy("lock")
298
299 /**
300 * Pointer to last node.
301 * Invariant: (first == null && last == null) ||
302 * (last.next == null && last.item != null)
303 */
304 private transient Node<E> last; // @GuardedBy("lock")
305
306 /** Number of items in the deque */
307 private transient int count; // @GuardedBy("lock")
308
309 /** Maximum number of items in the deque */
310 private final int capacity;
311
312 /** Main lock guarding all access */
313 private final InterruptibleReentrantLock lock;
314
315 /** Condition for waiting takes */
316 private final Condition notEmpty;
317
318 /** Condition for waiting puts */
319 private final Condition notFull;
320
/**
 * Creates a {@code LinkedBlockingDeque} whose capacity is
 * {@link Integer#MAX_VALUE}, without requesting fairness.
 */
LinkedBlockingDeque() {
    this(Integer.MAX_VALUE, false);
}
328
/**
 * Creates a {@code LinkedBlockingDeque} whose capacity is
 * {@link Integer#MAX_VALUE}, using the supplied fairness policy.
 *
 * @param fairness true means threads waiting on the deque should be served
 * as if waiting in a FIFO request queue
 */
LinkedBlockingDeque(final boolean fairness) {
    this(Integer.MAX_VALUE, fairness);
}
338
339 // Basic linking and unlinking operations, called only while holding lock
340
341 /**
342 * Creates a {@code LinkedBlockingDeque} with a capacity of
343 * {@link Integer#MAX_VALUE}, initially containing the elements of
344 * the given collection, added in traversal order of the
345 * collection's iterator.
346 *
347 * @param c the collection of elements to initially contain
348 * @throws NullPointerException if the specified collection or any
349 * of its elements are null
350 */
351 LinkedBlockingDeque(final Collection<? extends E> c) {
352 this(Integer.MAX_VALUE);
353 lock.lock(); // Never contended, but necessary for visibility
354 try {
355 for (final E e : c) {
356 Objects.requireNonNull(e);
357 if (!linkLast(e)) {
358 throw new IllegalStateException("Deque full");
359 }
360 }
361 } finally {
362 lock.unlock();
363 }
364 }
365
/**
 * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity,
 * without requesting fairness.
 *
 * @param capacity the capacity of this deque
 * @throws IllegalArgumentException if {@code capacity} is less than 1
 */
LinkedBlockingDeque(final int capacity) {
    this(capacity, false);
}
375
/**
 * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity
 * and fairness policy. The lock and both wait conditions are created here.
 *
 * @param capacity the capacity of this deque
 * @param fairness true means threads waiting on the deque should be served
 * as if waiting in a FIFO request queue
 * @throws IllegalArgumentException if {@code capacity} is less than 1
 */
LinkedBlockingDeque(final int capacity, final boolean fairness) {
    if (capacity < 1) {
        throw new IllegalArgumentException();
    }
    this.capacity = capacity;
    this.lock = new InterruptibleReentrantLock(fairness);
    // Both conditions are bound to the single deque lock.
    this.notEmpty = this.lock.newCondition();
    this.notFull = this.lock.newCondition();
}
394
395 /**
396 * {@inheritDoc}
397 */
398 @Override
399 public boolean add(final E e) {
400 addLast(e);
401 return true;
402 }
403
404 /**
405 * {@inheritDoc}
406 */
407 @Override
408 public void addFirst(final E e) {
409 if (!offerFirst(e)) {
410 throw new IllegalStateException("Deque full");
411 }
412 }
413
414 // BlockingDeque methods
415
416 /**
417 * {@inheritDoc}
418 */
419 @Override
420 public void addLast(final E e) {
421 if (!offerLast(e)) {
422 throw new IllegalStateException("Deque full");
423 }
424 }
425
426 /**
427 * Atomically removes all of the elements from this deque.
428 * The deque will be empty after this call returns.
429 */
430 @Override
431 public void clear() {
432 lock.lock();
433 try {
434 for (Node<E> f = first; f != null;) {
435 f.item = null;
436 final Node<E> n = f.next;
437 f.prev = null;
438 f.next = null;
439 f = n;
440 }
441 first = last = null;
442 count = 0;
443 notFull.signalAll();
444 } finally {
445 lock.unlock();
446 }
447 }
448
449 /**
450 * Returns {@code true} if this deque contains the specified element.
451 * More formally, returns {@code true} if and only if this deque contains
452 * at least one element {@code e} such that {@code o.equals(e)}.
453 *
454 * @param o object to be checked for containment in this deque
455 * @return {@code true} if this deque contains the specified element
456 */
457 @Override
458 public boolean contains(final Object o) {
459 if (o == null) {
460 return false;
461 }
462 lock.lock();
463 try {
464 for (Node<E> p = first; p != null; p = p.next) {
465 if (o.equals(p.item)) {
466 return true;
467 }
468 }
469 return false;
470 } finally {
471 lock.unlock();
472 }
473 }
474
475 /**
476 * {@inheritDoc}
477 */
478 @Override
479 public Iterator<E> descendingIterator() {
480 return new DescendingItr();
481 }
482
483 /**
484 * Drains the queue to the specified collection.
485 *
486 * @param c The collection to add the elements to
487 * @return number of elements added to the collection
488 * @throws UnsupportedOperationException if the add operation is not
489 * supported by the specified collection
490 * @throws ClassCastException if the class of the elements held by this
491 * collection prevents them from being added to the specified
492 * collection
493 * @throws NullPointerException if c is null
494 * @throws IllegalArgumentException if c is this instance
495 */
496 public int drainTo(final Collection<? super E> c) {
497 return drainTo(c, Integer.MAX_VALUE);
498 }
499
500 /**
501 * Drains no more than the specified number of elements from the queue to the
502 * specified collection.
503 *
504 * @param collection collection to add the elements to
505 * @param maxElements maximum number of elements to remove from the queue
506 * @return number of elements added to the collection
507 * @throws UnsupportedOperationException if the add operation is not
508 * supported by the specified collection
509 * @throws ClassCastException if the class of the elements held by this
510 * collection prevents them from being added to the specified
511 * collection
512 * @throws NullPointerException if c is null
513 * @throws IllegalArgumentException if c is this instance
514 */
515 public int drainTo(final Collection<? super E> collection, final int maxElements) {
516 Objects.requireNonNull(collection, "collection");
517 if (collection == this) {
518 throw new IllegalArgumentException();
519 }
520 lock.lock();
521 try {
522 final int n = Math.min(maxElements, count);
523 for (int i = 0; i < n; i++) {
524 collection.add(first.item); // In this order, in case add() throws.
525 unlinkFirst();
526 }
527 return n;
528 } finally {
529 lock.unlock();
530 }
531 }
532
533 /**
534 * Retrieves, but does not remove, the head of the queue represented by
535 * this deque. This method differs from {@link #peek peek} only in that
536 * it throws an exception if this deque is empty.
537 *
538 * <p>This method is equivalent to {@link #getFirst() getFirst}.
539 *
540 * @return the head of the queue represented by this deque
541 * @throws NoSuchElementException if this deque is empty
542 */
543 @Override
544 public E element() {
545 return getFirst();
546 }
547
548 /**
549 * {@inheritDoc}
550 */
551 @Override
552 public E getFirst() {
553 final E x = peekFirst();
554 if (x == null) {
555 throw new NoSuchElementException();
556 }
557 return x;
558 }
559
560 /**
561 * {@inheritDoc}
562 */
563 @Override
564 public E getLast() {
565 final E x = peekLast();
566 if (x == null) {
567 throw new NoSuchElementException();
568 }
569 return x;
570 }
571
572 /**
573 * Gets the length of the queue of threads waiting to take instances from this deque. See disclaimer on accuracy
574 * in {@link java.util.concurrent.locks.ReentrantLock#getWaitQueueLength(Condition)}.
575 *
576 * @return number of threads waiting on this deque's notEmpty condition.
577 */
578 int getTakeQueueLength() {
579 lock.lock();
580 try {
581 return lock.getWaitQueueLength(notEmpty);
582 } finally {
583 lock.unlock();
584 }
585 }
586
587 /**
588 * Returns true if there are threads waiting to take instances from this deque. See disclaimer on accuracy in
589 * {@link java.util.concurrent.locks.ReentrantLock#hasWaiters(Condition)}.
590 *
591 * @return true if there is at least one thread waiting on this deque's notEmpty condition.
592 */
593 boolean hasTakeWaiters() {
594 lock.lock();
595 try {
596 return lock.hasWaiters(notEmpty);
597 } finally {
598 lock.unlock();
599 }
600 }
601
602 /**
603 * Interrupts the threads currently waiting to take an object from the pool. See disclaimer on accuracy in
604 * {@link java.util.concurrent.locks.ReentrantLock#getWaitingThreads(Condition)}.
605 */
606 void interuptTakeWaiters() {
607 lock.lock();
608 try {
609 lock.interruptWaiters(notEmpty);
610 } finally {
611 lock.unlock();
612 }
613 }
614
615 /**
616 * Returns an iterator over the elements in this deque in proper sequence.
617 * The elements will be returned in order from first (head) to last (tail).
618 * The returned {@code Iterator} is a "weakly consistent" iterator that
619 * will never throw {@link java.util.ConcurrentModificationException
620 * ConcurrentModificationException},
621 * and guarantees to traverse elements as they existed upon
622 * construction of the iterator, and may (but is not guaranteed to)
623 * reflect any modifications subsequent to construction.
624 *
625 * @return an iterator over the elements in this deque in proper sequence
626 */
627 @Override
628 public Iterator<E> iterator() {
629 return new Itr();
630 }
631
632 /**
633 * Links provided element as first element, or returns false if full.
634 *
635 * @param e The element to link as the first element.
636 * @return {@code true} if successful, otherwise {@code false}
637 */
638 private boolean linkFirst(final E e) {
639 // assert lock.isHeldByCurrentThread();
640 if (count >= capacity) {
641 return false;
642 }
643 final Node<E> f = first;
644 final Node<E> x = new Node<>(e, null, f);
645 first = x;
646 if (last == null) {
647 last = x;
648 } else {
649 f.prev = x;
650 }
651 ++count;
652 notEmpty.signal();
653 return true;
654 }
655
656 /**
657 * Links provided element as last element, or returns false if full.
658 *
659 * @param e The element to link as the last element.
660 * @return {@code true} if successful, otherwise {@code false}
661 */
662 private boolean linkLast(final E e) {
663 // assert lock.isHeldByCurrentThread();
664 if (count >= capacity) {
665 return false;
666 }
667 final Node<E> l = last;
668 final Node<E> x = new Node<>(e, l, null);
669 last = x;
670 if (first == null) {
671 first = x;
672 } else {
673 l.next = x;
674 }
675 ++count;
676 notEmpty.signal();
677 return true;
678 }
679
680 /**
681 * {@inheritDoc}
682 */
683 @Override
684 public boolean offer(final E e) {
685 return offerLast(e);
686 }
687
688 /**
689 * Links the provided element as the last in the queue, waiting up to the
690 * specified time to do so if the queue is full.
691 * <p>
692 * This method is equivalent to {@link #offerLast(Object, long, TimeUnit)}
693 *
694 * @param e element to link
695 * @param timeout length of time to wait
696 * @return {@code true} if successful, otherwise {@code false}
697 * @throws NullPointerException if e is null
698 * @throws InterruptedException if the thread is interrupted whilst waiting
699 * for space
700 */
701 boolean offer(final E e, final Duration timeout) throws InterruptedException {
702 return offerLast(e, timeout);
703 }
704
705 /**
706 * Links the provided element as the last in the queue, waiting up to the
707 * specified time to do so if the queue is full.
708 * <p>
709 * This method is equivalent to {@link #offerLast(Object, long, TimeUnit)}
710 *
711 * @param e element to link
712 * @param timeout length of time to wait
713 * @param unit units that timeout is expressed in
714 * @return {@code true} if successful, otherwise {@code false}
715 * @throws NullPointerException if e is null
716 * @throws InterruptedException if the thread is interrupted whilst waiting
717 * for space
718 */
719 public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
720 return offerLast(e, timeout, unit);
721 }
722
723 /**
724 * {@inheritDoc}
725 */
726 @Override
727 public boolean offerFirst(final E e) {
728 Objects.requireNonNull(e, "e");
729 lock.lock();
730 try {
731 return linkFirst(e);
732 } finally {
733 lock.unlock();
734 }
735 }
736
737 /**
738 * Links the provided element as the first in the queue, waiting up to the
739 * specified time to do so if the queue is full.
740 *
741 * @param e element to link
742 * @param timeout length of time to wait
743 * @return {@code true} if successful, otherwise {@code false}
744 * @throws NullPointerException if e is null
745 * @throws InterruptedException if the thread is interrupted whilst waiting
746 * for space
747 */
748 boolean offerFirst(final E e, final Duration timeout) throws InterruptedException {
749 Objects.requireNonNull(e, "e");
750 long nanos = timeout.toNanos();
751 lock.lockInterruptibly();
752 try {
753 while (!linkFirst(e)) {
754 if (nanos <= 0) {
755 return false;
756 }
757 nanos = notFull.awaitNanos(nanos);
758 }
759 return true;
760 } finally {
761 lock.unlock();
762 }
763 }
764
765 /**
766 * Links the provided element as the first in the queue, waiting up to the
767 * specified time to do so if the queue is full.
768 *
769 * @param e element to link
770 * @param timeout length of time to wait
771 * @param unit units that timeout is expressed in
772 * @return {@code true} if successful, otherwise {@code false}
773 * @throws NullPointerException if e is null
774 * @throws InterruptedException if the thread is interrupted whilst waiting
775 * for space
776 */
777 public boolean offerFirst(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
778 return offerFirst(e, PoolImplUtils.toDuration(timeout, unit));
779 }
780
781 /**
782 * {@inheritDoc}
783 */
784 @Override
785 public boolean offerLast(final E e) {
786 Objects.requireNonNull(e, "e");
787 lock.lock();
788 try {
789 return linkLast(e);
790 } finally {
791 lock.unlock();
792 }
793 }
794
795 /**
796 * Links the provided element as the last in the queue, waiting up to the
797 * specified time to do so if the queue is full.
798 *
799 * @param e element to link
800 * @param timeout length of time to wait
801 * @return {@code true} if successful, otherwise {@code false}
802 * @throws NullPointerException if e is null
803 * @throws InterruptedException if the thread is interrupted whist waiting
804 * for space
805 */
806 boolean offerLast(final E e, final Duration timeout) throws InterruptedException {
807 Objects.requireNonNull(e, "e");
808 long nanos = timeout.toNanos();
809 lock.lockInterruptibly();
810 try {
811 while (!linkLast(e)) {
812 if (nanos <= 0) {
813 return false;
814 }
815 nanos = notFull.awaitNanos(nanos);
816 }
817 return true;
818 } finally {
819 lock.unlock();
820 }
821 }
822
823 /**
824 * Links the provided element as the last in the queue, waiting up to the
825 * specified time to do so if the queue is full.
826 *
827 * @param e element to link
828 * @param timeout length of time to wait
829 * @param unit units that timeout is expressed in
830 * @return {@code true} if successful, otherwise {@code false}
831 * @throws NullPointerException if e is null
832 * @throws InterruptedException if the thread is interrupted whist waiting
833 * for space
834 */
835 public boolean offerLast(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
836 return offerLast(e, PoolImplUtils.toDuration(timeout, unit));
837 }
838
839 @Override
840 public E peek() {
841 return peekFirst();
842 }
843
844 // BlockingQueue methods
845
846 @Override
847 public E peekFirst() {
848 lock.lock();
849 try {
850 return first == null ? null : first.item;
851 } finally {
852 lock.unlock();
853 }
854 }
855
856 @Override
857 public E peekLast() {
858 lock.lock();
859 try {
860 return last == null ? null : last.item;
861 } finally {
862 lock.unlock();
863 }
864 }
865
866 @Override
867 public E poll() {
868 return pollFirst();
869 }
870
871 /**
872 * Unlinks the first element in the queue, waiting up to the specified time
873 * to do so if the queue is empty.
874 *
875 * <p>This method is equivalent to {@link #pollFirst(long, TimeUnit)}.
876 *
877 * @param timeout length of time to wait
878 * @return the unlinked element
879 * @throws InterruptedException if the current thread is interrupted
880 */
881 E poll(final Duration timeout) throws InterruptedException {
882 return pollFirst(timeout);
883 }
884
885 /**
886 * Unlinks the first element in the queue, waiting up to the specified time
887 * to do so if the queue is empty.
888 *
889 * <p>This method is equivalent to {@link #pollFirst(long, TimeUnit)}.
890 *
891 * @param timeout length of time to wait
892 * @param unit units that timeout is expressed in
893 * @return the unlinked element
894 * @throws InterruptedException if the current thread is interrupted
895 */
896 public E poll(final long timeout, final TimeUnit unit) throws InterruptedException {
897 return pollFirst(timeout, unit);
898 }
899
900 @Override
901 public E pollFirst() {
902 lock.lock();
903 try {
904 return unlinkFirst();
905 } finally {
906 lock.unlock();
907 }
908 }
909
910 /**
911 * Unlinks the first element in the queue, waiting up to the specified time
912 * to do so if the queue is empty.
913 *
914 * @param timeout length of time to wait
915 * @return the unlinked element
916 * @throws InterruptedException if the current thread is interrupted
917 */
918 E pollFirst(final Duration timeout) throws InterruptedException {
919 long nanos = timeout.toNanos();
920 lock.lockInterruptibly();
921 try {
922 E x;
923 while ((x = unlinkFirst()) == null) {
924 if (nanos <= 0) {
925 return null;
926 }
927 nanos = notEmpty.awaitNanos(nanos);
928 }
929 return x;
930 } finally {
931 lock.unlock();
932 }
933 }
934
935 /**
936 * Unlinks the first element in the queue, waiting up to the specified time
937 * to do so if the queue is empty.
938 *
939 * @param timeout length of time to wait
940 * @param unit units that timeout is expressed in
941 * @return the unlinked element
942 * @throws InterruptedException if the current thread is interrupted
943 */
944 public E pollFirst(final long timeout, final TimeUnit unit) throws InterruptedException {
945 return pollFirst(PoolImplUtils.toDuration(timeout, unit));
946 }
947
948 @Override
949 public E pollLast() {
950 lock.lock();
951 try {
952 return unlinkLast();
953 } finally {
954 lock.unlock();
955 }
956 }
957
958 /**
959 * Unlinks the last element in the queue, waiting up to the specified time
960 * to do so if the queue is empty.
961 *
962 * @param timeout length of time to wait
963 * @return the unlinked element
964 * @throws InterruptedException if the current thread is interrupted
965 */
966 E pollLast(final Duration timeout)
967 throws InterruptedException {
968 long nanos = timeout.toNanos();
969 lock.lockInterruptibly();
970 try {
971 E x;
972 while ((x = unlinkLast()) == null) {
973 if (nanos <= 0) {
974 return null;
975 }
976 nanos = notEmpty.awaitNanos(nanos);
977 }
978 return x;
979 } finally {
980 lock.unlock();
981 }
982 }
983
984 /**
985 * Unlinks the last element in the queue, waiting up to the specified time
986 * to do so if the queue is empty.
987 *
988 * @param timeout length of time to wait
989 * @param unit units that timeout is expressed in
990 * @return the unlinked element
991 * @throws InterruptedException if the current thread is interrupted
992 */
993 public E pollLast(final long timeout, final TimeUnit unit)
994 throws InterruptedException {
995 return pollLast(PoolImplUtils.toDuration(timeout, unit));
996 }
997
998 /**
999 * {@inheritDoc}
1000 */
1001 @Override
1002 public E pop() {
1003 return removeFirst();
1004 }
1005
1006 /**
1007 * {@inheritDoc}
1008 */
1009 @Override
1010 public void push(final E e) {
1011 addFirst(e);
1012 }
1013
1014 /**
1015 * Links the provided element as the last in the queue, waiting until there
1016 * is space to do so if the queue is full.
1017 *
1018 * <p>
1019 * This method is equivalent to {@link #putLast(Object)}.
1020 * </p>
1021 *
1022 * @param e element to link
1023 * @throws NullPointerException if e is null
1024 * @throws InterruptedException if the thread is interrupted whilst waiting
1025 * for space
1026 */
1027 public void put(final E e) throws InterruptedException {
1028 putLast(e);
1029 }
1030
1031 /**
1032 * Links the provided element as the first in the queue, waiting until there
1033 * is space to do so if the queue is full.
1034 *
1035 * @param e element to link
1036 * @throws NullPointerException if e is null
1037 * @throws InterruptedException if the thread is interrupted whilst waiting
1038 * for space
1039 */
1040 public void putFirst(final E e) throws InterruptedException {
1041 Objects.requireNonNull(e, "e");
1042 lock.lock();
1043 try {
1044 while (!linkFirst(e)) {
1045 notFull.await();
1046 }
1047 } finally {
1048 lock.unlock();
1049 }
1050 }
1051
1052 /**
1053 * Links the provided element as the last in the queue, waiting until there
1054 * is space to do so if the queue is full.
1055 *
1056 * @param e element to link
1057 * @throws NullPointerException if e is null
1058 * @throws InterruptedException if the thread is interrupted whilst waiting
1059 * for space
1060 */
1061 public void putLast(final E e) throws InterruptedException {
1062 Objects.requireNonNull(e, "e");
1063 lock.lock();
1064 try {
1065 while (!linkLast(e)) {
1066 notFull.await();
1067 }
1068 } finally {
1069 lock.unlock();
1070 }
1071 }
1072
1073 // Stack methods
1074
1075 /**
1076 * Reconstitutes this deque from a stream (that is, deserialize it).
1077 *
1078 * @param s the stream
1079 */
1080 private void readObject(final ObjectInputStream s) throws IOException, ClassNotFoundException {
1081 s.defaultReadObject();
1082 count = 0;
1083 first = null;
1084 last = null;
1085 // Read in all elements and place in queue
1086 for (;;) {
1087 @SuppressWarnings("unchecked")
1088 final E item = (E) s.readObject();
1089 if (item == null) {
1090 break;
1091 }
1092 add(item);
1093 }
1094 }
1095
1096 /**
1097 * Returns the number of additional elements that this deque can ideally
1098 * (in the absence of memory or resource constraints) accept without
1099 * blocking. This is always equal to the initial capacity of this deque
1100 * less the current {@code size} of this deque.
1101 *
1102 * <p>
1103 * Note that you <em>cannot</em> always tell if an attempt to insert
1104 * an element will succeed by inspecting {@code remainingCapacity}
1105 * because it may be the case that another thread is about to
1106 * insert or remove an element.
1107 * </p>
1108 *
1109 * @return The number of additional elements the queue is able to accept
1110 */
1111 public int remainingCapacity() {
1112 lock.lock();
1113 try {
1114 return capacity - count;
1115 } finally {
1116 lock.unlock();
1117 }
1118 }
1119
1120 // Collection methods
1121
1122 /**
1123 * Retrieves and removes the head of the queue represented by this deque.
1124 * This method differs from {@link #poll poll} only in that it throws an
1125 * exception if this deque is empty.
1126 *
1127 * <p>
1128 * This method is equivalent to {@link #removeFirst() removeFirst}.
1129 * </p>
1130 *
1131 * @return the head of the queue represented by this deque
1132 * @throws NoSuchElementException if this deque is empty
1133 */
1134 @Override
1135 public E remove() {
1136 return removeFirst();
1137 }
1138
1139 /**
1140 * Removes the first occurrence of the specified element from this deque.
1141 * If the deque does not contain the element, it is unchanged.
1142 * More formally, removes the first element {@code e} such that
1143 * {@code o.equals(e)} (if such an element exists).
1144 * Returns {@code true} if this deque contained the specified element
1145 * (or equivalently, if this deque changed as a result of the call).
1146 *
1147 * <p>
1148 * This method is equivalent to
1149 * {@link #removeFirstOccurrence(Object) removeFirstOccurrence}.
1150 * </p>
1151 *
1152 * @param o element to be removed from this deque, if present
1153 * @return {@code true} if this deque changed as a result of the call
1154 */
1155 @Override
1156 public boolean remove(final Object o) {
1157 return removeFirstOccurrence(o);
1158 }
1159
1160 /**
1161 * {@inheritDoc}
1162 */
1163 @Override
1164 public E removeFirst() {
1165 final E x = pollFirst();
1166 if (x == null) {
1167 throw new NoSuchElementException();
1168 }
1169 return x;
1170 }
1171
1172 /*
1173 * TODO: Add support for more efficient bulk operations.
1174 *
 * We don't want to acquire the lock for every iteration, but we
 * also want to give other threads a chance to interact with the
 * collection, especially when count is close to capacity.
1178 */
1179
1180 // /**
1181 // * Adds all of the elements in the specified collection to this
1182 // * queue. Attempts to addAll of a queue to itself result in
1183 // * {@code IllegalArgumentException}. Further, the behavior of
1184 // * this operation is undefined if the specified collection is
1185 // * modified while the operation is in progress.
1186 // *
1187 // * @param c collection containing elements to be added to this queue
1188 // * @return {@code true} if this queue changed as a result of the call
1189 // * @throws ClassCastException
1190 // * @throws NullPointerException
1191 // * @throws IllegalArgumentException
1192 // * @throws IllegalStateException
1193 // * @see #add(Object)
1194 // */
1195 // public boolean addAll(Collection<? extends E> c) {
1196 // if (c == null)
1197 // throw new NullPointerException();
1198 // if (c == this)
1199 // throw new IllegalArgumentException();
1200 // final ReentrantLock lock = this.lock;
1201 // lock.lock();
1202 // try {
1203 // boolean modified = false;
1204 // for (E e : c)
1205 // if (linkLast(e))
1206 // modified = true;
1207 // return modified;
1208 // } finally {
1209 // lock.unlock();
1210 // }
1211 // }
1212
1213 @Override
1214 public boolean removeFirstOccurrence(final Object o) {
1215 if (o == null) {
1216 return false;
1217 }
1218 lock.lock();
1219 try {
1220 for (Node<E> p = first; p != null; p = p.next) {
1221 if (o.equals(p.item)) {
1222 unlink(p);
1223 return true;
1224 }
1225 }
1226 return false;
1227 } finally {
1228 lock.unlock();
1229 }
1230 }
1231
1232 /**
1233 * {@inheritDoc}
1234 */
1235 @Override
1236 public E removeLast() {
1237 final E x = pollLast();
1238 if (x == null) {
1239 throw new NoSuchElementException();
1240 }
1241 return x;
1242 }
1243
1244 @Override
1245 public boolean removeLastOccurrence(final Object o) {
1246 if (o == null) {
1247 return false;
1248 }
1249 lock.lock();
1250 try {
1251 for (Node<E> p = last; p != null; p = p.prev) {
1252 if (o.equals(p.item)) {
1253 unlink(p);
1254 return true;
1255 }
1256 }
1257 return false;
1258 } finally {
1259 lock.unlock();
1260 }
1261 }
1262
1263 /**
1264 * Returns the number of elements in this deque.
1265 *
1266 * @return the number of elements in this deque
1267 */
1268 @Override
1269 public int size() {
1270 lock.lock();
1271 try {
1272 return count;
1273 } finally {
1274 lock.unlock();
1275 }
1276 }
1277
1278 /**
1279 * Unlinks the first element in the queue, waiting until there is an element
1280 * to unlink if the queue is empty.
1281 *
1282 * <p>
1283 * This method is equivalent to {@link #takeFirst()}.
1284 * </p>
1285 *
1286 * @return the unlinked element
1287 * @throws InterruptedException if the current thread is interrupted
1288 */
1289 public E take() throws InterruptedException {
1290 return takeFirst();
1291 }
1292
1293 /**
1294 * Unlinks the first element in the queue, waiting until there is an element
1295 * to unlink if the queue is empty.
1296 *
1297 * @return the unlinked element
1298 * @throws InterruptedException if the current thread is interrupted
1299 */
1300 public E takeFirst() throws InterruptedException {
1301 lock.lock();
1302 try {
1303 E x;
1304 while ((x = unlinkFirst()) == null) {
1305 notEmpty.await();
1306 }
1307 return x;
1308 } finally {
1309 lock.unlock();
1310 }
1311 }
1312
1313 /**
1314 * Unlinks the last element in the queue, waiting until there is an element
1315 * to unlink if the queue is empty.
1316 *
1317 * @return the unlinked element
1318 * @throws InterruptedException if the current thread is interrupted
1319 */
1320 public E takeLast() throws InterruptedException {
1321 lock.lock();
1322 try {
1323 E x;
1324 while ((x = unlinkLast()) == null) {
1325 notEmpty.await();
1326 }
1327 return x;
1328 } finally {
1329 lock.unlock();
1330 }
1331 }
1332
1333 /**
1334 * Returns an array containing all of the elements in this deque, in
1335 * proper sequence (from first to last element).
1336 *
1337 * <p>
1338 * The returned array will be "safe" in that no references to it are
1339 * maintained by this deque. (In other words, this method must allocate
1340 * a new array). The caller is thus free to modify the returned array.
1341 * </p>
1342 * <p>
1343 * This method acts as bridge between array-based and collection-based
1344 * APIs.
1345 * </p>
1346 *
1347 * @return an array containing all of the elements in this deque
1348 */
1349 @Override
1350 public Object[] toArray() {
1351 lock.lock();
1352 try {
1353 final Object[] a = new Object[count];
1354 int k = 0;
1355 for (Node<E> p = first; p != null; p = p.next) {
1356 a[k++] = p.item;
1357 }
1358 return a;
1359 } finally {
1360 lock.unlock();
1361 }
1362 }
1363
1364 /**
1365 * {@inheritDoc}
1366 */
1367 @SuppressWarnings("unchecked")
1368 @Override
1369 public <T> T[] toArray(T[] a) {
1370 lock.lock();
1371 try {
1372 if (a.length < count) {
1373 a = (T[]) java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), count);
1374 }
1375 int k = 0;
1376 for (Node<E> p = first; p != null; p = p.next) {
1377 a[k++] = (T) p.item;
1378 }
1379 if (a.length > k) {
1380 a[k] = null;
1381 }
1382 return a;
1383 } finally {
1384 lock.unlock();
1385 }
1386 }
1387
1388 @Override
1389 public String toString() {
1390 lock.lock();
1391 try {
1392 return super.toString();
1393 } finally {
1394 lock.unlock();
1395 }
1396 }
1397
1398 /**
1399 * Unlinks the provided node.
1400 *
1401 * @param x The node to unlink
1402 */
1403 private void unlink(final Node<E> x) {
1404 // assert lock.isHeldByCurrentThread();
1405 final Node<E> p = x.prev;
1406 final Node<E> n = x.next;
1407 if (p == null) {
1408 unlinkFirst();
1409 } else if (n == null) {
1410 unlinkLast();
1411 } else {
1412 p.next = n;
1413 n.prev = p;
1414 x.item = null;
1415 // Don't mess with x's links. They may still be in use by an iterator.
1416 --count;
1417 notFull.signal();
1418 }
1419 }
1420
// Internal unlink helpers (callers are expected to hold the lock)
1422
1423 /**
1424 * Removes and returns the first element, or null if empty.
1425 *
1426 * @return The first element or {@code null} if empty
1427 */
1428 private E unlinkFirst() {
1429 // assert lock.isHeldByCurrentThread();
1430 final Node<E> f = first;
1431 if (f == null) {
1432 return null;
1433 }
1434 final Node<E> n = f.next;
1435 final E item = f.item;
1436 f.item = null;
1437 f.next = f; // help GC
1438 first = n;
1439 if (n == null) {
1440 last = null;
1441 } else {
1442 n.prev = null;
1443 }
1444 --count;
1445 notFull.signal();
1446 return item;
1447 }
1448
1449 /**
1450 * Removes and returns the last element, or null if empty.
1451 *
1452 * @return The first element or {@code null} if empty
1453 */
1454 private E unlinkLast() {
1455 // assert lock.isHeldByCurrentThread();
1456 final Node<E> l = last;
1457 if (l == null) {
1458 return null;
1459 }
1460 final Node<E> p = l.prev;
1461 final E item = l.item;
1462 l.item = null;
1463 l.prev = l; // help GC
1464 last = p;
1465 if (p == null) {
1466 first = null;
1467 } else {
1468 p.next = null;
1469 }
1470 --count;
1471 notFull.signal();
1472 return item;
1473 }
1474
1475 /**
1476 * Saves the state of this deque to a stream (that is, serialize it).
1477 *
1478 * @serialData The capacity (int), followed by elements (each an
1479 * {@code Object}) in the proper order, followed by a null
1480 * @param s the stream
1481 * @throws IOException if I/O errors occur while writing to the underlying {@code OutputStream}
1482 */
1483 private void writeObject(final java.io.ObjectOutputStream s) throws IOException {
1484 lock.lock();
1485 try {
1486 // Write out capacity and any hidden stuff
1487 s.defaultWriteObject();
1488 // Write out all elements in the proper order.
1489 for (Node<E> p = first; p != null; p = p.next) {
1490 s.writeObject(p.item);
1491 }
1492 // Use trailing null as sentinel
1493 s.writeObject(null);
1494 } finally {
1495 lock.unlock();
1496 }
1497 }
1498 }