001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing,
013     * software distributed under the License is distributed on an
014     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015     * KIND, either express or implied.  See the License for the
016     * specific language governing permissions and limitations
017     * under the License.    
018     */ 
019    
020    package org.apache.commons.pipeline.util;
021    
022    import java.util.Comparator;
023    import java.util.concurrent.ArrayBlockingQueue;
024    import java.util.concurrent.BlockingQueue;
025    import java.util.concurrent.DelayQueue;
026    import java.util.concurrent.Delayed;
027    import java.util.concurrent.LinkedBlockingQueue;
028    import java.util.concurrent.PriorityBlockingQueue;
029    import java.util.concurrent.SynchronousQueue;
030    
031    import org.apache.commons.pipeline.StageDriver;
032    
033    /**
034     * Many {@link StageDriver} implementations require for one or more queues
035     * to be created. This interface provides a consistent API for factories used
036     * to create such queues and supplies a couple of default implementations.
037     */
038    public interface BlockingQueueFactory<T> extends QueueFactory<T> {
039        public BlockingQueue<T> createQueue();
040        
041        public static class ArrayBlockingQueueFactory<T> extends AbstractQueueFactory<T> implements BlockingQueueFactory<T> {
042            public ArrayBlockingQueue<T> createQueue() {
043                if (this.initialContents == null || this.initialContents.isEmpty()) {
044                    return new ArrayBlockingQueue<T>(this.capacity, this.fair);
045                } else {
046                    if (this.initialContents.size() > this.capacity) {
047                        throw new IllegalStateException("The number of elements in the initial contents of the queue to be created exceeds its capacity.");
048                    } else {
049                        ArrayBlockingQueue<T> queue = new ArrayBlockingQueue<T>(this.capacity, this.fair);
050                        queue.addAll(this.initialContents);
051                        return queue;
052                    }
053                }
054            }
055            
056            /**
057             * Holds value of property capacity.
058             */
059            private int capacity = Integer.MAX_VALUE;
060            
061            /**
062             * Getter for property capacity.
063             * @return Value of property capacity.
064             */
065            public void setCapacity(int capacity) {
066                this.capacity = capacity;
067            }
068            
069            /**
070             * Setter for property capacity.
071             * @param capacity New value of property capacity.
072             */
073            public int getCapacity() {
074                return this.capacity;
075            }
076            
077            /**
078             * Holds value of property fair.
079             */
080            private boolean fair = false;
081            
082            /**
083             * Getter for property fair.
084             * @return Value of property fair.
085             */
086            public boolean isFair() {
087                return this.fair;
088            }
089            
090            /**
091             * Setter for property fair.
092             * @param fair New value of property fair.
093             */
094            public void setFair(boolean fair) {
095                this.fair = fair;
096            }
097        }
098        
099        public static class DelayQueueFactoryL<T extends Delayed> extends AbstractQueueFactory<T>  implements BlockingQueueFactory<T> {
100            public DelayQueue<T> createQueue() {
101                if (this.initialContents == null || this.initialContents.isEmpty()) {
102                    return new DelayQueue<T>();
103                } else {
104                    return new DelayQueue<T>(this.initialContents);
105                }
106            }
107        }
108        
109        public static class LinkedBlockingQueueFactory<T> extends AbstractQueueFactory<T>  implements BlockingQueueFactory<T> {
110            
111            public LinkedBlockingQueue<T> createQueue() {
112                if (this.initialContents == null || this.initialContents.isEmpty()) {
113                    return new LinkedBlockingQueue<T>(capacity);
114                } else {
115                    if (this.initialContents.size() > this.capacity) {
116                        throw new IllegalStateException("The number of elements in the initial contents of the queue to be created exceeds its capacity.");
117                    } else {
118                        LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<T>(capacity);
119                        queue.addAll(this.initialContents);
120                        return queue;
121                    }
122                }
123            }
124            
125            /**
126             * Holds value of property capacity.
127             */
128            private int capacity = Integer.MAX_VALUE;
129            
130            /**
131             * Getter for property capacity.
132             * @return Value of property capacity.
133             */
134            public void setCapacity(int capacity) {
135                this.capacity = capacity;
136            }
137            
138            /**
139             * Setter for property capacity.
140             * @param capacity New value of property capacity.
141             */
142            public int getCapacity() {
143                return this.capacity;
144            }
145        }
146        
147        public static class PriorityBlockingQueueFactory<T> extends AbstractQueueFactory<T> implements BlockingQueueFactory<T> {
148            public PriorityBlockingQueue<T> createQueue() {
149                if (comparator == null) {
150                    if (this.initialContents == null || this.initialContents.isEmpty()) {
151                        return new PriorityBlockingQueue<T>(initialCapacity);
152                    } else {
153                        return new PriorityBlockingQueue<T>(this.initialContents);
154                    }
155                } else {
156                    PriorityBlockingQueue<T> queue = new PriorityBlockingQueue<T>(initialCapacity, comparator);
157                    if ( !(this.initialContents == null || this.initialContents.isEmpty()) ) {
158                        queue.addAll(this.initialContents);
159                    }
160                    return queue;
161                }
162            }
163            
164            /**
165             * Holds value of property initialCapacity. Default value is the same
166             * as that for java.util.concurrent.PriorityBlockingQueue.
167             */
168            private int initialCapacity = 11;
169            
170            /**
171             * Getter for property initialCapacity.
172             * @return Value of property initialCapacity.
173             */
174            public int getInitialCapacity() {
175                return this.initialCapacity;
176            }
177            
178            /**
179             * Setter for property initialCapacity.
180             * @param initialCapacity New value of property initialCapacity.
181             */
182            public void setInitialCapacity(int initialCapacity) {
183                this.initialCapacity = initialCapacity;
184            }
185            
186            /**
187             * Holds value of property comparator.
188             */
189            private Comparator<? super T> comparator;
190            
191            /**
192             * Getter for property comparator.
193             * @return Value of property comparator.
194             */
195            public Comparator<? super T> getComparator() {
196                return this.comparator;
197            }
198            
199            /**
200             * Setter for property comparator.
201             * @param comparator New value of property comparator.
202             */
203            public void setComparator(Comparator<? super T> comparator) {
204                this.comparator = comparator;
205            }
206            
207        }
208        
209        public static class SynchronousQueueFactory<T> implements BlockingQueueFactory<T> {
210            public SynchronousQueue<T> createQueue() {
211                return new SynchronousQueue<T>(this.fair);
212            }
213            
214            /**
215             * Holds value of property fair.
216             */
217            private boolean fair = false;
218            
219            /**
220             * Getter for property fair.
221             * @return Value of property fair.
222             */
223            public boolean isFair() {
224                return this.fair;
225            }
226            
227            /**
228             * Setter for property fair.
229             * @param fair New value of property fair.
230             */
231            public void setFair(boolean fair) {
232                this.fair = fair;
233            }
234        }
235    }