View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    * 
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.    
18   */ 
19  
20  package org.apache.commons.pipeline.util;
21  
22  import java.util.Comparator;
23  import java.util.concurrent.ArrayBlockingQueue;
24  import java.util.concurrent.BlockingQueue;
25  import java.util.concurrent.DelayQueue;
26  import java.util.concurrent.Delayed;
27  import java.util.concurrent.LinkedBlockingQueue;
28  import java.util.concurrent.PriorityBlockingQueue;
29  import java.util.concurrent.SynchronousQueue;
30  
31  import org.apache.commons.pipeline.StageDriver;
32  
33  /**
34   * Many {@link StageDriver} implementations require for one or more queues
35   * to be created. This interface provides a consistent API for factories used
36   * to create such queues and supplies a couple of default implementations.
37   */
38  public interface BlockingQueueFactory<T> extends QueueFactory<T> {
39      public BlockingQueue<T> createQueue();
40      
41      public static class ArrayBlockingQueueFactory<T> extends AbstractQueueFactory<T> implements BlockingQueueFactory<T> {
42          public ArrayBlockingQueue<T> createQueue() {
43              if (this.initialContents == null || this.initialContents.isEmpty()) {
44                  return new ArrayBlockingQueue<T>(this.capacity, this.fair);
45              } else {
46                  if (this.initialContents.size() > this.capacity) {
47                      throw new IllegalStateException("The number of elements in the initial contents of the queue to be created exceeds its capacity.");
48                  } else {
49                      ArrayBlockingQueue<T> queue = new ArrayBlockingQueue<T>(this.capacity, this.fair);
50                      queue.addAll(this.initialContents);
51                      return queue;
52                  }
53              }
54          }
55          
56          /**
57           * Holds value of property capacity.
58           */
59          private int capacity = Integer.MAX_VALUE;
60          
61          /**
62           * Getter for property capacity.
63           * @return Value of property capacity.
64           */
65          public void setCapacity(int capacity) {
66              this.capacity = capacity;
67          }
68          
69          /**
70           * Setter for property capacity.
71           * @param capacity New value of property capacity.
72           */
73          public int getCapacity() {
74              return this.capacity;
75          }
76          
77          /**
78           * Holds value of property fair.
79           */
80          private boolean fair = false;
81          
82          /**
83           * Getter for property fair.
84           * @return Value of property fair.
85           */
86          public boolean isFair() {
87              return this.fair;
88          }
89          
90          /**
91           * Setter for property fair.
92           * @param fair New value of property fair.
93           */
94          public void setFair(boolean fair) {
95              this.fair = fair;
96          }
97      }
98      
99      public static class DelayQueueFactoryL<T extends Delayed> extends AbstractQueueFactory<T>  implements BlockingQueueFactory<T> {
100         public DelayQueue<T> createQueue() {
101             if (this.initialContents == null || this.initialContents.isEmpty()) {
102                 return new DelayQueue<T>();
103             } else {
104                 return new DelayQueue<T>(this.initialContents);
105             }
106         }
107     }
108     
109     public static class LinkedBlockingQueueFactory<T> extends AbstractQueueFactory<T>  implements BlockingQueueFactory<T> {
110         
111         public LinkedBlockingQueue<T> createQueue() {
112             if (this.initialContents == null || this.initialContents.isEmpty()) {
113                 return new LinkedBlockingQueue<T>(capacity);
114             } else {
115                 if (this.initialContents.size() > this.capacity) {
116                     throw new IllegalStateException("The number of elements in the initial contents of the queue to be created exceeds its capacity.");
117                 } else {
118                     LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<T>(capacity);
119                     queue.addAll(this.initialContents);
120                     return queue;
121                 }
122             }
123         }
124         
125         /**
126          * Holds value of property capacity.
127          */
128         private int capacity = Integer.MAX_VALUE;
129         
130         /**
131          * Getter for property capacity.
132          * @return Value of property capacity.
133          */
134         public void setCapacity(int capacity) {
135             this.capacity = capacity;
136         }
137         
138         /**
139          * Setter for property capacity.
140          * @param capacity New value of property capacity.
141          */
142         public int getCapacity() {
143             return this.capacity;
144         }
145     }
146     
147     public static class PriorityBlockingQueueFactory<T> extends AbstractQueueFactory<T> implements BlockingQueueFactory<T> {
148         public PriorityBlockingQueue<T> createQueue() {
149             if (comparator == null) {
150                 if (this.initialContents == null || this.initialContents.isEmpty()) {
151                     return new PriorityBlockingQueue<T>(initialCapacity);
152                 } else {
153                     return new PriorityBlockingQueue<T>(this.initialContents);
154                 }
155             } else {
156                 PriorityBlockingQueue<T> queue = new PriorityBlockingQueue<T>(initialCapacity, comparator);
157                 if ( !(this.initialContents == null || this.initialContents.isEmpty()) ) {
158                     queue.addAll(this.initialContents);
159                 }
160                 return queue;
161             }
162         }
163         
164         /**
165          * Holds value of property initialCapacity. Default value is the same
166          * as that for java.util.concurrent.PriorityBlockingQueue.
167          */
168         private int initialCapacity = 11;
169         
170         /**
171          * Getter for property initialCapacity.
172          * @return Value of property initialCapacity.
173          */
174         public int getInitialCapacity() {
175             return this.initialCapacity;
176         }
177         
178         /**
179          * Setter for property initialCapacity.
180          * @param initialCapacity New value of property initialCapacity.
181          */
182         public void setInitialCapacity(int initialCapacity) {
183             this.initialCapacity = initialCapacity;
184         }
185         
186         /**
187          * Holds value of property comparator.
188          */
189         private Comparator<? super T> comparator;
190         
191         /**
192          * Getter for property comparator.
193          * @return Value of property comparator.
194          */
195         public Comparator<? super T> getComparator() {
196             return this.comparator;
197         }
198         
199         /**
200          * Setter for property comparator.
201          * @param comparator New value of property comparator.
202          */
203         public void setComparator(Comparator<? super T> comparator) {
204             this.comparator = comparator;
205         }
206         
207     }
208     
209     public static class SynchronousQueueFactory<T> implements BlockingQueueFactory<T> {
210         public SynchronousQueue<T> createQueue() {
211             return new SynchronousQueue<T>(this.fair);
212         }
213         
214         /**
215          * Holds value of property fair.
216          */
217         private boolean fair = false;
218         
219         /**
220          * Getter for property fair.
221          * @return Value of property fair.
222          */
223         public boolean isFair() {
224             return this.fair;
225         }
226         
227         /**
228          * Setter for property fair.
229          * @param fair New value of property fair.
230          */
231         public void setFair(boolean fair) {
232             this.fair = fair;
233         }
234     }
235 }