001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.commons.lang3.concurrent; 018 019 import java.util.concurrent.ScheduledExecutorService; 020 import java.util.concurrent.ScheduledFuture; 021 import java.util.concurrent.ScheduledThreadPoolExecutor; 022 import java.util.concurrent.TimeUnit; 023 024 /** 025 * <p> 026 * A specialized <em>semaphore</em> implementation that provides a number of 027 * permits in a given time frame. 028 * </p> 029 * <p> 030 * This class is similar to the {@code java.util.concurrent.Semaphore} class 031 * provided by the JDK in that it manages a configurable number of permits. 032 * Using the {@link #acquire()} method a permit can be requested by a thread. 033 * However, there is an additional timing dimension: there is no {@code 034 * release()} method for freeing a permit, but all permits are automatically 035 * released at the end of a configurable time frame. If a thread calls 036 * {@link #acquire()} and the available permits are already exhausted for this 037 * time frame, the thread is blocked. When the time frame ends all permits 038 * requested so far are restored, and blocking threads are waked up again, so 039 * that they can try to acquire a new permit. This basically means that in the 040 * specified time frame only the given number of operations is possible. 041 * </p> 042 * <p> 043 * A use case for this class is to artificially limit the load produced by a 044 * process. As an example consider an application that issues database queries 045 * on a production system in a background process to gather statistical 046 * information. This background processing should not produce so much database 047 * load that the functionality and the performance of the production system are 048 * impacted. Here a {@code TimedSemaphore} could be installed to guarantee that 049 * only a given number of database queries are issued per second. 050 * </p> 051 * <p> 052 * A thread class for performing database queries could look as follows: 053 * 054 * <pre> 055 * public class StatisticsThread extends Thread { 056 * // The semaphore for limiting database load. 057 * private final TimedSemaphore semaphore; 058 * // Create an instance and set the semaphore 059 * public StatisticsThread(TimedSemaphore timedSemaphore) { 060 * semaphore = timedSemaphore; 061 * } 062 * // Gather statistics 063 * public void run() { 064 * try { 065 * while(true) { 066 * semaphore.acquire(); // limit database load 067 * performQuery(); // issue a query 068 * } 069 * } catch(InterruptedException) { 070 * // fall through 071 * } 072 * } 073 * ... 074 * } 075 * </pre> 076 * 077 * The following code fragment shows how a {@code TimedSemaphore} is created 078 * that allows only 10 operations per second and passed to the statistics 079 * thread: 080 * 081 * <pre> 082 * TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECOND, 10); 083 * StatisticsThread thread = new StatisticsThread(sem); 084 * thread.start(); 085 * </pre> 086 * 087 * </p> 088 * <p> 089 * When creating an instance the time period for the semaphore must be 090 * specified. {@code TimedSemaphore} uses an executor service with a 091 * corresponding period to monitor this interval. The {@code 092 * ScheduledExecutorService} to be used for this purpose can be provided at 093 * construction time. Alternatively the class creates an internal executor 094 * service. 095 * </p> 096 * <p> 097 * Client code that uses {@code TimedSemaphore} has to call the 098 * {@link #acquire()} method in aach processing step. {@code TimedSemaphore} 099 * keeps track of the number of invocations of the {@link #acquire()} method and 100 * blocks the calling thread if the counter exceeds the limit specified. When 101 * the timer signals the end of the time period the counter is reset and all 102 * waiting threads are released. Then another cycle can start. 103 * </p> 104 * <p> 105 * It is possible to modify the limit at any time using the 106 * {@link #setLimit(int)} method. This is useful if the load produced by an 107 * operation has to be adapted dynamically. In the example scenario with the 108 * thread collecting statistics it may make sense to specify a low limit during 109 * day time while allowing a higher load in the night time. Reducing the limit 110 * takes effect immediately by blocking incoming callers. If the limit is 111 * increased, waiting threads are not released immediately, but wake up when the 112 * timer runs out. Then, in the next period more processing steps can be 113 * performed without blocking. By setting the limit to 0 the semaphore can be 114 * switched off: in this mode the {@link #acquire()} method never blocks, but 115 * lets all callers pass directly. 116 * </p> 117 * <p> 118 * When the {@code TimedSemaphore} is no more needed its {@link #shutdown()} 119 * method should be called. This causes the periodic task that monitors the time 120 * interval to be canceled. If the {@code ScheduledExecutorService} has been 121 * created by the semaphore at construction time, it is also shut down. 122 * resources. After that {@link #acquire()} must not be called any more. 123 * </p> 124 * 125 * @since 3.0 126 * @version $Id: TimedSemaphore.java 1199894 2011-11-09 17:53:59Z ggregory $ 127 */ 128 public class TimedSemaphore { 129 /** 130 * Constant for a value representing no limit. If the limit is set to a 131 * value less or equal this constant, the {@code TimedSemaphore} will be 132 * effectively switched off. 133 */ 134 public static final int NO_LIMIT = 0; 135 136 /** Constant for the thread pool size for the executor. */ 137 private static final int THREAD_POOL_SIZE = 1; 138 139 /** The executor service for managing the timer thread. */ 140 private final ScheduledExecutorService executorService; 141 142 /** Stores the period for this timed semaphore. */ 143 private final long period; 144 145 /** The time unit for the period. */ 146 private final TimeUnit unit; 147 148 /** A flag whether the executor service was created by this object. */ 149 private final boolean ownExecutor; 150 151 /** A future object representing the timer task. */ 152 private ScheduledFuture<?> task; 153 154 /** Stores the total number of invocations of the acquire() method. */ 155 private long totalAcquireCount; 156 157 /** 158 * The counter for the periods. This counter is increased every time a 159 * period ends. 160 */ 161 private long periodCount; 162 163 /** The limit. */ 164 private int limit; 165 166 /** The current counter. */ 167 private int acquireCount; 168 169 /** The number of invocations of acquire() in the last period. */ 170 private int lastCallsPerPeriod; 171 172 /** A flag whether shutdown() was called. */ 173 private boolean shutdown; 174 175 /** 176 * Creates a new instance of {@link TimedSemaphore} and initializes it with 177 * the given time period and the limit. 178 * 179 * @param timePeriod the time period 180 * @param timeUnit the unit for the period 181 * @param limit the limit for the semaphore 182 * @throws IllegalArgumentException if the period is less or equals 0 183 */ 184 public TimedSemaphore(long timePeriod, TimeUnit timeUnit, int limit) { 185 this(null, timePeriod, timeUnit, limit); 186 } 187 188 /** 189 * Creates a new instance of {@link TimedSemaphore} and initializes it with 190 * an executor service, the given time period, and the limit. The executor 191 * service will be used for creating a periodic task for monitoring the time 192 * period. It can be <b>null</b>, then a default service will be created. 193 * 194 * @param service the executor service 195 * @param timePeriod the time period 196 * @param timeUnit the unit for the period 197 * @param limit the limit for the semaphore 198 * @throws IllegalArgumentException if the period is less or equals 0 199 */ 200 public TimedSemaphore(ScheduledExecutorService service, long timePeriod, 201 TimeUnit timeUnit, int limit) { 202 if (timePeriod <= 0) { 203 throw new IllegalArgumentException("Time period must be greater 0!"); 204 } 205 206 period = timePeriod; 207 unit = timeUnit; 208 209 if (service != null) { 210 executorService = service; 211 ownExecutor = false; 212 } else { 213 ScheduledThreadPoolExecutor s = new ScheduledThreadPoolExecutor( 214 THREAD_POOL_SIZE); 215 s.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); 216 s.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); 217 executorService = s; 218 ownExecutor = true; 219 } 220 221 setLimit(limit); 222 } 223 224 /** 225 * Returns the limit enforced by this semaphore. The limit determines how 226 * many invocations of {@link #acquire()} are allowed within the monitored 227 * period. 228 * 229 * @return the limit 230 */ 231 public final synchronized int getLimit() { 232 return limit; 233 } 234 235 /** 236 * Sets the limit. This is the number of times the {@link #acquire()} method 237 * can be called within the time period specified. If this limit is reached, 238 * further invocations of {@link #acquire()} will block. Setting the limit 239 * to a value <= {@link #NO_LIMIT} will cause the limit to be disabled, 240 * i.e. an arbitrary number of{@link #acquire()} invocations is allowed in 241 * the time period. 242 * 243 * @param limit the limit 244 */ 245 public final synchronized void setLimit(int limit) { 246 this.limit = limit; 247 } 248 249 /** 250 * Initializes a shutdown. After that the object cannot be used any more. 251 * This method can be invoked an arbitrary number of times. All invocations 252 * after the first one do not have any effect. 253 */ 254 public synchronized void shutdown() { 255 if (!shutdown) { 256 257 if (ownExecutor) { 258 // if the executor was created by this instance, it has 259 // to be shutdown 260 getExecutorService().shutdownNow(); 261 } 262 if (task != null) { 263 task.cancel(false); 264 } 265 266 shutdown = true; 267 } 268 } 269 270 /** 271 * Tests whether the {@link #shutdown()} method has been called on this 272 * object. If this method returns <b>true</b>, this instance cannot be used 273 * any longer. 274 * 275 * @return a flag whether a shutdown has been performed 276 */ 277 public synchronized boolean isShutdown() { 278 return shutdown; 279 } 280 281 /** 282 * Tries to acquire a permit from this semaphore. This method will block if 283 * the limit for the current period has already been reached. If 284 * {@link #shutdown()} has already been invoked, calling this method will 285 * cause an exception. The very first call of this method starts the timer 286 * task which monitors the time period set for this {@code TimedSemaphore}. 287 * From now on the semaphore is active. 288 * 289 * @throws InterruptedException if the thread gets interrupted 290 * @throws IllegalStateException if this semaphore is already shut down 291 */ 292 public synchronized void acquire() throws InterruptedException { 293 if (isShutdown()) { 294 throw new IllegalStateException("TimedSemaphore is shut down!"); 295 } 296 297 if (task == null) { 298 task = startTimer(); 299 } 300 301 boolean canPass = false; 302 do { 303 canPass = getLimit() <= NO_LIMIT || acquireCount < getLimit(); 304 if (!canPass) { 305 wait(); 306 } else { 307 acquireCount++; 308 } 309 } while (!canPass); 310 } 311 312 /** 313 * Returns the number of (successful) acquire invocations during the last 314 * period. This is the number of times the {@link #acquire()} method was 315 * called without blocking. This can be useful for testing or debugging 316 * purposes or to determine a meaningful threshold value. If a limit is set, 317 * the value returned by this method won't be greater than this limit. 318 * 319 * @return the number of non-blocking invocations of the {@link #acquire()} 320 * method 321 */ 322 public synchronized int getLastAcquiresPerPeriod() { 323 return lastCallsPerPeriod; 324 } 325 326 /** 327 * Returns the number of invocations of the {@link #acquire()} method for 328 * the current period. This may be useful for testing or debugging purposes. 329 * 330 * @return the current number of {@link #acquire()} invocations 331 */ 332 public synchronized int getAcquireCount() { 333 return acquireCount; 334 } 335 336 /** 337 * Returns the number of calls to the {@link #acquire()} method that can 338 * still be performed in the current period without blocking. This method 339 * can give an indication whether it is safe to call the {@link #acquire()} 340 * method without risking to be suspended. However, there is no guarantee 341 * that a subsequent call to {@link #acquire()} actually is not-blocking 342 * because in the mean time other threads may have invoked the semaphore. 343 * 344 * @return the current number of available {@link #acquire()} calls in the 345 * current period 346 */ 347 public synchronized int getAvailablePermits() { 348 return getLimit() - getAcquireCount(); 349 } 350 351 /** 352 * Returns the average number of successful (i.e. non-blocking) 353 * {@link #acquire()} invocations for the entire life-time of this {@code 354 * TimedSemaphore}. This method can be used for instance for statistical 355 * calculations. 356 * 357 * @return the average number of {@link #acquire()} invocations per time 358 * unit 359 */ 360 public synchronized double getAverageCallsPerPeriod() { 361 return periodCount == 0 ? 0 : (double) totalAcquireCount 362 / (double) periodCount; 363 } 364 365 /** 366 * Returns the time period. This is the time monitored by this semaphore. 367 * Only a given number of invocations of the {@link #acquire()} method is 368 * possible in this period. 369 * 370 * @return the time period 371 */ 372 public long getPeriod() { 373 return period; 374 } 375 376 /** 377 * Returns the time unit. This is the unit used by {@link #getPeriod()}. 378 * 379 * @return the time unit 380 */ 381 public TimeUnit getUnit() { 382 return unit; 383 } 384 385 /** 386 * Returns the executor service used by this instance. 387 * 388 * @return the executor service 389 */ 390 protected ScheduledExecutorService getExecutorService() { 391 return executorService; 392 } 393 394 /** 395 * Starts the timer. This method is called when {@link #acquire()} is called 396 * for the first time. It schedules a task to be executed at fixed rate to 397 * monitor the time period specified. 398 * 399 * @return a future object representing the task scheduled 400 */ 401 protected ScheduledFuture<?> startTimer() { 402 return getExecutorService().scheduleAtFixedRate(new Runnable() { 403 public void run() { 404 endOfPeriod(); 405 } 406 }, getPeriod(), getPeriod(), getUnit()); 407 } 408 409 /** 410 * The current time period is finished. This method is called by the timer 411 * used internally to monitor the time period. It resets the counter and 412 * releases the threads waiting for this barrier. 413 */ 414 synchronized void endOfPeriod() { 415 lastCallsPerPeriod = acquireCount; 416 totalAcquireCount += acquireCount; 417 periodCount++; 418 acquireCount = 0; 419 notifyAll(); 420 } 421 }