1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.commons.pool2.impl;
18
19 import java.io.IOException;
20 import java.io.ObjectInputStream;
21 import java.io.Serializable;
22 import java.time.Duration;
23 import java.util.AbstractQueue;
24 import java.util.Collection;
25 import java.util.Iterator;
26 import java.util.NoSuchElementException;
27 import java.util.Objects;
28 import java.util.concurrent.BlockingDeque;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.locks.Condition;
31
32 /**
33 * An optionally-bounded {@linkplain java.util.concurrent.BlockingDeque blocking
34 * deque} based on linked nodes.
35 *
36 * <p> The optional capacity bound constructor argument serves as a
37 * way to prevent excessive expansion. The capacity, if unspecified,
38 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
39 * dynamically created upon each insertion unless this would bring the
40 * deque above capacity.
41 * </p>
42 *
43 * <p>Most operations run in constant time (ignoring time spent
44 * blocking). Exceptions include {@link #remove(Object) remove},
45 * {@link #removeFirstOccurrence removeFirstOccurrence}, {@link
46 * #removeLastOccurrence removeLastOccurrence}, {@link #contains
47 * contains}, {@link #iterator iterator.remove()}, and the bulk
48 * operations, all of which run in linear time.
49 * </p>
50 *
51 * <p>This class and its iterator implement all of the
52 * <em>optional</em> methods of the {@link Collection} and {@link
53 * Iterator} interfaces.
54 * </p>
55 *
56 * <p>This class is a member of the
57 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
58 * Java Collections Framework</a>.
59 * </p>
60 *
 * <p>Note: This was copied from Apache Harmony and modified to suit the needs of
 * Commons Pool.
 * </p>
 *
 * @param <E> the type of elements held in this collection
65 *
66 * @since 2.0
67 */
68 final class LinkedBlockingDeque<E> extends AbstractQueue<E>
69 implements BlockingDeque<E>, Serializable {
70
71 /*
72 * Implemented as a simple doubly-linked list protected by a
73 * single lock and using conditions to manage blocking.
74 *
75 * To implement weakly consistent iterators, it appears we need to
76 * keep all Nodes GC-reachable from a predecessor dequeued Node.
77 * That would cause two problems:
78 * - allow a rogue Iterator to cause unbounded memory retention
79 * - cause cross-generational linking of old Nodes to new Nodes if
80 * a Node was tenured while live, which generational GCs have a
81 * hard time dealing with, causing repeated major collections.
82 * However, only non-deleted Nodes need to be reachable from
83 * dequeued Nodes, and reachability does not necessarily have to
84 * be of the kind understood by the GC. We use the trick of
85 * linking a Node that has just been dequeued to itself. Such a
86 * self-link implicitly means to jump to "first" (for next links)
87 * or "last" (for prev links).
88 */
89
90 /*
91 * We have "diamond" multiple interface/abstract class inheritance
92 * here, and that introduces ambiguities. Often we want the
93 * BlockingDeque javadoc combined with the AbstractQueue
94 * implementation, so a lot of method specs are duplicated here.
95 */
96
97 /**
98 * Base class for Iterators for LinkedBlockingDeque
99 */
100 private abstract class AbstractItr implements Iterator<E> {
101 /**
102 * The next node to return in next()
103 */
104 Node<E> next;
105
106 /**
107 * nextItem holds on to item fields because once we claim that
108 * an element exists in hasNext(), we must return item read
109 * under lock (in advance()) even if it was in the process of
110 * being removed when hasNext() was called.
111 */
112 E nextItem;
113
114 /**
115 * Node returned by most recent call to next. Needed by remove.
116 * Reset to null if this element is deleted by a call to remove.
117 */
118 private Node<E> lastRet;
119
120 /**
121 * Constructs a new iterator. Sets the initial position.
122 */
123 AbstractItr() {
124 // set to initial position
125 lock.lock();
126 try {
127 next = firstNode();
128 nextItem = next == null ? null : next.item;
129 } finally {
130 lock.unlock();
131 }
132 }
133
134 /**
135 * Advances next.
136 */
137 void advance() {
138 lock.lock();
139 try {
140 // assert next != null;
141 next = succ(next);
142 nextItem = next == null ? null : next.item;
143 } finally {
144 lock.unlock();
145 }
146 }
147
148 /**
149 * Obtain the first node to be returned by the iterator.
150 *
151 * @return first node
152 */
153 abstract Node<E> firstNode();
154
155 @Override
156 public boolean hasNext() {
157 return next != null;
158 }
159
160 @Override
161 public E next() {
162 if (next == null) {
163 throw new NoSuchElementException();
164 }
165 lastRet = next;
166 final E x = nextItem;
167 advance();
168 return x;
169 }
170
171 /**
172 * For a given node, obtain the next node to be returned by the
173 * iterator.
174 *
175 * @param n given node
176 * @return next node
177 */
178 abstract Node<E> nextNode(Node<E> n);
179
180 @Override
181 public void remove() {
182 final Node<E> n = lastRet;
183 if (n == null) {
184 throw new IllegalStateException();
185 }
186 lastRet = null;
187 lock.lock();
188 try {
189 if (n.item != null) {
190 unlink(n);
191 }
192 } finally {
193 lock.unlock();
194 }
195 }
196
197 /**
198 * Returns the successor node of the given non-null, but
199 * possibly previously deleted, node.
200 *
201 * @param n node whose successor is sought
202 * @return successor node
203 */
204 private Node<E> succ(Node<E> n) {
205 // Chains of deleted nodes ending in null or self-links
206 // are possible if multiple interior nodes are removed.
207 for (;;) {
208 final Node<E> s = nextNode(n);
209 if (s == null) {
210 return null;
211 }
212 if (s.item != null) {
213 return s;
214 }
215 if (s == n) {
216 return firstNode();
217 }
218 n = s;
219 }
220 }
221 }
222
223 /** Descending iterator */
224 private final class DescendingItr extends AbstractItr {
225 @Override
226 Node<E> firstNode() {
227 return last;
228 }
229
230 @Override
231 Node<E> nextNode(final Node<E> n) {
232 return n.prev;
233 }
234 }
235
236 /** Forward iterator */
237 private final class Itr extends AbstractItr {
238 @Override
239 Node<E> firstNode() {
240 return first;
241 }
242
243 @Override
244 Node<E> nextNode(final Node<E> n) {
245 return n.next;
246 }
247 }
248
249 /**
250 * Doubly-linked list node class.
251 *
252 * @param <E> node item type
253 */
254 private static final class Node<E> {
255 /**
256 * The item, or null if this node has been removed.
257 */
258 E item;
259
260 /**
261 * One of:
262 * - the real predecessor Node
263 * - this Node, meaning the predecessor is tail
264 * - null, meaning there is no predecessor
265 */
266 Node<E> prev;
267
268 /**
269 * One of:
270 * - the real successor Node
271 * - this Node, meaning the successor is head
272 * - null, meaning there is no successor
273 */
274 Node<E> next;
275
276 /**
277 * Constructs a new list node.
278 *
279 * @param x The list item
280 * @param p Previous item
281 * @param n Next item
282 */
283 Node(final E x, final Node<E> p, final Node<E> n) {
284 item = x;
285 prev = p;
286 next = n;
287 }
288 }
289
290 private static final long serialVersionUID = -387911632671998426L;
291
292 /**
293 * Pointer to first node.
294 * Invariant: (first == null && last == null) ||
295 * (first.prev == null && first.item != null)
296 */
297 private transient Node<E> first; // @GuardedBy("lock")
298
299 /**
300 * Pointer to last node.
301 * Invariant: (first == null && last == null) ||
302 * (last.next == null && last.item != null)
303 */
304 private transient Node<E> last; // @GuardedBy("lock")
305
306 /** Number of items in the deque */
307 private transient int count; // @GuardedBy("lock")
308
309 /** Maximum number of items in the deque */
310 private final int capacity;
311
312 /** Main lock guarding all access */
313 private final InterruptibleReentrantLock lock;
314
315 /** Condition for waiting takes */
316 private final Condition notEmpty;
317
318 /** Condition for waiting puts */
319 private final Condition notFull;
320
/**
 * Creates a {@code LinkedBlockingDeque} bounded only by
 * {@link Integer#MAX_VALUE}, using the non-fair locking policy.
 */
LinkedBlockingDeque() {
    this(Integer.MAX_VALUE, false);
}
328
/**
 * Creates a {@code LinkedBlockingDeque} bounded only by
 * {@link Integer#MAX_VALUE}, using the given fairness policy.
 *
 * @param fairness true means threads waiting on the deque should be served
 * as if waiting in a FIFO request queue
 */
LinkedBlockingDeque(final boolean fairness) {
    this(Integer.MAX_VALUE, fairness);
}
338
339 // Basic linking and unlinking operations, called only while holding lock
340
341 /**
342 * Creates a {@code LinkedBlockingDeque} with a capacity of
343 * {@link Integer#MAX_VALUE}, initially containing the elements of
344 * the given collection, added in traversal order of the
345 * collection's iterator.
346 *
347 * @param c the collection of elements to initially contain
348 * @throws NullPointerException if the specified collection or any
349 * of its elements are null
350 */
351 LinkedBlockingDeque(final Collection<? extends E> c) {
352 this(Integer.MAX_VALUE);
353 lock.lock(); // Never contended, but necessary for visibility
354 try {
355 for (final E e : c) {
356 Objects.requireNonNull(e);
357 if (!linkLast(e)) {
358 throw new IllegalStateException("Deque full");
359 }
360 }
361 } finally {
362 lock.unlock();
363 }
364 }
365
/**
 * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity and
 * the non-fair locking policy.
 *
 * @param capacity the capacity of this deque
 * @throws IllegalArgumentException if {@code capacity} is less than 1
 */
LinkedBlockingDeque(final int capacity) {
    this(capacity, false);
}
375
/**
 * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity
 * and fairness policy. Both wait conditions are created from the same lock.
 *
 * @param capacity the capacity of this deque
 * @param fairness true means threads waiting on the deque should be served
 * as if waiting in a FIFO request queue
 * @throws IllegalArgumentException if {@code capacity} is less than 1
 */
LinkedBlockingDeque(final int capacity, final boolean fairness) {
    if (capacity < 1) {
        throw new IllegalArgumentException();
    }
    this.capacity = capacity;
    this.lock = new InterruptibleReentrantLock(fairness);
    this.notEmpty = this.lock.newCondition();
    this.notFull = this.lock.newCondition();
}
394
395 /**
396 * {@inheritDoc}
397 */
398 @Override
399 public boolean add(final E e) {
400 addLast(e);
401 return true;
402 }
403
404 /**
405 * {@inheritDoc}
406 */
407 @Override
408 public void addFirst(final E e) {
409 if (!offerFirst(e)) {
410 throw new IllegalStateException("Deque full");
411 }
412 }
413
414 // BlockingDeque methods
415
416 /**
417 * {@inheritDoc}
418 */
419 @Override
420 public void addLast(final E e) {
421 if (!offerLast(e)) {
422 throw new IllegalStateException("Deque full");
423 }
424 }
425
426 /**
427 * Atomically removes all of the elements from this deque.
428 * The deque will be empty after this call returns.
429 */
430 @Override
431 public void clear() {
432 lock.lock();
433 try {
434 for (Node<E> f = first; f != null;) {
435 f.item = null;
436 final Node<E> n = f.next;
437 f.prev = null;
438 f.next = null;
439 f = n;
440 }
441 first = last = null;
442 count = 0;
443 notFull.signalAll();
444 } finally {
445 lock.unlock();
446 }
447 }
448
449 /**
450 * Returns {@code true} if this deque contains the specified element.
451 * More formally, returns {@code true} if and only if this deque contains
452 * at least one element {@code e} such that {@code o.equals(e)}.
453 *
454 * @param o object to be checked for containment in this deque
455 * @return {@code true} if this deque contains the specified element
456 */
457 @Override
458 public boolean contains(final Object o) {
459 if (o == null) {
460 return false;
461 }
462 lock.lock();
463 try {
464 for (Node<E> p = first; p != null; p = p.next) {
465 if (o.equals(p.item)) {
466 return true;
467 }
468 }
469 return false;
470 } finally {
471 lock.unlock();
472 }
473 }
474
475 /**
476 * {@inheritDoc}
477 */
478 @Override
479 public Iterator<E> descendingIterator() {
480 return new DescendingItr();
481 }
482
483 /**
484 * Drains the queue to the specified collection.
485 *
486 * @param c The collection to add the elements to
487 * @return number of elements added to the collection
488 * @throws UnsupportedOperationException if the add operation is not
489 * supported by the specified collection
490 * @throws ClassCastException if the class of the elements held by this
491 * collection prevents them from being added to the specified
492 * collection
493 * @throws NullPointerException if c is null
494 * @throws IllegalArgumentException if c is this instance
495 */
496 @Override
497 public int drainTo(final Collection<? super E> c) {
498 return drainTo(c, Integer.MAX_VALUE);
499 }
500
501 /**
502 * Drains no more than the specified number of elements from the queue to the
503 * specified collection.
504 *
505 * @param collection collection to add the elements to
506 * @param maxElements maximum number of elements to remove from the queue
507 * @return number of elements added to the collection
508 * @throws UnsupportedOperationException if the add operation is not
509 * supported by the specified collection
510 * @throws ClassCastException if the class of the elements held by this
511 * collection prevents them from being added to the specified
512 * collection
513 * @throws NullPointerException if c is null
514 * @throws IllegalArgumentException if c is this instance
515 */
516 @Override
517 public int drainTo(final Collection<? super E> collection, final int maxElements) {
518 Objects.requireNonNull(collection, "collection");
519 if (collection == this) {
520 throw new IllegalArgumentException();
521 }
522 lock.lock();
523 try {
524 final int n = Math.min(maxElements, count);
525 for (int i = 0; i < n; i++) {
526 collection.add(first.item); // In this order, in case add() throws.
527 unlinkFirst();
528 }
529 return n;
530 } finally {
531 lock.unlock();
532 }
533 }
534
535 /**
536 * Retrieves, but does not remove, the head of the queue represented by
537 * this deque. This method differs from {@link #peek peek} only in that
538 * it throws an exception if this deque is empty.
539 *
540 * <p>This method is equivalent to {@link #getFirst() getFirst}.
541 *
542 * @return the head of the queue represented by this deque
543 * @throws NoSuchElementException if this deque is empty
544 */
545 @Override
546 public E element() {
547 return getFirst();
548 }
549
550 /**
551 * {@inheritDoc}
552 */
553 @Override
554 public E getFirst() {
555 final E x = peekFirst();
556 if (x == null) {
557 throw new NoSuchElementException();
558 }
559 return x;
560 }
561
562 /**
563 * {@inheritDoc}
564 */
565 @Override
566 public E getLast() {
567 final E x = peekLast();
568 if (x == null) {
569 throw new NoSuchElementException();
570 }
571 return x;
572 }
573
574 /**
575 * Gets the length of the queue of threads waiting to take instances from this deque. See disclaimer on accuracy
576 * in {@link java.util.concurrent.locks.ReentrantLock#getWaitQueueLength(Condition)}.
577 *
578 * @return number of threads waiting on this deque's notEmpty condition.
579 */
580 int getTakeQueueLength() {
581 lock.lock();
582 try {
583 return lock.getWaitQueueLength(notEmpty);
584 } finally {
585 lock.unlock();
586 }
587 }
588
589 /**
590 * Returns true if there are threads waiting to take instances from this deque. See disclaimer on accuracy in
591 * {@link java.util.concurrent.locks.ReentrantLock#hasWaiters(Condition)}.
592 *
593 * @return true if there is at least one thread waiting on this deque's notEmpty condition.
594 */
595 boolean hasTakeWaiters() {
596 lock.lock();
597 try {
598 return lock.hasWaiters(notEmpty);
599 } finally {
600 lock.unlock();
601 }
602 }
603
604 /**
605 * Interrupts the threads currently waiting to take an object from the pool. See disclaimer on accuracy in
606 * {@link java.util.concurrent.locks.ReentrantLock#getWaitingThreads(Condition)}.
607 */
608 void interruptTakeWaiters() {
609 lock.lock();
610 try {
611 lock.interruptWaiters(notEmpty);
612 } finally {
613 lock.unlock();
614 }
615 }
616
617 /**
618 * Returns an iterator over the elements in this deque in proper sequence.
619 * The elements will be returned in order from first (head) to last (tail).
620 * The returned {@code Iterator} is a "weakly consistent" iterator that
621 * will never throw {@link java.util.ConcurrentModificationException
622 * ConcurrentModificationException},
623 * and guarantees to traverse elements as they existed upon
624 * construction of the iterator, and may (but is not guaranteed to)
625 * reflect any modifications subsequent to construction.
626 *
627 * @return an iterator over the elements in this deque in proper sequence
628 */
629 @Override
630 public Iterator<E> iterator() {
631 return new Itr();
632 }
633
634 /**
635 * Links provided element as first element, or returns false if full.
636 *
637 * @param e The element to link as the first element.
638 * @return {@code true} if successful, otherwise {@code false}
639 */
640 private boolean linkFirst(final E e) {
641 // assert lock.isHeldByCurrentThread();
642 if (count >= capacity) {
643 return false;
644 }
645 final Node<E> f = first;
646 final Node<E> x = new Node<>(e, null, f);
647 first = x;
648 if (last == null) {
649 last = x;
650 } else {
651 f.prev = x;
652 }
653 ++count;
654 notEmpty.signal();
655 return true;
656 }
657
658 /**
659 * Links provided element as last element, or returns false if full.
660 *
661 * @param e The element to link as the last element.
662 * @return {@code true} if successful, otherwise {@code false}
663 */
664 private boolean linkLast(final E e) {
665 // assert lock.isHeldByCurrentThread();
666 if (count >= capacity) {
667 return false;
668 }
669 final Node<E> l = last;
670 final Node<E> x = new Node<>(e, l, null);
671 last = x;
672 if (first == null) {
673 first = x;
674 } else {
675 l.next = x;
676 }
677 ++count;
678 notEmpty.signal();
679 return true;
680 }
681
682 /**
683 * {@inheritDoc}
684 */
685 @Override
686 public boolean offer(final E e) {
687 return offerLast(e);
688 }
689
690 /**
691 * Links the provided element as the last in the queue, waiting up to the
692 * specified time to do so if the queue is full.
693 * <p>
694 * This method is equivalent to {@link #offerLast(Object, long, TimeUnit)}
695 *
696 * @param e element to link
697 * @param timeout length of time to wait
698 * @return {@code true} if successful, otherwise {@code false}
699 * @throws NullPointerException if e is null
700 * @throws InterruptedException if the thread is interrupted whilst waiting
701 * for space
702 */
703 boolean offer(final E e, final Duration timeout) throws InterruptedException {
704 return offerLast(e, timeout);
705 }
706
707 /**
708 * Links the provided element as the last in the queue, waiting up to the
709 * specified time to do so if the queue is full.
710 * <p>
711 * This method is equivalent to {@link #offerLast(Object, long, TimeUnit)}
712 *
713 * @param e element to link
714 * @param timeout length of time to wait
715 * @param unit units that timeout is expressed in
716 * @return {@code true} if successful, otherwise {@code false}
717 * @throws NullPointerException if e is null
718 * @throws InterruptedException if the thread is interrupted whilst waiting
719 * for space
720 */
721 @Override
722 public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
723 return offerLast(e, timeout, unit);
724 }
725
726 /**
727 * {@inheritDoc}
728 */
729 @Override
730 public boolean offerFirst(final E e) {
731 Objects.requireNonNull(e, "e");
732 lock.lock();
733 try {
734 return linkFirst(e);
735 } finally {
736 lock.unlock();
737 }
738 }
739
740 /**
741 * Links the provided element as the first in the queue, waiting up to the
742 * specified time to do so if the queue is full.
743 *
744 * @param e element to link
745 * @param timeout length of time to wait
746 * @return {@code true} if successful, otherwise {@code false}
747 * @throws NullPointerException if e is null
748 * @throws InterruptedException if the thread is interrupted whilst waiting
749 * for space
750 */
751 boolean offerFirst(final E e, final Duration timeout) throws InterruptedException {
752 Objects.requireNonNull(e, "e");
753 long nanos = timeout.toNanos();
754 lock.lockInterruptibly();
755 try {
756 while (!linkFirst(e)) {
757 if (nanos <= 0) {
758 return false;
759 }
760 nanos = notFull.awaitNanos(nanos);
761 }
762 return true;
763 } finally {
764 lock.unlock();
765 }
766 }
767
768 /**
769 * Links the provided element as the first in the queue, waiting up to the
770 * specified time to do so if the queue is full.
771 *
772 * @param e element to link
773 * @param timeout length of time to wait
774 * @param unit units that timeout is expressed in
775 * @return {@code true} if successful, otherwise {@code false}
776 * @throws NullPointerException if e is null
777 * @throws InterruptedException if the thread is interrupted whilst waiting
778 * for space
779 */
780 @Override
781 public boolean offerFirst(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
782 return offerFirst(e, PoolImplUtils.toDuration(timeout, unit));
783 }
784
785 /**
786 * {@inheritDoc}
787 */
788 @Override
789 public boolean offerLast(final E e) {
790 Objects.requireNonNull(e, "e");
791 lock.lock();
792 try {
793 return linkLast(e);
794 } finally {
795 lock.unlock();
796 }
797 }
798
799 /**
800 * Links the provided element as the last in the queue, waiting up to the
801 * specified time to do so if the queue is full.
802 *
803 * @param e element to link
804 * @param timeout length of time to wait
805 * @return {@code true} if successful, otherwise {@code false}
806 * @throws NullPointerException if e is null
807 * @throws InterruptedException if the thread is interrupted whist waiting
808 * for space
809 */
810 boolean offerLast(final E e, final Duration timeout) throws InterruptedException {
811 Objects.requireNonNull(e, "e");
812 long nanos = timeout.toNanos();
813 lock.lockInterruptibly();
814 try {
815 while (!linkLast(e)) {
816 if (nanos <= 0) {
817 return false;
818 }
819 nanos = notFull.awaitNanos(nanos);
820 }
821 return true;
822 } finally {
823 lock.unlock();
824 }
825 }
826
827 /**
828 * Links the provided element as the last in the queue, waiting up to the
829 * specified time to do so if the queue is full.
830 *
831 * @param e element to link
832 * @param timeout length of time to wait
833 * @param unit units that timeout is expressed in
834 * @return {@code true} if successful, otherwise {@code false}
835 * @throws NullPointerException if e is null
836 * @throws InterruptedException if the thread is interrupted whist waiting
837 * for space
838 */
839 @Override
840 public boolean offerLast(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
841 return offerLast(e, PoolImplUtils.toDuration(timeout, unit));
842 }
843
844 @Override
845 public E peek() {
846 return peekFirst();
847 }
848
849 // BlockingQueue methods
850
851 @Override
852 public E peekFirst() {
853 lock.lock();
854 try {
855 return first == null ? null : first.item;
856 } finally {
857 lock.unlock();
858 }
859 }
860
861 @Override
862 public E peekLast() {
863 lock.lock();
864 try {
865 return last == null ? null : last.item;
866 } finally {
867 lock.unlock();
868 }
869 }
870
871 @Override
872 public E poll() {
873 return pollFirst();
874 }
875
876 /**
877 * Unlinks the first element in the queue, waiting up to the specified time
878 * to do so if the queue is empty.
879 *
880 * <p>This method is equivalent to {@link #pollFirst(long, TimeUnit)}.
881 *
882 * @param timeout length of time to wait
883 * @return the unlinked element
884 * @throws InterruptedException if the current thread is interrupted
885 */
886 E poll(final Duration timeout) throws InterruptedException {
887 return pollFirst(timeout);
888 }
889
890 /**
891 * Unlinks the first element in the queue, waiting up to the specified time
892 * to do so if the queue is empty.
893 *
894 * <p>This method is equivalent to {@link #pollFirst(long, TimeUnit)}.
895 *
896 * @param timeout length of time to wait
897 * @param unit units that timeout is expressed in
898 * @return the unlinked element
899 * @throws InterruptedException if the current thread is interrupted
900 */
901 @Override
902 public E poll(final long timeout, final TimeUnit unit) throws InterruptedException {
903 return pollFirst(timeout, unit);
904 }
905
906 @Override
907 public E pollFirst() {
908 lock.lock();
909 try {
910 return unlinkFirst();
911 } finally {
912 lock.unlock();
913 }
914 }
915
916 /**
917 * Unlinks the first element in the queue, waiting up to the specified time
918 * to do so if the queue is empty.
919 *
920 * @param timeout length of time to wait
921 * @return the unlinked element
922 * @throws InterruptedException if the current thread is interrupted
923 */
924 E pollFirst(final Duration timeout) throws InterruptedException {
925 long nanos = timeout.toNanos();
926 lock.lockInterruptibly();
927 try {
928 E x;
929 while ((x = unlinkFirst()) == null) {
930 if (nanos <= 0) {
931 return null;
932 }
933 nanos = notEmpty.awaitNanos(nanos);
934 }
935 return x;
936 } finally {
937 lock.unlock();
938 }
939 }
940
941 /**
942 * Unlinks the first element in the queue, waiting up to the specified time
943 * to do so if the queue is empty.
944 *
945 * @param timeout length of time to wait
946 * @param unit units that timeout is expressed in
947 * @return the unlinked element
948 * @throws InterruptedException if the current thread is interrupted
949 */
950 @Override
951 public E pollFirst(final long timeout, final TimeUnit unit) throws InterruptedException {
952 return pollFirst(PoolImplUtils.toDuration(timeout, unit));
953 }
954
955 @Override
956 public E pollLast() {
957 lock.lock();
958 try {
959 return unlinkLast();
960 } finally {
961 lock.unlock();
962 }
963 }
964
965 /**
966 * Unlinks the last element in the queue, waiting up to the specified time
967 * to do so if the queue is empty.
968 *
969 * @param timeout length of time to wait
970 * @return the unlinked element
971 * @throws InterruptedException if the current thread is interrupted
972 */
973 E pollLast(final Duration timeout)
974 throws InterruptedException {
975 long nanos = timeout.toNanos();
976 lock.lockInterruptibly();
977 try {
978 E x;
979 while ((x = unlinkLast()) == null) {
980 if (nanos <= 0) {
981 return null;
982 }
983 nanos = notEmpty.awaitNanos(nanos);
984 }
985 return x;
986 } finally {
987 lock.unlock();
988 }
989 }
990
991 /**
992 * Unlinks the last element in the queue, waiting up to the specified time
993 * to do so if the queue is empty.
994 *
995 * @param timeout length of time to wait
996 * @param unit units that timeout is expressed in
997 * @return the unlinked element
998 * @throws InterruptedException if the current thread is interrupted
999 */
1000 @Override
1001 public E pollLast(final long timeout, final TimeUnit unit)
1002 throws InterruptedException {
1003 return pollLast(PoolImplUtils.toDuration(timeout, unit));
1004 }
1005
1006 /**
1007 * {@inheritDoc}
1008 */
1009 @Override
1010 public E pop() {
1011 return removeFirst();
1012 }
1013
1014 /**
1015 * {@inheritDoc}
1016 */
1017 @Override
1018 public void push(final E e) {
1019 addFirst(e);
1020 }
1021
1022 /**
1023 * Links the provided element as the last in the queue, waiting until there
1024 * is space to do so if the queue is full.
1025 *
1026 * <p>
1027 * This method is equivalent to {@link #putLast(Object)}.
1028 * </p>
1029 *
1030 * @param e element to link
1031 * @throws NullPointerException if e is null
1032 * @throws InterruptedException if the thread is interrupted whilst waiting
1033 * for space
1034 */
1035 @Override
1036 public void put(final E e) throws InterruptedException {
1037 putLast(e);
1038 }
1039
1040 /**
1041 * Links the provided element as the first in the queue, waiting until there
1042 * is space to do so if the queue is full.
1043 *
1044 * @param e element to link
1045 * @throws NullPointerException if e is null
1046 * @throws InterruptedException if the thread is interrupted whilst waiting
1047 * for space
1048 */
1049 @Override
1050 public void putFirst(final E e) throws InterruptedException {
1051 Objects.requireNonNull(e, "e");
1052 lock.lock();
1053 try {
1054 while (!linkFirst(e)) {
1055 notFull.await();
1056 }
1057 } finally {
1058 lock.unlock();
1059 }
1060 }
1061
1062 /**
1063 * Links the provided element as the last in the queue, waiting until there
1064 * is space to do so if the queue is full.
1065 *
1066 * @param e element to link
1067 * @throws NullPointerException if e is null
1068 * @throws InterruptedException if the thread is interrupted whilst waiting
1069 * for space
1070 */
1071 @Override
1072 public void putLast(final E e) throws InterruptedException {
1073 Objects.requireNonNull(e, "e");
1074 lock.lock();
1075 try {
1076 while (!linkLast(e)) {
1077 notFull.await();
1078 }
1079 } finally {
1080 lock.unlock();
1081 }
1082 }
1083
1084 // Stack methods
1085
1086 /**
1087 * Reconstitutes this deque from a stream (that is, deserialize it).
1088 *
1089 * @param s the stream
1090 */
1091 private void readObject(final ObjectInputStream s) throws IOException, ClassNotFoundException {
1092 s.defaultReadObject();
1093 count = 0;
1094 first = null;
1095 last = null;
1096 // Read in all elements and place in queue
1097 for (;;) {
1098 @SuppressWarnings("unchecked")
1099 final E item = (E) s.readObject();
1100 if (item == null) {
1101 break;
1102 }
1103 add(item);
1104 }
1105 }
1106
1107 /**
1108 * Returns the number of additional elements that this deque can ideally
1109 * (in the absence of memory or resource constraints) accept without
1110 * blocking. This is always equal to the initial capacity of this deque
1111 * less the current {@code size} of this deque.
1112 *
1113 * <p>
1114 * Note that you <em>cannot</em> always tell if an attempt to insert
1115 * an element will succeed by inspecting {@code remainingCapacity}
1116 * because it may be the case that another thread is about to
1117 * insert or remove an element.
1118 * </p>
1119 *
1120 * @return The number of additional elements the queue is able to accept
1121 */
1122 @Override
1123 public int remainingCapacity() {
1124 lock.lock();
1125 try {
1126 return capacity - count;
1127 } finally {
1128 lock.unlock();
1129 }
1130 }
1131
1132 // Collection methods
1133
1134 /**
1135 * Retrieves and removes the head of the queue represented by this deque.
1136 * This method differs from {@link #poll poll} only in that it throws an
1137 * exception if this deque is empty.
1138 *
1139 * <p>
1140 * This method is equivalent to {@link #removeFirst() removeFirst}.
1141 * </p>
1142 *
1143 * @return the head of the queue represented by this deque
1144 * @throws NoSuchElementException if this deque is empty
1145 */
1146 @Override
1147 public E remove() {
1148 return removeFirst();
1149 }
1150
1151 /**
1152 * Removes the first occurrence of the specified element from this deque.
1153 * If the deque does not contain the element, it is unchanged.
1154 * More formally, removes the first element {@code e} such that
1155 * {@code o.equals(e)} (if such an element exists).
1156 * Returns {@code true} if this deque contained the specified element
1157 * (or equivalently, if this deque changed as a result of the call).
1158 *
1159 * <p>
1160 * This method is equivalent to
1161 * {@link #removeFirstOccurrence(Object) removeFirstOccurrence}.
1162 * </p>
1163 *
1164 * @param o element to be removed from this deque, if present
1165 * @return {@code true} if this deque changed as a result of the call
1166 */
1167 @Override
1168 public boolean remove(final Object o) {
1169 return removeFirstOccurrence(o);
1170 }
1171
1172 /**
1173 * {@inheritDoc}
1174 */
1175 @Override
1176 public E removeFirst() {
1177 final E x = pollFirst();
1178 if (x == null) {
1179 throw new NoSuchElementException();
1180 }
1181 return x;
1182 }
1183
/*
 * TODO: Add support for more efficient bulk operations.
 *
 * We don't want to acquire the lock for every iteration, but we
 * also want to give other threads a chance to interact with the
 * collection, especially when count is close to capacity.
 */
1191
1192 // /**
1193 // * Adds all of the elements in the specified collection to this
1194 // * queue. Attempts to addAll of a queue to itself result in
1195 // * {@code IllegalArgumentException}. Further, the behavior of
1196 // * this operation is undefined if the specified collection is
1197 // * modified while the operation is in progress.
1198 // *
1199 // * @param c collection containing elements to be added to this queue
1200 // * @return {@code true} if this queue changed as a result of the call
1201 // * @throws ClassCastException
1202 // * @throws NullPointerException
1203 // * @throws IllegalArgumentException
1204 // * @throws IllegalStateException
1205 // * @see #add(Object)
1206 // */
1207 // public boolean addAll(Collection<? extends E> c) {
1208 // if (c == null)
1209 // throw new NullPointerException();
1210 // if (c == this)
1211 // throw new IllegalArgumentException();
1212 // final ReentrantLock lock = this.lock;
1213 // lock.lock();
1214 // try {
1215 // boolean modified = false;
1216 // for (E e : c)
1217 // if (linkLast(e))
1218 // modified = true;
1219 // return modified;
1220 // } finally {
1221 // lock.unlock();
1222 // }
1223 // }
1224
1225 @Override
1226 public boolean removeFirstOccurrence(final Object o) {
1227 if (o == null) {
1228 return false;
1229 }
1230 lock.lock();
1231 try {
1232 for (Node<E> p = first; p != null; p = p.next) {
1233 if (o.equals(p.item)) {
1234 unlink(p);
1235 return true;
1236 }
1237 }
1238 return false;
1239 } finally {
1240 lock.unlock();
1241 }
1242 }
1243
1244 /**
1245 * {@inheritDoc}
1246 */
1247 @Override
1248 public E removeLast() {
1249 final E x = pollLast();
1250 if (x == null) {
1251 throw new NoSuchElementException();
1252 }
1253 return x;
1254 }
1255
1256 @Override
1257 public boolean removeLastOccurrence(final Object o) {
1258 if (o == null) {
1259 return false;
1260 }
1261 lock.lock();
1262 try {
1263 for (Node<E> p = last; p != null; p = p.prev) {
1264 if (o.equals(p.item)) {
1265 unlink(p);
1266 return true;
1267 }
1268 }
1269 return false;
1270 } finally {
1271 lock.unlock();
1272 }
1273 }
1274
1275 /**
1276 * Returns the number of elements in this deque.
1277 *
1278 * @return the number of elements in this deque
1279 */
1280 @Override
1281 public int size() {
1282 lock.lock();
1283 try {
1284 return count;
1285 } finally {
1286 lock.unlock();
1287 }
1288 }
1289
1290 /**
1291 * Unlinks the first element in the queue, waiting until there is an element
1292 * to unlink if the queue is empty.
1293 *
1294 * <p>
1295 * This method is equivalent to {@link #takeFirst()}.
1296 * </p>
1297 *
1298 * @return the unlinked element
1299 * @throws InterruptedException if the current thread is interrupted
1300 */
1301 @Override
1302 public E take() throws InterruptedException {
1303 return takeFirst();
1304 }
1305
1306 /**
1307 * Unlinks the first element in the queue, waiting until there is an element
1308 * to unlink if the queue is empty.
1309 *
1310 * @return the unlinked element
1311 * @throws InterruptedException if the current thread is interrupted
1312 */
1313 @Override
1314 public E takeFirst() throws InterruptedException {
1315 lock.lock();
1316 try {
1317 E x;
1318 while ((x = unlinkFirst()) == null) {
1319 notEmpty.await();
1320 }
1321 return x;
1322 } finally {
1323 lock.unlock();
1324 }
1325 }
1326
1327 /**
1328 * Unlinks the last element in the queue, waiting until there is an element
1329 * to unlink if the queue is empty.
1330 *
1331 * @return the unlinked element
1332 * @throws InterruptedException if the current thread is interrupted
1333 */
1334 @Override
1335 public E takeLast() throws InterruptedException {
1336 lock.lock();
1337 try {
1338 E x;
1339 while ((x = unlinkLast()) == null) {
1340 notEmpty.await();
1341 }
1342 return x;
1343 } finally {
1344 lock.unlock();
1345 }
1346 }
1347
1348 /**
1349 * Returns an array containing all of the elements in this deque, in
1350 * proper sequence (from first to last element).
1351 *
1352 * <p>
1353 * The returned array will be "safe" in that no references to it are
1354 * maintained by this deque. (In other words, this method must allocate
1355 * a new array). The caller is thus free to modify the returned array.
1356 * </p>
1357 * <p>
1358 * This method acts as bridge between array-based and collection-based
1359 * APIs.
1360 * </p>
1361 *
1362 * @return an array containing all of the elements in this deque
1363 */
1364 @Override
1365 public Object[] toArray() {
1366 lock.lock();
1367 try {
1368 final Object[] a = new Object[count];
1369 int k = 0;
1370 for (Node<E> p = first; p != null; p = p.next) {
1371 a[k++] = p.item;
1372 }
1373 return a;
1374 } finally {
1375 lock.unlock();
1376 }
1377 }
1378
1379 /**
1380 * {@inheritDoc}
1381 */
1382 @SuppressWarnings("unchecked")
1383 @Override
1384 public <T> T[] toArray(T[] a) {
1385 lock.lock();
1386 try {
1387 if (a.length < count) {
1388 a = (T[]) java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), count);
1389 }
1390 int k = 0;
1391 for (Node<E> p = first; p != null; p = p.next) {
1392 a[k++] = (T) p.item;
1393 }
1394 if (a.length > k) {
1395 a[k] = null;
1396 }
1397 return a;
1398 } finally {
1399 lock.unlock();
1400 }
1401 }
1402
1403 @Override
1404 public String toString() {
1405 lock.lock();
1406 try {
1407 return super.toString();
1408 } finally {
1409 lock.unlock();
1410 }
1411 }
1412
1413 /**
1414 * Unlinks the provided node.
1415 *
1416 * @param x The node to unlink
1417 */
1418 private void unlink(final Node<E> x) {
1419 // assert lock.isHeldByCurrentThread();
1420 final Node<E> p = x.prev;
1421 final Node<E> n = x.next;
1422 if (p == null) {
1423 unlinkFirst();
1424 } else if (n == null) {
1425 unlinkLast();
1426 } else {
1427 p.next = n;
1428 n.prev = p;
1429 x.item = null;
1430 // Don't mess with x's links. They may still be in use by an iterator.
1431 --count;
1432 notFull.signal();
1433 }
1434 }
1435
1436 // Monitoring methods
1437
1438 /**
1439 * Removes and returns the first element, or null if empty.
1440 *
1441 * @return The first element or {@code null} if empty
1442 */
1443 private E unlinkFirst() {
1444 // assert lock.isHeldByCurrentThread();
1445 final Node<E> f = first;
1446 if (f == null) {
1447 return null;
1448 }
1449 final Node<E> n = f.next;
1450 final E item = f.item;
1451 f.item = null;
1452 f.next = f; // help GC
1453 first = n;
1454 if (n == null) {
1455 last = null;
1456 } else {
1457 n.prev = null;
1458 }
1459 --count;
1460 notFull.signal();
1461 return item;
1462 }
1463
1464 /**
1465 * Removes and returns the last element, or null if empty.
1466 *
1467 * @return The first element or {@code null} if empty
1468 */
1469 private E unlinkLast() {
1470 // assert lock.isHeldByCurrentThread();
1471 final Node<E> l = last;
1472 if (l == null) {
1473 return null;
1474 }
1475 final Node<E> p = l.prev;
1476 final E item = l.item;
1477 l.item = null;
1478 l.prev = l; // help GC
1479 last = p;
1480 if (p == null) {
1481 first = null;
1482 } else {
1483 p.next = null;
1484 }
1485 --count;
1486 notFull.signal();
1487 return item;
1488 }
1489
1490 /**
1491 * Saves the state of this deque to a stream (that is, serialize it).
1492 *
1493 * @serialData The capacity (int), followed by elements (each an
1494 * {@code Object}) in the proper order, followed by a null
1495 * @param s the stream
1496 * @throws IOException if I/O errors occur while writing to the underlying {@code OutputStream}
1497 */
1498 private void writeObject(final java.io.ObjectOutputStream s) throws IOException {
1499 lock.lock();
1500 try {
1501 // Write out capacity and any hidden stuff
1502 s.defaultWriteObject();
1503 // Write out all elements in the proper order.
1504 for (Node<E> p = first; p != null; p = p.next) {
1505 s.writeObject(p.item);
1506 }
1507 // Use trailing null as sentinel
1508 s.writeObject(null);
1509 } finally {
1510 lock.unlock();
1511 }
1512 }
1513 }