001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019 020 package org.apache.commons.pipeline.util; 021 022 import java.util.Comparator; 023 import java.util.concurrent.ArrayBlockingQueue; 024 import java.util.concurrent.BlockingQueue; 025 import java.util.concurrent.DelayQueue; 026 import java.util.concurrent.Delayed; 027 import java.util.concurrent.LinkedBlockingQueue; 028 import java.util.concurrent.PriorityBlockingQueue; 029 import java.util.concurrent.SynchronousQueue; 030 031 import org.apache.commons.pipeline.StageDriver; 032 033 /** 034 * Many {@link StageDriver} implementations require for one or more queues 035 * to be created. This interface provides a consistent API for factories used 036 * to create such queues and supplies a couple of default implementations. 037 */ 038 public interface BlockingQueueFactory<T> extends QueueFactory<T> { 039 public BlockingQueue<T> createQueue(); 040 041 public static class ArrayBlockingQueueFactory<T> extends AbstractQueueFactory<T> implements BlockingQueueFactory<T> { 042 public ArrayBlockingQueue<T> createQueue() { 043 if (this.initialContents == null || this.initialContents.isEmpty()) { 044 return new ArrayBlockingQueue<T>(this.capacity, this.fair); 045 } else { 046 if (this.initialContents.size() > this.capacity) { 047 throw new IllegalStateException("The number of elements in the initial contents of the queue to be created exceeds its capacity."); 048 } else { 049 ArrayBlockingQueue<T> queue = new ArrayBlockingQueue<T>(this.capacity, this.fair); 050 queue.addAll(this.initialContents); 051 return queue; 052 } 053 } 054 } 055 056 /** 057 * Holds value of property capacity. 058 */ 059 private int capacity = Integer.MAX_VALUE; 060 061 /** 062 * Getter for property capacity. 063 * @return Value of property capacity. 064 */ 065 public void setCapacity(int capacity) { 066 this.capacity = capacity; 067 } 068 069 /** 070 * Setter for property capacity. 071 * @param capacity New value of property capacity. 072 */ 073 public int getCapacity() { 074 return this.capacity; 075 } 076 077 /** 078 * Holds value of property fair. 079 */ 080 private boolean fair = false; 081 082 /** 083 * Getter for property fair. 084 * @return Value of property fair. 085 */ 086 public boolean isFair() { 087 return this.fair; 088 } 089 090 /** 091 * Setter for property fair. 092 * @param fair New value of property fair. 093 */ 094 public void setFair(boolean fair) { 095 this.fair = fair; 096 } 097 } 098 099 public static class DelayQueueFactoryL<T extends Delayed> extends AbstractQueueFactory<T> implements BlockingQueueFactory<T> { 100 public DelayQueue<T> createQueue() { 101 if (this.initialContents == null || this.initialContents.isEmpty()) { 102 return new DelayQueue<T>(); 103 } else { 104 return new DelayQueue<T>(this.initialContents); 105 } 106 } 107 } 108 109 public static class LinkedBlockingQueueFactory<T> extends AbstractQueueFactory<T> implements BlockingQueueFactory<T> { 110 111 public LinkedBlockingQueue<T> createQueue() { 112 if (this.initialContents == null || this.initialContents.isEmpty()) { 113 return new LinkedBlockingQueue<T>(capacity); 114 } else { 115 if (this.initialContents.size() > this.capacity) { 116 throw new IllegalStateException("The number of elements in the initial contents of the queue to be created exceeds its capacity."); 117 } else { 118 LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<T>(capacity); 119 queue.addAll(this.initialContents); 120 return queue; 121 } 122 } 123 } 124 125 /** 126 * Holds value of property capacity. 127 */ 128 private int capacity = Integer.MAX_VALUE; 129 130 /** 131 * Getter for property capacity. 132 * @return Value of property capacity. 133 */ 134 public void setCapacity(int capacity) { 135 this.capacity = capacity; 136 } 137 138 /** 139 * Setter for property capacity. 140 * @param capacity New value of property capacity. 141 */ 142 public int getCapacity() { 143 return this.capacity; 144 } 145 } 146 147 public static class PriorityBlockingQueueFactory<T> extends AbstractQueueFactory<T> implements BlockingQueueFactory<T> { 148 public PriorityBlockingQueue<T> createQueue() { 149 if (comparator == null) { 150 if (this.initialContents == null || this.initialContents.isEmpty()) { 151 return new PriorityBlockingQueue<T>(initialCapacity); 152 } else { 153 return new PriorityBlockingQueue<T>(this.initialContents); 154 } 155 } else { 156 PriorityBlockingQueue<T> queue = new PriorityBlockingQueue<T>(initialCapacity, comparator); 157 if ( !(this.initialContents == null || this.initialContents.isEmpty()) ) { 158 queue.addAll(this.initialContents); 159 } 160 return queue; 161 } 162 } 163 164 /** 165 * Holds value of property initialCapacity. Default value is the same 166 * as that for java.util.concurrent.PriorityBlockingQueue. 167 */ 168 private int initialCapacity = 11; 169 170 /** 171 * Getter for property initialCapacity. 172 * @return Value of property initialCapacity. 173 */ 174 public int getInitialCapacity() { 175 return this.initialCapacity; 176 } 177 178 /** 179 * Setter for property initialCapacity. 180 * @param initialCapacity New value of property initialCapacity. 181 */ 182 public void setInitialCapacity(int initialCapacity) { 183 this.initialCapacity = initialCapacity; 184 } 185 186 /** 187 * Holds value of property comparator. 188 */ 189 private Comparator<? super T> comparator; 190 191 /** 192 * Getter for property comparator. 193 * @return Value of property comparator. 194 */ 195 public Comparator<? super T> getComparator() { 196 return this.comparator; 197 } 198 199 /** 200 * Setter for property comparator. 201 * @param comparator New value of property comparator. 202 */ 203 public void setComparator(Comparator<? super T> comparator) { 204 this.comparator = comparator; 205 } 206 207 } 208 209 public static class SynchronousQueueFactory<T> implements BlockingQueueFactory<T> { 210 public SynchronousQueue<T> createQueue() { 211 return new SynchronousQueue<T>(this.fair); 212 } 213 214 /** 215 * Holds value of property fair. 216 */ 217 private boolean fair = false; 218 219 /** 220 * Getter for property fair. 221 * @return Value of property fair. 222 */ 223 public boolean isFair() { 224 return this.fair; 225 } 226 227 /** 228 * Setter for property fair. 229 * @param fair New value of property fair. 230 */ 231 public void setFair(boolean fair) { 232 this.fair = fair; 233 } 234 } 235 }