/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.lang3.concurrent;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.Validate;

/**
 * A specialized <em>semaphore</em> implementation that provides a number of
 * permits in a given time frame.
 *
 * <p>
 * This class is similar to the {@link java.util.concurrent.Semaphore} class
 * provided by the JDK in that it manages a configurable number of permits.
 * Using the {@link #acquire()} method a permit can be requested by a thread.
 * However, there is an additional timing dimension: there is no
 * {@code release()} method for freeing a permit, but all permits are
 * automatically released at the end of a configurable time frame. If a thread
 * calls {@link #acquire()} and the available permits are already exhausted for
 * this time frame, the thread is blocked.
 * When the time frame ends all permits
 * requested so far are restored, and blocked threads are woken up again, so
 * that they can try to acquire a new permit. This basically means that in the
 * specified time frame only the given number of operations is possible.
 * </p>
 * <p>
 * A use case for this class is to artificially limit the load produced by a
 * process. As an example consider an application that issues database queries
 * on a production system in a background process to gather statistical
 * information. This background processing should not produce so much database
 * load that the functionality and the performance of the production system are
 * impacted. Here a {@link TimedSemaphore} could be installed to guarantee that
 * only a given number of database queries are issued per second.
 * </p>
 * <p>
 * A thread class for performing database queries could look as follows:
 * </p>
 *
 * <pre>
 * public class StatisticsThread extends Thread {
 *     // The semaphore for limiting database load.
 *     private final TimedSemaphore semaphore;
 *     // Create an instance and set the semaphore
 *     public StatisticsThread(TimedSemaphore timedSemaphore) {
 *         semaphore = timedSemaphore;
 *     }
 *     // Gather statistics
 *     public void run() {
 *         try {
 *             while (true) {
 *                 semaphore.acquire();   // limit database load
 *                 performQuery();        // issue a query
 *             }
 *         } catch (InterruptedException ex) {
 *             // fall through
 *         }
 *     }
 *     ...
 * }
 * </pre>
 *
 * <p>
 * The following code fragment shows how a {@link TimedSemaphore} is created
 * that allows only 10 operations per second and passed to the statistics
 * thread:
 * </p>
 *
 * <pre>
 * TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECONDS, 10);
 * StatisticsThread thread = new StatisticsThread(sem);
 * thread.start();
 * </pre>
 *
 * <p>
 * When creating an instance the time period for the semaphore must be
 * specified. {@link TimedSemaphore} uses an executor service with a
 * corresponding period to monitor this interval. The
 * {@code ScheduledExecutorService} to be used for this purpose can be provided
 * at construction time. Alternatively the class creates an internal executor
 * service.
 * </p>
 * <p>
 * Client code that uses {@link TimedSemaphore} has to call the
 * {@link #acquire()} method in each processing step. {@link TimedSemaphore}
 * keeps track of the number of invocations of the {@link #acquire()} method and
 * blocks the calling thread if the counter exceeds the limit specified. When
 * the timer signals the end of the time period the counter is reset and all
 * waiting threads are released. Then another cycle can start.
 * </p>
 * <p>
 * An alternative to {@code acquire()} is the {@link #tryAcquire()} method. This
 * method checks whether the semaphore is under the specified limit and
 * increases the internal counter if this is the case. The return value is then
 * <strong>true</strong>, and the calling thread can continue with its action.
 * If the semaphore is already at its limit, {@code tryAcquire()} immediately
 * returns <strong>false</strong> without blocking; the calling thread must
 * then abort its action. This usage scenario prevents blocking of threads.
 * </p>
 * <p>
 * It is possible to modify the limit at any time using the
 * {@link #setLimit(int)} method.
 * This is useful if the load produced by an
 * operation has to be adapted dynamically. In the example scenario with the
 * thread collecting statistics it may make sense to specify a low limit during
 * the day while allowing a higher load at night. Reducing the limit
 * takes effect immediately by blocking incoming callers. If the limit is
 * increased, waiting threads are not released immediately, but wake up when the
 * timer runs out. Then, in the next period more processing steps can be
 * performed without blocking. By setting the limit to 0 the semaphore can be
 * switched off: in this mode the {@link #acquire()} method never blocks, but
 * lets all callers pass directly.
 * </p>
 * <p>
 * When the {@link TimedSemaphore} is no longer needed its {@link #shutdown()}
 * method should be called. This causes the periodic task that monitors the time
 * interval to be canceled. If the {@link ScheduledExecutorService} has been
 * created by the semaphore at construction time, it is also shut down.
 * After that {@link #acquire()} must not be called any more.
 * </p>
 *
 * @since 3.0
 */
public class TimedSemaphore {
    /**
     * Constant for a value representing no limit. If the limit is set to a
     * value less than or equal to this constant, the {@link TimedSemaphore}
     * will be effectively switched off.
     */
    public static final int NO_LIMIT = 0;

    /** Constant for the thread pool size for the executor. */
    private static final int THREAD_POOL_SIZE = 1;

    /** The executor service for managing the timer thread. */
    private final ScheduledExecutorService executorService;

    /** Stores the period for this timed semaphore. */
    private final long period;

    /** The time unit for the period. */
    private final TimeUnit unit;

    /** A flag whether the executor service was created by this object.
     */
    private final boolean ownExecutor;

    /** A future object representing the timer task. */
    private ScheduledFuture<?> task; // @GuardedBy("this")

    /** Stores the total number of invocations of the acquire() method. */
    private long totalAcquireCount; // @GuardedBy("this")

    /**
     * The counter for the periods. This counter is increased every time a
     * period ends.
     */
    private long periodCount; // @GuardedBy("this")

    /** The limit. */
    private int limit; // @GuardedBy("this")

    /** The current counter. */
    private int acquireCount;  // @GuardedBy("this")

    /** The number of invocations of acquire() in the last period. */
    private int lastCallsPerPeriod; // @GuardedBy("this")

    /** A flag whether shutdown() was called. */
    private boolean shutdown;  // @GuardedBy("this")

    /**
     * Creates a new instance of {@link TimedSemaphore} and initializes it with
     * the given time period and the limit.
     *
     * @param timePeriod the time period
     * @param timeUnit the unit for the period
     * @param limit the limit for the semaphore
     * @throws IllegalArgumentException if the period is less than or equal to 0
     */
    public TimedSemaphore(final long timePeriod, final TimeUnit timeUnit, final int limit) {
        this(null, timePeriod, timeUnit, limit);
    }

    /**
     * Creates a new instance of {@link TimedSemaphore} and initializes it with
     * an executor service, the given time period, and the limit. The executor
     * service will be used for creating a periodic task for monitoring the time
     * period. It can be <b>null</b>, in which case a default service is created.
     *
     * @param service the executor service
     * @param timePeriod the time period
     * @param timeUnit the unit for the period
     * @param limit the limit for the semaphore
     * @throws IllegalArgumentException if the period is less than or equal to 0
     */
    public TimedSemaphore(final ScheduledExecutorService service, final long timePeriod,
            final TimeUnit timeUnit, final int limit) {
        Validate.inclusiveBetween(1, Long.MAX_VALUE, timePeriod, "Time period must be greater than 0!");

        period = timePeriod;
        unit = timeUnit;

        if (service != null) {
            executorService = service;
            ownExecutor = false;
        } else {
            final ScheduledThreadPoolExecutor s = new ScheduledThreadPoolExecutor(
                    THREAD_POOL_SIZE);
            s.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
            s.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
            executorService = s;
            ownExecutor = true;
        }

        setLimit(limit);
    }

    /**
     * Acquires a permit from this semaphore. This method will block if
     * the limit for the current period has already been reached. If
     * {@link #shutdown()} has already been invoked, calling this method will
     * cause an exception. The very first call of this method starts the timer
     * task which monitors the time period set for this {@link TimedSemaphore}.
     * From then on the semaphore is active.
     *
     * @throws InterruptedException if the thread gets interrupted
     * @throws IllegalStateException if this semaphore is already shut down
     */
    public synchronized void acquire() throws InterruptedException {
        prepareAcquire();

        boolean canPass;
        do {
            canPass = acquirePermit();
            if (!canPass) {
                wait();
            }
        } while (!canPass);
    }

    /**
     * Internal helper method for acquiring a permit. This method checks whether currently
     * a permit can be acquired and - if so - increases the internal counter.
     * The return
     * value indicates whether a permit could be acquired. This method must be called with
     * the lock of this object held.
     *
     * @return a flag whether a permit could be acquired
     */
    private boolean acquirePermit() {
        if (getLimit() <= NO_LIMIT || acquireCount < getLimit()) {
            acquireCount++;
            return true;
        }
        return false;
    }

    /**
     * The current time period is finished. This method is called by the timer
     * used internally to monitor the time period. It resets the counter and
     * releases the threads waiting for this barrier.
     */
    synchronized void endOfPeriod() {
        lastCallsPerPeriod = acquireCount;
        totalAcquireCount += acquireCount;
        periodCount++;
        acquireCount = 0;
        notifyAll();
    }

    /**
     * Returns the number of invocations of the {@link #acquire()} method for
     * the current period. This may be useful for testing or debugging purposes.
     *
     * @return the current number of {@link #acquire()} invocations
     */
    public synchronized int getAcquireCount() {
        return acquireCount;
    }

    /**
     * Returns the number of calls to the {@link #acquire()} method that can
     * still be performed in the current period without blocking. This method
     * can give an indication whether it is safe to call the {@link #acquire()}
     * method without risking suspension. However, there is no guarantee
     * that a subsequent call to {@link #acquire()} actually is non-blocking
     * because in the meantime other threads may have invoked the semaphore.
     *
     * @return the current number of available {@link #acquire()} calls in the
     * current period
     */
    public synchronized int getAvailablePermits() {
        return getLimit() - getAcquireCount();
    }

    /**
     * Returns the average number of successful (i.e. non-blocking)
     * {@link #acquire()} invocations for the entire life-time of this
     * {@code TimedSemaphore}.
     * This method can be used for instance for statistical
     * calculations.
     *
     * @return the average number of {@link #acquire()} invocations per
     * period
     */
    public synchronized double getAverageCallsPerPeriod() {
        return periodCount == 0 ? 0 : (double) totalAcquireCount
                / (double) periodCount;
    }

    /**
     * Returns the executor service used by this instance.
     *
     * @return the executor service
     */
    protected ScheduledExecutorService getExecutorService() {
        return executorService;
    }

    /**
     * Returns the number of (successful) acquire invocations during the last
     * period. This is the number of times the {@link #acquire()} method was
     * called without blocking. This can be useful for testing or debugging
     * purposes or to determine a meaningful threshold value. If a limit is set,
     * the value returned by this method won't be greater than this limit.
     *
     * @return the number of non-blocking invocations of the {@link #acquire()}
     * method
     */
    public synchronized int getLastAcquiresPerPeriod() {
        return lastCallsPerPeriod;
    }

    /**
     * Returns the limit enforced by this semaphore. The limit determines how
     * many invocations of {@link #acquire()} are allowed within the monitored
     * period.
     *
     * @return the limit
     */
    public final synchronized int getLimit() {
        return limit;
    }

    /**
     * Returns the time period. This is the time monitored by this semaphore.
     * Only a given number of invocations of the {@link #acquire()} method is
     * possible in this period.
     *
     * @return the time period
     */
    public long getPeriod() {
        return period;
    }

    /**
     * Returns the time unit. This is the unit used by {@link #getPeriod()}.
     *
     * @return the time unit
     */
    public TimeUnit getUnit() {
        return unit;
    }

    /**
     * Tests whether the {@link #shutdown()} method has been called on this
     * object. If this method returns <b>true</b>, this instance cannot be used
     * any longer.
     *
     * @return a flag whether a shutdown has been performed
     */
    public synchronized boolean isShutdown() {
        return shutdown;
    }

    /**
     * Prepares an acquire operation. Checks for the current state and starts the internal
     * timer if necessary. This method must be called with the lock of this object held.
     */
    private void prepareAcquire() {
        if (isShutdown()) {
            throw new IllegalStateException("TimedSemaphore is shut down!");
        }

        if (task == null) {
            task = startTimer();
        }
    }

    /**
     * Sets the limit. This is the number of times the {@link #acquire()} method
     * can be called within the time period specified. If this limit is reached,
     * further invocations of {@link #acquire()} will block. Setting the limit
     * to a value &lt;= {@link #NO_LIMIT} will cause the limit to be disabled,
     * i.e. an arbitrary number of {@link #acquire()} invocations is allowed in
     * the time period.
     *
     * @param limit the limit
     */
    public final synchronized void setLimit(final int limit) {
        this.limit = limit;
    }

    /**
     * Initiates a shutdown. After that the object cannot be used anymore.
     * This method can be invoked an arbitrary number of times. All invocations
     * after the first one do not have any effect.
     */
    public synchronized void shutdown() {
        if (!shutdown) {

            if (ownExecutor) {
                // if the executor was created by this instance, it has
                // to be shut down
                getExecutorService().shutdownNow();
            }
            if (task != null) {
                task.cancel(false);
            }

            shutdown = true;
        }
    }

    /**
     * Starts the timer.
     * This method is called when {@link #acquire()} is called
     * for the first time. It schedules a task to be executed at fixed rate to
     * monitor the time period specified.
     *
     * @return a future object representing the task scheduled
     */
    protected ScheduledFuture<?> startTimer() {
        return getExecutorService().scheduleAtFixedRate(this::endOfPeriod, getPeriod(), getPeriod(), getUnit());
    }

    /**
     * Tries to acquire a permit from this semaphore. If the limit of this semaphore has
     * not yet been reached, a permit is acquired, and this method returns
     * <strong>true</strong>. Otherwise, this method returns immediately with the result
     * <strong>false</strong>.
     *
     * @return <strong>true</strong> if a permit could be acquired; <strong>false</strong>
     * otherwise
     * @throws IllegalStateException if this semaphore is already shut down
     * @since 3.5
     */
    public synchronized boolean tryAcquire() {
        prepareAcquire();
        return acquirePermit();
    }
}