001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied. See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019
020 package org.apache.commons.pipeline.util;
021
022 import java.util.Comparator;
023 import java.util.concurrent.ArrayBlockingQueue;
024 import java.util.concurrent.BlockingQueue;
025 import java.util.concurrent.DelayQueue;
026 import java.util.concurrent.Delayed;
027 import java.util.concurrent.LinkedBlockingQueue;
028 import java.util.concurrent.PriorityBlockingQueue;
029 import java.util.concurrent.SynchronousQueue;
030
031 import org.apache.commons.pipeline.StageDriver;
032
/**
 * Many {@link StageDriver} implementations require one or more queues
 * to be created. This interface provides a consistent API for factories used
 * to create such queues and supplies several default implementations.
 */
038 public interface BlockingQueueFactory<T> extends QueueFactory<T> {
039 public BlockingQueue<T> createQueue();
040
041 public static class ArrayBlockingQueueFactory<T> extends AbstractQueueFactory<T> implements BlockingQueueFactory<T> {
042 public ArrayBlockingQueue<T> createQueue() {
043 if (this.initialContents == null || this.initialContents.isEmpty()) {
044 return new ArrayBlockingQueue<T>(this.capacity, this.fair);
045 } else {
046 if (this.initialContents.size() > this.capacity) {
047 throw new IllegalStateException("The number of elements in the initial contents of the queue to be created exceeds its capacity.");
048 } else {
049 ArrayBlockingQueue<T> queue = new ArrayBlockingQueue<T>(this.capacity, this.fair);
050 queue.addAll(this.initialContents);
051 return queue;
052 }
053 }
054 }
055
056 /**
057 * Holds value of property capacity.
058 */
059 private int capacity = Integer.MAX_VALUE;
060
061 /**
062 * Getter for property capacity.
063 * @return Value of property capacity.
064 */
065 public void setCapacity(int capacity) {
066 this.capacity = capacity;
067 }
068
069 /**
070 * Setter for property capacity.
071 * @param capacity New value of property capacity.
072 */
073 public int getCapacity() {
074 return this.capacity;
075 }
076
077 /**
078 * Holds value of property fair.
079 */
080 private boolean fair = false;
081
082 /**
083 * Getter for property fair.
084 * @return Value of property fair.
085 */
086 public boolean isFair() {
087 return this.fair;
088 }
089
090 /**
091 * Setter for property fair.
092 * @param fair New value of property fair.
093 */
094 public void setFair(boolean fair) {
095 this.fair = fair;
096 }
097 }
098
099 public static class DelayQueueFactoryL<T extends Delayed> extends AbstractQueueFactory<T> implements BlockingQueueFactory<T> {
100 public DelayQueue<T> createQueue() {
101 if (this.initialContents == null || this.initialContents.isEmpty()) {
102 return new DelayQueue<T>();
103 } else {
104 return new DelayQueue<T>(this.initialContents);
105 }
106 }
107 }
108
109 public static class LinkedBlockingQueueFactory<T> extends AbstractQueueFactory<T> implements BlockingQueueFactory<T> {
110
111 public LinkedBlockingQueue<T> createQueue() {
112 if (this.initialContents == null || this.initialContents.isEmpty()) {
113 return new LinkedBlockingQueue<T>(capacity);
114 } else {
115 if (this.initialContents.size() > this.capacity) {
116 throw new IllegalStateException("The number of elements in the initial contents of the queue to be created exceeds its capacity.");
117 } else {
118 LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<T>(capacity);
119 queue.addAll(this.initialContents);
120 return queue;
121 }
122 }
123 }
124
125 /**
126 * Holds value of property capacity.
127 */
128 private int capacity = Integer.MAX_VALUE;
129
130 /**
131 * Getter for property capacity.
132 * @return Value of property capacity.
133 */
134 public void setCapacity(int capacity) {
135 this.capacity = capacity;
136 }
137
138 /**
139 * Setter for property capacity.
140 * @param capacity New value of property capacity.
141 */
142 public int getCapacity() {
143 return this.capacity;
144 }
145 }
146
147 public static class PriorityBlockingQueueFactory<T> extends AbstractQueueFactory<T> implements BlockingQueueFactory<T> {
148 public PriorityBlockingQueue<T> createQueue() {
149 if (comparator == null) {
150 if (this.initialContents == null || this.initialContents.isEmpty()) {
151 return new PriorityBlockingQueue<T>(initialCapacity);
152 } else {
153 return new PriorityBlockingQueue<T>(this.initialContents);
154 }
155 } else {
156 PriorityBlockingQueue<T> queue = new PriorityBlockingQueue<T>(initialCapacity, comparator);
157 if ( !(this.initialContents == null || this.initialContents.isEmpty()) ) {
158 queue.addAll(this.initialContents);
159 }
160 return queue;
161 }
162 }
163
164 /**
165 * Holds value of property initialCapacity. Default value is the same
166 * as that for java.util.concurrent.PriorityBlockingQueue.
167 */
168 private int initialCapacity = 11;
169
170 /**
171 * Getter for property initialCapacity.
172 * @return Value of property initialCapacity.
173 */
174 public int getInitialCapacity() {
175 return this.initialCapacity;
176 }
177
178 /**
179 * Setter for property initialCapacity.
180 * @param initialCapacity New value of property initialCapacity.
181 */
182 public void setInitialCapacity(int initialCapacity) {
183 this.initialCapacity = initialCapacity;
184 }
185
186 /**
187 * Holds value of property comparator.
188 */
189 private Comparator<? super T> comparator;
190
191 /**
192 * Getter for property comparator.
193 * @return Value of property comparator.
194 */
195 public Comparator<? super T> getComparator() {
196 return this.comparator;
197 }
198
199 /**
200 * Setter for property comparator.
201 * @param comparator New value of property comparator.
202 */
203 public void setComparator(Comparator<? super T> comparator) {
204 this.comparator = comparator;
205 }
206
207 }
208
209 public static class SynchronousQueueFactory<T> implements BlockingQueueFactory<T> {
210 public SynchronousQueue<T> createQueue() {
211 return new SynchronousQueue<T>(this.fair);
212 }
213
214 /**
215 * Holds value of property fair.
216 */
217 private boolean fair = false;
218
219 /**
220 * Getter for property fair.
221 * @return Value of property fair.
222 */
223 public boolean isFair() {
224 return this.fair;
225 }
226
227 /**
228 * Setter for property fair.
229 * @param fair New value of property fair.
230 */
231 public void setFair(boolean fair) {
232 this.fair = fair;
233 }
234 }
235 }