1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.commons.pipeline.util;
21
22 import java.util.Comparator;
23 import java.util.concurrent.ArrayBlockingQueue;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.DelayQueue;
26 import java.util.concurrent.Delayed;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.PriorityBlockingQueue;
29 import java.util.concurrent.SynchronousQueue;
30
31 import org.apache.commons.pipeline.StageDriver;
32
33
34
35
36
37
38 public interface BlockingQueueFactory<T> extends QueueFactory<T> {
39 public BlockingQueue<T> createQueue();
40
41 public static class ArrayBlockingQueueFactory<T> extends AbstractQueueFactory<T> implements BlockingQueueFactory<T> {
42 public ArrayBlockingQueue<T> createQueue() {
43 if (this.initialContents == null || this.initialContents.isEmpty()) {
44 return new ArrayBlockingQueue<T>(this.capacity, this.fair);
45 } else {
46 if (this.initialContents.size() > this.capacity) {
47 throw new IllegalStateException("The number of elements in the initial contents of the queue to be created exceeds its capacity.");
48 } else {
49 ArrayBlockingQueue<T> queue = new ArrayBlockingQueue<T>(this.capacity, this.fair);
50 queue.addAll(this.initialContents);
51 return queue;
52 }
53 }
54 }
55
56
57
58
59 private int capacity = Integer.MAX_VALUE;
60
61
62
63
64
65 public void setCapacity(int capacity) {
66 this.capacity = capacity;
67 }
68
69
70
71
72
73 public int getCapacity() {
74 return this.capacity;
75 }
76
77
78
79
80 private boolean fair = false;
81
82
83
84
85
86 public boolean isFair() {
87 return this.fair;
88 }
89
90
91
92
93
94 public void setFair(boolean fair) {
95 this.fair = fair;
96 }
97 }
98
99 public static class DelayQueueFactoryL<T extends Delayed> extends AbstractQueueFactory<T> implements BlockingQueueFactory<T> {
100 public DelayQueue<T> createQueue() {
101 if (this.initialContents == null || this.initialContents.isEmpty()) {
102 return new DelayQueue<T>();
103 } else {
104 return new DelayQueue<T>(this.initialContents);
105 }
106 }
107 }
108
109 public static class LinkedBlockingQueueFactory<T> extends AbstractQueueFactory<T> implements BlockingQueueFactory<T> {
110
111 public LinkedBlockingQueue<T> createQueue() {
112 if (this.initialContents == null || this.initialContents.isEmpty()) {
113 return new LinkedBlockingQueue<T>(capacity);
114 } else {
115 if (this.initialContents.size() > this.capacity) {
116 throw new IllegalStateException("The number of elements in the initial contents of the queue to be created exceeds its capacity.");
117 } else {
118 LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<T>(capacity);
119 queue.addAll(this.initialContents);
120 return queue;
121 }
122 }
123 }
124
125
126
127
128 private int capacity = Integer.MAX_VALUE;
129
130
131
132
133
134 public void setCapacity(int capacity) {
135 this.capacity = capacity;
136 }
137
138
139
140
141
142 public int getCapacity() {
143 return this.capacity;
144 }
145 }
146
147 public static class PriorityBlockingQueueFactory<T> extends AbstractQueueFactory<T> implements BlockingQueueFactory<T> {
148 public PriorityBlockingQueue<T> createQueue() {
149 if (comparator == null) {
150 if (this.initialContents == null || this.initialContents.isEmpty()) {
151 return new PriorityBlockingQueue<T>(initialCapacity);
152 } else {
153 return new PriorityBlockingQueue<T>(this.initialContents);
154 }
155 } else {
156 PriorityBlockingQueue<T> queue = new PriorityBlockingQueue<T>(initialCapacity, comparator);
157 if ( !(this.initialContents == null || this.initialContents.isEmpty()) ) {
158 queue.addAll(this.initialContents);
159 }
160 return queue;
161 }
162 }
163
164
165
166
167
168 private int initialCapacity = 11;
169
170
171
172
173
174 public int getInitialCapacity() {
175 return this.initialCapacity;
176 }
177
178
179
180
181
182 public void setInitialCapacity(int initialCapacity) {
183 this.initialCapacity = initialCapacity;
184 }
185
186
187
188
189 private Comparator<? super T> comparator;
190
191
192
193
194
195 public Comparator<? super T> getComparator() {
196 return this.comparator;
197 }
198
199
200
201
202
203 public void setComparator(Comparator<? super T> comparator) {
204 this.comparator = comparator;
205 }
206
207 }
208
209 public static class SynchronousQueueFactory<T> implements BlockingQueueFactory<T> {
210 public SynchronousQueue<T> createQueue() {
211 return new SynchronousQueue<T>(this.fair);
212 }
213
214
215
216
217 private boolean fair = false;
218
219
220
221
222
223 public boolean isFair() {
224 return this.fair;
225 }
226
227
228
229
230
231 public void setFair(boolean fair) {
232 this.fair = fair;
233 }
234 }
235 }