/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.lang3.concurrent;

import org.apache.commons.lang3.Validate;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * <p>
 * A specialized <em>semaphore</em> implementation that provides a number of
 * permits in a given time frame.
 * </p>
 * <p>
 * This class is similar to the {@code java.util.concurrent.Semaphore} class
 * provided by the JDK in that it manages a configurable number of permits.
 * Using the {@link #acquire()} method a permit can be requested by a thread.
 * However, there is an additional timing dimension: there is no {@code
 * release()} method for freeing a permit, but all permits are automatically
 * released at the end of a configurable time frame. If a thread calls
 * {@link #acquire()} and the available permits are already exhausted for this
 * time frame, the thread is blocked.
 * When the time frame ends, all permits
 * requested so far are restored, and blocked threads are woken up again, so
 * that they can try to acquire a new permit. This basically means that in the
 * specified time frame only the given number of operations is possible.
 * </p>
 * <p>
 * A use case for this class is to artificially limit the load produced by a
 * process. As an example consider an application that issues database queries
 * on a production system in a background process to gather statistical
 * information. This background processing should not produce so much database
 * load that the functionality and the performance of the production system are
 * impacted. Here a {@code TimedSemaphore} could be installed to guarantee that
 * only a given number of database queries are issued per second.
 * </p>
 * <p>
 * A thread class for performing database queries could look as follows:
 * </p>
 *
 * <pre>
 * public class StatisticsThread extends Thread {
 *     // The semaphore for limiting database load.
 *     private final TimedSemaphore semaphore;
 *     // Create an instance and set the semaphore
 *     public StatisticsThread(TimedSemaphore timedSemaphore) {
 *         semaphore = timedSemaphore;
 *     }
 *     // Gather statistics
 *     public void run() {
 *         try {
 *             while (true) {
 *                 semaphore.acquire();   // limit database load
 *                 performQuery();        // issue a query
 *             }
 *         } catch (InterruptedException e) {
 *             // fall through
 *         }
 *     }
 *     ...
 * }
 * </pre>
 *
 * <p>
 * The following code fragment shows how a {@code TimedSemaphore} is created
 * that allows only 10 operations per second and passed to the statistics
 * thread:
 * </p>
 *
 * <pre>
 * TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECONDS, 10);
 * StatisticsThread thread = new StatisticsThread(sem);
 * thread.start();
 * </pre>
 *
 * <p>
 * When creating an instance the time period for the semaphore must be
 * specified. {@code TimedSemaphore} uses an executor service with a
 * corresponding period to monitor this interval. The {@code
 * ScheduledExecutorService} to be used for this purpose can be provided at
 * construction time. Alternatively the class creates an internal executor
 * service.
 * </p>
 * <p>
 * Client code that uses {@code TimedSemaphore} has to call the
 * {@link #acquire()} method in each processing step. {@code TimedSemaphore}
 * keeps track of the number of invocations of the {@link #acquire()} method and
 * blocks the calling thread once the counter has reached the limit specified.
 * When the timer signals the end of the time period the counter is reset and
 * all waiting threads are released. Then another cycle can start.
 * </p>
 * <p>
 * An alternative to {@code acquire()} is the {@link #tryAcquire()} method. This
 * method checks whether the semaphore is under the specified limit and
 * increases the internal counter if this is the case. The return value is then
 * <strong>true</strong>, and the calling thread can continue with its action.
 * If the semaphore is already at its limit, {@code tryAcquire()} immediately
 * returns <strong>false</strong> without blocking; the calling thread must
 * then abort its action. This usage scenario prevents blocking of threads.
 * </p>
 * <p>
 * It is possible to modify the limit at any time using the
 * {@link #setLimit(int)} method.
 * This is useful if the load produced by an
 * operation has to be adapted dynamically. In the example scenario with the
 * thread collecting statistics it may make sense to specify a low limit during
 * the day while allowing a higher load at night. Reducing the limit
 * takes effect immediately by blocking incoming callers. If the limit is
 * increased, waiting threads are not released immediately, but wake up when the
 * timer runs out. Then, in the next period more processing steps can be
 * performed without blocking. By setting the limit to 0 the semaphore can be
 * switched off: in this mode the {@link #acquire()} method never blocks, but
 * lets all callers pass directly.
 * </p>
 * <p>
 * When the {@code TimedSemaphore} is no longer needed its {@link #shutdown()}
 * method should be called. This causes the periodic task that monitors the time
 * interval to be canceled. If the {@code ScheduledExecutorService} has been
 * created by the semaphore at construction time, it is also shut down.
 * After that {@link #acquire()} must not be called any more.
 * </p>
 *
 * @since 3.0
 */
public class TimedSemaphore {
    /**
     * Constant for a value representing no limit. If the limit is set to a
     * value less than or equal to this constant, the {@code TimedSemaphore}
     * will be effectively switched off.
     */
    public static final int NO_LIMIT = 0;

    /** Constant for the thread pool size for the executor. */
    private static final int THREAD_POOL_SIZE = 1;

    /** The executor service for managing the timer thread. */
    private final ScheduledExecutorService executorService;

    /** Stores the period for this timed semaphore. */
    private final long period;

    /** The time unit for the period. */
    private final TimeUnit unit;

    /** A flag whether the executor service was created by this object.
     */
    private final boolean ownExecutor;

    /** A future object representing the timer task. */
    private ScheduledFuture<?> task; // @GuardedBy("this")

    /** Stores the total number of invocations of the acquire() method. */
    private long totalAcquireCount; // @GuardedBy("this")

    /**
     * The counter for the periods. This counter is increased every time a
     * period ends.
     */
    private long periodCount; // @GuardedBy("this")

    /** The limit. */
    private int limit; // @GuardedBy("this")

    /** The current counter. */
    private int acquireCount; // @GuardedBy("this")

    /** The number of invocations of acquire() in the last period. */
    private int lastCallsPerPeriod; // @GuardedBy("this")

    /** A flag whether shutdown() was called. */
    private boolean shutdown; // @GuardedBy("this")

    /**
     * Creates a new instance of {@link TimedSemaphore} and initializes it with
     * the given time period and the limit.
     *
     * @param timePeriod the time period
     * @param timeUnit the unit for the period
     * @param limit the limit for the semaphore
     * @throws IllegalArgumentException if the period is less than or equal to 0
     */
    public TimedSemaphore(final long timePeriod, final TimeUnit timeUnit, final int limit) {
        this(null, timePeriod, timeUnit, limit);
    }

    /**
     * Creates a new instance of {@link TimedSemaphore} and initializes it with
     * an executor service, the given time period, and the limit. The executor
     * service will be used for creating a periodic task for monitoring the time
     * period. It can be <b>null</b>, in which case a default service will be created.
     *
     * @param service the executor service
     * @param timePeriod the time period
     * @param timeUnit the unit for the period
     * @param limit the limit for the semaphore
     * @throws IllegalArgumentException if the period is less than or equal to 0
     */
    public TimedSemaphore(final ScheduledExecutorService service, final long timePeriod,
            final TimeUnit timeUnit, final int limit) {
        Validate.inclusiveBetween(1, Long.MAX_VALUE, timePeriod, "Time period must be greater than 0!");

        period = timePeriod;
        unit = timeUnit;

        if (service != null) {
            executorService = service;
            ownExecutor = false;
        } else {
            final ScheduledThreadPoolExecutor s = new ScheduledThreadPoolExecutor(
                    THREAD_POOL_SIZE);
            s.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
            s.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
            executorService = s;
            ownExecutor = true;
        }

        setLimit(limit);
    }

    /**
     * Returns the limit enforced by this semaphore. The limit determines how
     * many invocations of {@link #acquire()} are allowed within the monitored
     * period.
     *
     * @return the limit
     */
    public final synchronized int getLimit() {
        return limit;
    }

    /**
     * Sets the limit. This is the number of times the {@link #acquire()} method
     * can be called within the time period specified. If this limit is reached,
     * further invocations of {@link #acquire()} will block. Setting the limit
     * to a value &lt;= {@link #NO_LIMIT} will cause the limit to be disabled,
     * i.e. an arbitrary number of {@link #acquire()} invocations is allowed in
     * the time period.
     *
     * @param limit the limit
     */
    public final synchronized void setLimit(final int limit) {
        this.limit = limit;
    }

    /**
     * Initiates a shutdown. After that the object cannot be used any more.
     * This method can be invoked an arbitrary number of times.
     * All invocations
     * after the first one do not have any effect.
     */
    public synchronized void shutdown() {
        if (!shutdown) {

            if (ownExecutor) {
                // if the executor was created by this instance, it has
                // to be shut down
                getExecutorService().shutdownNow();
            }
            if (task != null) {
                task.cancel(false);
            }

            shutdown = true;
        }
    }

    /**
     * Tests whether the {@link #shutdown()} method has been called on this
     * object. If this method returns <b>true</b>, this instance cannot be used
     * any longer.
     *
     * @return a flag whether a shutdown has been performed
     */
    public synchronized boolean isShutdown() {
        return shutdown;
    }

    /**
     * Acquires a permit from this semaphore. This method will block if
     * the limit for the current period has already been reached. If
     * {@link #shutdown()} has already been invoked, calling this method will
     * cause an exception. The very first call of this method starts the timer
     * task which monitors the time period set for this {@code TimedSemaphore}.
     * From then on the semaphore is active.
     *
     * @throws InterruptedException if the thread gets interrupted
     * @throws IllegalStateException if this semaphore is already shut down
     */
    public synchronized void acquire() throws InterruptedException {
        prepareAcquire();

        boolean canPass;
        do {
            canPass = acquirePermit();
            if (!canPass) {
                wait();
            }
        } while (!canPass);
    }

    /**
     * Tries to acquire a permit from this semaphore. If the limit of this semaphore has
     * not yet been reached, a permit is acquired, and this method returns
     * <strong>true</strong>. Otherwise, this method returns immediately with the result
     * <strong>false</strong>.
     *
     * @return <strong>true</strong> if a permit could be acquired; <strong>false</strong>
     * otherwise
     * @throws IllegalStateException if this semaphore is already shut down
     * @since 3.5
     */
    public synchronized boolean tryAcquire() {
        prepareAcquire();
        return acquirePermit();
    }

    /**
     * Returns the number of (successful) acquire invocations during the last
     * period. This is the number of times the {@link #acquire()} method was
     * called without blocking. This can be useful for testing or debugging
     * purposes or to determine a meaningful threshold value. If a limit is set,
     * the value returned by this method won't be greater than this limit.
     *
     * @return the number of non-blocking invocations of the {@link #acquire()}
     * method
     */
    public synchronized int getLastAcquiresPerPeriod() {
        return lastCallsPerPeriod;
    }

    /**
     * Returns the number of invocations of the {@link #acquire()} method for
     * the current period. This may be useful for testing or debugging purposes.
     *
     * @return the current number of {@link #acquire()} invocations
     */
    public synchronized int getAcquireCount() {
        return acquireCount;
    }

    /**
     * Returns the number of calls to the {@link #acquire()} method that can
     * still be performed in the current period without blocking. This method
     * can give an indication whether it is safe to call the {@link #acquire()}
     * method without the risk of being suspended. However, there is no guarantee
     * that a subsequent call to {@link #acquire()} is actually non-blocking
     * because in the meantime other threads may have invoked the semaphore.
     *
     * @return the current number of available {@link #acquire()} calls in the
     * current period
     */
    public synchronized int getAvailablePermits() {
        return getLimit() - getAcquireCount();
    }

    /**
     * Returns the average number of successful (i.e.
     * non-blocking)
     * {@link #acquire()} invocations per period over the entire lifetime of this
     * {@code TimedSemaphore}. This method can be used, for instance, for
     * statistical calculations.
     *
     * @return the average number of {@link #acquire()} invocations per period
     */
    public synchronized double getAverageCallsPerPeriod() {
        return periodCount == 0 ? 0 : (double) totalAcquireCount
                / (double) periodCount;
    }

    /**
     * Returns the time period. This is the time monitored by this semaphore.
     * Only a given number of invocations of the {@link #acquire()} method is
     * possible in this period.
     *
     * @return the time period
     */
    public long getPeriod() {
        return period;
    }

    /**
     * Returns the time unit. This is the unit used by {@link #getPeriod()}.
     *
     * @return the time unit
     */
    public TimeUnit getUnit() {
        return unit;
    }

    /**
     * Returns the executor service used by this instance.
     *
     * @return the executor service
     */
    protected ScheduledExecutorService getExecutorService() {
        return executorService;
    }

    /**
     * Starts the timer. This method is called when {@link #acquire()} is called
     * for the first time. It schedules a task to be executed at fixed rate to
     * monitor the time period specified.
     *
     * @return a future object representing the task scheduled
     */
    protected ScheduledFuture<?> startTimer() {
        return getExecutorService().scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                endOfPeriod();
            }
        }, getPeriod(), getPeriod(), getUnit());
    }

    /**
     * The current time period is finished. This method is called by the timer
     * used internally to monitor the time period. It resets the counter and
     * releases the threads waiting on this semaphore.
     */
    synchronized void endOfPeriod() {
        lastCallsPerPeriod = acquireCount;
        totalAcquireCount += acquireCount;
        periodCount++;
        acquireCount = 0;
        notifyAll();
    }

    /**
     * Prepares an acquire operation. Checks for the current state and starts the internal
     * timer if necessary. This method must be called with the lock of this object held.
     */
    private void prepareAcquire() {
        if (isShutdown()) {
            throw new IllegalStateException("TimedSemaphore is shut down!");
        }

        if (task == null) {
            task = startTimer();
        }
    }

    /**
     * Internal helper method for acquiring a permit. This method checks whether currently
     * a permit can be acquired and - if so - increases the internal counter. The return
     * value indicates whether a permit could be acquired. This method must be called with
     * the lock of this object held.
     *
     * @return a flag whether a permit could be acquired
     */
    private boolean acquirePermit() {
        if (getLimit() <= NO_LIMIT || acquireCount < getLimit()) {
            acquireCount++;
            return true;
        }
        return false;
    }
}
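
// The counting logic of acquirePermit() and endOfPeriod() can be sketched in
// isolation as follows. This is a minimal illustration under stated assumptions,
// not part of Commons Lang: the class FixedWindowCounterSketch and its members are
// hypothetical names, the limit is fixed at construction, and the timer task is
// replaced by an explicit endOfPeriod() call so that the behavior is deterministic.

```java
/** Hypothetical stand-alone sketch of per-period permit counting (not the library class). */
final class FixedWindowCounterSketch {
    /** A limit less than or equal to this value disables limiting, mirroring NO_LIMIT. */
    static final int NO_LIMIT = 0;

    private final int limit;   // maximum number of permits per period
    private int acquireCount;  // permits handed out in the current period

    FixedWindowCounterSketch(final int limit) {
        this.limit = limit;
    }

    /** Takes a permit if one is left in the current period; never blocks. */
    synchronized boolean tryAcquire() {
        if (limit <= NO_LIMIT || acquireCount < limit) {
            acquireCount++;
            return true;
        }
        return false;
    }

    /** Ends the current period: resets the counter and wakes up any waiting threads. */
    synchronized void endOfPeriod() {
        acquireCount = 0;
        notifyAll();
    }

    public static void main(final String[] args) {
        final FixedWindowCounterSketch counter = new FixedWindowCounterSketch(2);
        System.out.println(counter.tryAcquire()); // true: first permit of the period
        System.out.println(counter.tryAcquire()); // true: second permit of the period
        System.out.println(counter.tryAcquire()); // false: limit of 2 reached
        counter.endOfPeriod();                    // stands in for the periodic timer task
        System.out.println(counter.tryAcquire()); // true: permits were restored
    }
}
```

// Driving endOfPeriod() by hand instead of through a ScheduledExecutorService is a
// simplification for the sketch only; the class above schedules it at a fixed rate.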