001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * https://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.commons.lang3.concurrent; 019 020import java.util.concurrent.ScheduledExecutorService; 021import java.util.concurrent.ScheduledFuture; 022import java.util.concurrent.ScheduledThreadPoolExecutor; 023import java.util.concurrent.TimeUnit; 024import java.util.function.Supplier; 025 026import org.apache.commons.lang3.Validate; 027 028/** 029 * A specialized <em>semaphore</em> implementation that provides a number of permits in a given time frame. 030 * 031 * <p> 032 * This class is similar to the {@link java.util.concurrent.Semaphore} class provided by the JDK in that it manages a configurable number of permits. Using the 033 * {@link #acquire()} method a permit can be requested by a thread. However, there is an additional timing dimension: there is no {@code 034 * release()} method for freeing a permit, but all permits are automatically released at the end of a configurable time frame. If a thread calls 035 * {@link #acquire()} and the available permits are already exhausted for this time frame, the thread is blocked. When the time frame ends all permits requested 036 * so far are restored, and blocking threads are waked up again, so that they can try to acquire a new permit. This basically means that in the specified time 037 * frame only the given number of operations is possible. 038 * </p> 039 * <p> 040 * A use case for this class is to artificially limit the load produced by a process. As an example consider an application that issues database queries on a 041 * production system in a background process to gather statistical information. This background processing should not produce so much database load that the 042 * functionality and the performance of the production system are impacted. Here a {@link TimedSemaphore} could be installed to guarantee that only a given 043 * number of database queries are issued per second. 044 * </p> 045 * <p> 046 * A thread class for performing database queries could look as follows: 047 * </p> 048 * 049 * <pre> 050 * public class StatisticsThread extends Thread { 051 * // The semaphore for limiting database load. 052 * private final TimedSemaphore semaphore; 053 * // Create an instance and set the semaphore 054 * public StatisticsThread(TimedSemaphore timedSemaphore) { 055 * semaphore = timedSemaphore; 056 * } 057 * // Gather statistics 058 * public void run() { 059 * try { 060 * while (true) { 061 * semaphore.acquire(); // limit database load 062 * performQuery(); // issue a query 063 * } 064 * } catch (InterruptedException) { 065 * // fall through 066 * } 067 * } 068 * ... 069 * } 070 * </pre> 071 * 072 * <p> 073 * The following code fragment shows how a {@link TimedSemaphore} is created that allows only 10 operations per second and passed to the statistics thread: 074 * </p> 075 * 076 * <pre> 077 * TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECOND, 10); 078 * StatisticsThread thread = new StatisticsThread(sem); 079 * thread.start(); 080 * </pre> 081 * 082 * <p> 083 * When creating an instance the time period for the semaphore must be specified. {@link TimedSemaphore} uses an executor service with a corresponding period to 084 * monitor this interval. The {@code 085 * ScheduledExecutorService} to be used for this purpose can be provided at construction time. Alternatively the class creates an internal executor service. 086 * </p> 087 * <p> 088 * Client code that uses {@link TimedSemaphore} has to call the {@link #acquire()} method in each processing step. {@link TimedSemaphore} keeps track of the 089 * number of invocations of the {@link #acquire()} method and blocks the calling thread if the counter exceeds the limit specified. When the timer signals the 090 * end of the time period the counter is reset and all waiting threads are released. Then another cycle can start. 091 * </p> 092 * <p> 093 * An alternative to {@code acquire()} is the {@link #tryAcquire()} method. This method checks whether the semaphore is under the specified limit and increases 094 * the internal counter if this is the case. The return value is then <strong>true</strong>, and the calling thread can continue with its action. If the 095 * semaphore is already at its limit, {@code tryAcquire()} immediately returns <strong>false</strong> without blocking; the calling thread must then abort its 096 * action. This usage scenario prevents blocking of threads. 097 * </p> 098 * <p> 099 * It is possible to modify the limit at any time using the {@link #setLimit(int)} method. This is useful if the load produced by an operation has to be adapted 100 * dynamically. In the example scenario with the thread collecting statistics it may make sense to specify a low limit during day time while allowing a higher 101 * load in the night time. Reducing the limit takes effect immediately by blocking incoming callers. If the limit is increased, waiting threads are not released 102 * immediately, but wake up when the timer runs out. Then, in the next period more processing steps can be performed without blocking. By setting the limit to 0 103 * the semaphore can be switched off: in this mode the {@link #acquire()} method never blocks, but lets all callers pass directly. 104 * </p> 105 * <p> 106 * When the {@link TimedSemaphore} is no more needed its {@link #shutdown()} method should be called. This causes the periodic task that monitors the time 107 * interval to be canceled. If the {@link ScheduledExecutorService} has been created by the semaphore at construction time, it is also shut down. resources. 108 * After that {@link #acquire()} must not be called any more. 109 * </p> 110 * 111 * @since 3.0 112 */ 113public class TimedSemaphore { 114 115 /** 116 * Builds new {@link TimedSemaphore}. 117 * 118 * @since 3.20.0 119 */ 120 public static class Builder implements Supplier<TimedSemaphore> { 121 122 private ScheduledExecutorService service; 123 private long period; 124 private TimeUnit timeUnit; 125 private int limit; 126 127 /** 128 * Constructs a new Builder. 129 */ 130 public Builder() { 131 // empty 132 } 133 134 @Override 135 public TimedSemaphore get() { 136 return new TimedSemaphore(this); 137 } 138 139 /** 140 * Sets the limit. 141 * 142 * @param limit The limit. 143 * @return {@code this} instance. 144 */ 145 public Builder setLimit(final int limit) { 146 this.limit = limit; 147 return this; 148 } 149 150 /** 151 * Sets the time period. 152 * 153 * @param period The time period. 154 * @return {@code this} instance. 155 */ 156 public Builder setPeriod(final long period) { 157 this.period = period; 158 return this; 159 } 160 161 /** 162 * Sets the executor service. 163 * 164 * @param service The executor service. 165 * @return {@code this} instance. 166 */ 167 public Builder setService(final ScheduledExecutorService service) { 168 this.service = service; 169 return this; 170 } 171 172 /** 173 * Sets the time unit for the period. 174 * 175 * @param timeUnit The time unit for the period. 176 * @return {@code this} instance. 177 */ 178 public Builder setTimeUnit(final TimeUnit timeUnit) { 179 this.timeUnit = timeUnit; 180 return this; 181 } 182 } 183 184 /** 185 * Constant for a value representing no limit. If the limit is set to a value less or equal this constant, the {@link TimedSemaphore} will be effectively 186 * switched off. 187 */ 188 public static final int NO_LIMIT = 0; 189 190 /** Constant for the thread pool size for the executor. */ 191 private static final int THREAD_POOL_SIZE = 1; 192 193 /** 194 * Constructs a new Builder. 195 * 196 * @return a new Builder. 197 * @since 3.20.0 198 */ 199 public static Builder builder() { 200 return new Builder(); 201 } 202 203 /** The executor service for managing the timer thread. */ 204 private final ScheduledExecutorService executorService; 205 206 /** The period for this timed semaphore. */ 207 private final long period; 208 209 /** The time unit for the period. */ 210 private final TimeUnit unit; 211 212 /** A flag whether the executor service was created by this object. */ 213 private final boolean ownExecutor; 214 215 /** A future object representing the timer task. */ 216 private ScheduledFuture<?> task; // @GuardedBy("this") 217 218 /** Stores the total number of invocations of the acquire() method. */ 219 private long totalAcquireCount; // @GuardedBy("this") 220 221 /** 222 * The counter for the periods. This counter is increased every time a period ends. 223 */ 224 private long periodCount; // @GuardedBy("this") 225 226 /** The limit. */ 227 private int limit; // @GuardedBy("this") 228 229 /** The current counter. */ 230 private int acquireCount; // @GuardedBy("this") 231 232 /** The number of invocations of acquire() in the last period. */ 233 private int lastCallsPerPeriod; // @GuardedBy("this") 234 235 /** A flag whether shutdown() was called. */ 236 private boolean shutdown; // @GuardedBy("this") 237 238 private TimedSemaphore(final Builder builder) { 239 Validate.inclusiveBetween(1, Long.MAX_VALUE, builder.period, "Time period must be greater than 0."); 240 period = builder.period; 241 unit = builder.timeUnit; 242 if (builder.service != null) { 243 executorService = builder.service; 244 ownExecutor = false; 245 } else { 246 final ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(THREAD_POOL_SIZE); 247 stpe.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); 248 stpe.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); 249 executorService = stpe; 250 ownExecutor = true; 251 } 252 setLimit(builder.limit); 253 } 254 255 /** 256 * Constructs a new instance of {@link TimedSemaphore} and initializes it with the given time period and the limit. 257 * 258 * @param timePeriod the time period. 259 * @param timeUnit the unit for the period. 260 * @param limit the limit for the semaphore. 261 * @throws IllegalArgumentException if the period is less or equals 0. 262 * @deprecated Use {@link #builder()} and {@link Builder}. 263 */ 264 @Deprecated 265 public TimedSemaphore(final long timePeriod, final TimeUnit timeUnit, final int limit) { 266 this(null, timePeriod, timeUnit, limit); 267 } 268 269 /** 270 * Constructs a new instance of {@link TimedSemaphore} and initializes it with an executor service, the given time period, and the limit. The executor service 271 * will be used for creating a periodic task for monitoring the time period. It can be <strong>null</strong>, then a default service will be created. 272 * 273 * @param service the executor service. 274 * @param timePeriod the time period. 275 * @param timeUnit the unit for the period. 276 * @param limit the limit for the semaphore. 277 * @throws IllegalArgumentException if the period is less or equals 0. 278 * @deprecated Use {@link #builder()} and {@link Builder}. 279 */ 280 @Deprecated 281 public TimedSemaphore(final ScheduledExecutorService service, final long timePeriod, final TimeUnit timeUnit, final int limit) { 282 this(builder().setService(service).setPeriod(timePeriod).setTimeUnit(timeUnit).setLimit(limit)); 283 } 284 285 /** 286 * Acquires a permit from this semaphore. This method will block if the limit for the current period has already been reached. If {@link #shutdown()} has 287 * already been invoked, calling this method will cause an exception. The very first call of this method starts the timer task which monitors the time 288 * period set for this {@link TimedSemaphore}. From now on the semaphore is active. 289 * 290 * @throws InterruptedException if the thread gets interrupted. 291 * @throws IllegalStateException if this semaphore is already shut down. 292 */ 293 public synchronized void acquire() throws InterruptedException { 294 prepareAcquire(); 295 boolean canPass; 296 do { 297 canPass = acquirePermit(); 298 if (!canPass) { 299 wait(); 300 } 301 } while (!canPass); 302 } 303 304 /** 305 * Internal helper method for acquiring a permit. This method checks whether currently a permit can be acquired and - if so - increases the internal 306 * counter. The return value indicates whether a permit could be acquired. This method must be called with the lock of this object held. 307 * 308 * @return a flag whether a permit could be acquired. 309 */ 310 private boolean acquirePermit() { 311 if (getLimit() <= NO_LIMIT || acquireCount < getLimit()) { 312 acquireCount++; 313 return true; 314 } 315 return false; 316 } 317 318 /** 319 * The current time period is finished. This method is called by the timer used internally to monitor the time period. It resets the counter and releases 320 * the threads waiting for this barrier. 321 */ 322 synchronized void endOfPeriod() { 323 lastCallsPerPeriod = acquireCount; 324 totalAcquireCount += acquireCount; 325 periodCount++; 326 acquireCount = 0; 327 notifyAll(); 328 } 329 330 /** 331 * Gets the number of invocations of the {@link #acquire()} method for the current period. This may be useful for testing or debugging purposes. 332 * 333 * @return the current number of {@link #acquire()} invocations. 334 */ 335 public synchronized int getAcquireCount() { 336 return acquireCount; 337 } 338 339 /** 340 * Gets the number of calls to the {@link #acquire()} method that can still be performed in the current period without blocking. This method can give an 341 * indication whether it is safe to call the {@link #acquire()} method without risking to be suspended. However, there is no guarantee that a subsequent 342 * call to {@link #acquire()} actually is not-blocking because in the meantime other threads may have invoked the semaphore. 343 * 344 * @return the current number of available {@link #acquire()} calls in the current period. 345 */ 346 public synchronized int getAvailablePermits() { 347 return getLimit() - getAcquireCount(); 348 } 349 350 /** 351 * Gets the average number of successful (i.e. non-blocking) {@link #acquire()} invocations for the entire life-time of this {@code 352 * TimedSemaphore}. This method can be used for instance for statistical calculations. 353 * 354 * @return the average number of {@link #acquire()} invocations per time unit. 355 */ 356 public synchronized double getAverageCallsPerPeriod() { 357 return periodCount == 0 ? 0 : (double) totalAcquireCount / (double) periodCount; 358 } 359 360 /** 361 * Gets the executor service used by this instance. 362 * 363 * @return the executor service. 364 */ 365 protected ScheduledExecutorService getExecutorService() { 366 return executorService; 367 } 368 369 /** 370 * Gets the number of (successful) acquire invocations during the last period. This is the number of times the {@link #acquire()} method was called without 371 * blocking. This can be useful for testing or debugging purposes or to determine a meaningful threshold value. If a limit is set, the value returned by 372 * this method won't be greater than this limit. 373 * 374 * @return the number of non-blocking invocations of the {@link #acquire()} method. 375 */ 376 public synchronized int getLastAcquiresPerPeriod() { 377 return lastCallsPerPeriod; 378 } 379 380 /** 381 * Gets the limit enforced by this semaphore. The limit determines how many invocations of {@link #acquire()} are allowed within the monitored period. 382 * 383 * @return the limit. 384 */ 385 public final synchronized int getLimit() { 386 return limit; 387 } 388 389 /** 390 * Gets the time period. This is the time monitored by this semaphore. Only a given number of invocations of the {@link #acquire()} method is possible in 391 * this period. 392 * 393 * @return the time period. 394 */ 395 public long getPeriod() { 396 return period; 397 } 398 399 /** 400 * Gets the time unit. This is the unit used by {@link #getPeriod()}. 401 * 402 * @return the time unit. 403 */ 404 public TimeUnit getUnit() { 405 return unit; 406 } 407 408 /** 409 * Tests whether the {@link #shutdown()} method has been called on this object. If this method returns <strong>true</strong>, this instance cannot be used 410 * any longer. 411 * 412 * @return a flag whether a shutdown has been performed. 413 */ 414 public synchronized boolean isShutdown() { 415 return shutdown; 416 } 417 418 /** 419 * Prepares an acquire operation. Checks for the current state and starts the internal timer if necessary. This method must be called with the lock of this 420 * object held. 421 */ 422 private void prepareAcquire() { 423 if (isShutdown()) { 424 throw new IllegalStateException("TimedSemaphore is shut down!"); 425 } 426 if (task == null) { 427 task = startTimer(); 428 } 429 } 430 431 /** 432 * Sets the limit. This is the number of times the {@link #acquire()} method can be called within the time period specified. If this limit is reached, 433 * further invocations of {@link #acquire()} will block. Setting the limit to a value <= {@link #NO_LIMIT} will cause the limit to be disabled, i.e. an 434 * arbitrary number of{@link #acquire()} invocations is allowed in the time period. 435 * 436 * @param limit the limit. 437 */ 438 public final synchronized void setLimit(final int limit) { 439 this.limit = limit; 440 } 441 442 /** 443 * Initializes a shutdown. After that the object cannot be used anymore. This method can be invoked an arbitrary number of times. All invocations after the 444 * first one do not have any effect. 445 */ 446 public synchronized void shutdown() { 447 if (!shutdown) { 448 if (ownExecutor) { 449 // if the executor was created by this instance, it has 450 // to be shutdown 451 getExecutorService().shutdownNow(); 452 } 453 if (task != null) { 454 task.cancel(false); 455 } 456 shutdown = true; 457 } 458 } 459 460 /** 461 * Starts the timer. This method is called when {@link #acquire()} is called for the first time. It schedules a task to be executed at fixed rate to monitor 462 * the time period specified. 463 * 464 * @return a future object representing the task scheduled. 465 */ 466 protected ScheduledFuture<?> startTimer() { 467 return getExecutorService().scheduleAtFixedRate(this::endOfPeriod, getPeriod(), getPeriod(), getUnit()); 468 } 469 470 /** 471 * Tries to acquire a permit from this semaphore. If the limit of this semaphore has not yet been reached, a permit is acquired, and this method returns 472 * <strong>true</strong>. Otherwise, this method returns immediately with the result <strong>false</strong>. 473 * 474 * @return <strong>true</strong> if a permit could be acquired; <strong>false</strong> otherwise. 475 * @throws IllegalStateException if this semaphore is already shut down. 476 * @since 3.5 477 */ 478 public synchronized boolean tryAcquire() { 479 prepareAcquire(); 480 return acquirePermit(); 481 } 482}