001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.lang3.concurrent; 018 019import java.util.concurrent.ScheduledExecutorService; 020import java.util.concurrent.ScheduledFuture; 021import java.util.concurrent.ScheduledThreadPoolExecutor; 022import java.util.concurrent.TimeUnit; 023 024/** 025 * <p> 026 * A specialized <em>semaphore</em> implementation that provides a number of 027 * permits in a given time frame. 028 * </p> 029 * <p> 030 * This class is similar to the {@code java.util.concurrent.Semaphore} class 031 * provided by the JDK in that it manages a configurable number of permits. 032 * Using the {@link #acquire()} method a permit can be requested by a thread. 033 * However, there is an additional timing dimension: there is no {@code 034 * release()} method for freeing a permit, but all permits are automatically 035 * released at the end of a configurable time frame. If a thread calls 036 * {@link #acquire()} and the available permits are already exhausted for this 037 * time frame, the thread is blocked. When the time frame ends all permits 038 * requested so far are restored, and blocking threads are waked up again, so 039 * that they can try to acquire a new permit. This basically means that in the 040 * specified time frame only the given number of operations is possible. 041 * </p> 042 * <p> 043 * A use case for this class is to artificially limit the load produced by a 044 * process. As an example consider an application that issues database queries 045 * on a production system in a background process to gather statistical 046 * information. This background processing should not produce so much database 047 * load that the functionality and the performance of the production system are 048 * impacted. Here a {@code TimedSemaphore} could be installed to guarantee that 049 * only a given number of database queries are issued per second. 050 * </p> 051 * <p> 052 * A thread class for performing database queries could look as follows: 053 * </p> 054 * 055 * <pre> 056 * public class StatisticsThread extends Thread { 057 * // The semaphore for limiting database load. 058 * private final TimedSemaphore semaphore; 059 * // Create an instance and set the semaphore 060 * public StatisticsThread(TimedSemaphore timedSemaphore) { 061 * semaphore = timedSemaphore; 062 * } 063 * // Gather statistics 064 * public void run() { 065 * try { 066 * while(true) { 067 * semaphore.acquire(); // limit database load 068 * performQuery(); // issue a query 069 * } 070 * } catch(InterruptedException) { 071 * // fall through 072 * } 073 * } 074 * ... 075 * } 076 * </pre> 077 * 078 * <p> 079 * The following code fragment shows how a {@code TimedSemaphore} is created 080 * that allows only 10 operations per second and passed to the statistics 081 * thread: 082 * </p> 083 * 084 * <pre> 085 * TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECOND, 10); 086 * StatisticsThread thread = new StatisticsThread(sem); 087 * thread.start(); 088 * </pre> 089 * 090 * <p> 091 * When creating an instance the time period for the semaphore must be 092 * specified. {@code TimedSemaphore} uses an executor service with a 093 * corresponding period to monitor this interval. The {@code 094 * ScheduledExecutorService} to be used for this purpose can be provided at 095 * construction time. Alternatively the class creates an internal executor 096 * service. 097 * </p> 098 * <p> 099 * Client code that uses {@code TimedSemaphore} has to call the 100 * {@link #acquire()} method in aach processing step. {@code TimedSemaphore} 101 * keeps track of the number of invocations of the {@link #acquire()} method and 102 * blocks the calling thread if the counter exceeds the limit specified. When 103 * the timer signals the end of the time period the counter is reset and all 104 * waiting threads are released. Then another cycle can start. 105 * </p> 106 * <p> 107 * It is possible to modify the limit at any time using the 108 * {@link #setLimit(int)} method. This is useful if the load produced by an 109 * operation has to be adapted dynamically. In the example scenario with the 110 * thread collecting statistics it may make sense to specify a low limit during 111 * day time while allowing a higher load in the night time. Reducing the limit 112 * takes effect immediately by blocking incoming callers. If the limit is 113 * increased, waiting threads are not released immediately, but wake up when the 114 * timer runs out. Then, in the next period more processing steps can be 115 * performed without blocking. By setting the limit to 0 the semaphore can be 116 * switched off: in this mode the {@link #acquire()} method never blocks, but 117 * lets all callers pass directly. 118 * </p> 119 * <p> 120 * When the {@code TimedSemaphore} is no more needed its {@link #shutdown()} 121 * method should be called. This causes the periodic task that monitors the time 122 * interval to be canceled. If the {@code ScheduledExecutorService} has been 123 * created by the semaphore at construction time, it is also shut down. 124 * resources. After that {@link #acquire()} must not be called any more. 125 * </p> 126 * 127 * @since 3.0 128 * @version $Id: TimedSemaphore.java 1583482 2014-03-31 22:54:57Z niallp $ 129 */ 130public class TimedSemaphore { 131 /** 132 * Constant for a value representing no limit. If the limit is set to a 133 * value less or equal this constant, the {@code TimedSemaphore} will be 134 * effectively switched off. 135 */ 136 public static final int NO_LIMIT = 0; 137 138 /** Constant for the thread pool size for the executor. */ 139 private static final int THREAD_POOL_SIZE = 1; 140 141 /** The executor service for managing the timer thread. */ 142 private final ScheduledExecutorService executorService; 143 144 /** Stores the period for this timed semaphore. */ 145 private final long period; 146 147 /** The time unit for the period. */ 148 private final TimeUnit unit; 149 150 /** A flag whether the executor service was created by this object. */ 151 private final boolean ownExecutor; 152 153 /** A future object representing the timer task. */ 154 private ScheduledFuture<?> task; // @GuardedBy("this") 155 156 /** Stores the total number of invocations of the acquire() method. */ 157 private long totalAcquireCount; // @GuardedBy("this") 158 159 /** 160 * The counter for the periods. This counter is increased every time a 161 * period ends. 162 */ 163 private long periodCount; // @GuardedBy("this") 164 165 /** The limit. */ 166 private int limit; // @GuardedBy("this") 167 168 /** The current counter. */ 169 private int acquireCount; // @GuardedBy("this") 170 171 /** The number of invocations of acquire() in the last period. */ 172 private int lastCallsPerPeriod; // @GuardedBy("this") 173 174 /** A flag whether shutdown() was called. */ 175 private boolean shutdown; // @GuardedBy("this") 176 177 /** 178 * Creates a new instance of {@link TimedSemaphore} and initializes it with 179 * the given time period and the limit. 180 * 181 * @param timePeriod the time period 182 * @param timeUnit the unit for the period 183 * @param limit the limit for the semaphore 184 * @throws IllegalArgumentException if the period is less or equals 0 185 */ 186 public TimedSemaphore(final long timePeriod, final TimeUnit timeUnit, final int limit) { 187 this(null, timePeriod, timeUnit, limit); 188 } 189 190 /** 191 * Creates a new instance of {@link TimedSemaphore} and initializes it with 192 * an executor service, the given time period, and the limit. The executor 193 * service will be used for creating a periodic task for monitoring the time 194 * period. It can be <b>null</b>, then a default service will be created. 195 * 196 * @param service the executor service 197 * @param timePeriod the time period 198 * @param timeUnit the unit for the period 199 * @param limit the limit for the semaphore 200 * @throws IllegalArgumentException if the period is less or equals 0 201 */ 202 public TimedSemaphore(final ScheduledExecutorService service, final long timePeriod, 203 final TimeUnit timeUnit, final int limit) { 204 if (timePeriod <= 0) { 205 throw new IllegalArgumentException("Time period must be greater 0!"); 206 } 207 208 period = timePeriod; 209 unit = timeUnit; 210 211 if (service != null) { 212 executorService = service; 213 ownExecutor = false; 214 } else { 215 final ScheduledThreadPoolExecutor s = new ScheduledThreadPoolExecutor( 216 THREAD_POOL_SIZE); 217 s.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); 218 s.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); 219 executorService = s; 220 ownExecutor = true; 221 } 222 223 setLimit(limit); 224 } 225 226 /** 227 * Returns the limit enforced by this semaphore. The limit determines how 228 * many invocations of {@link #acquire()} are allowed within the monitored 229 * period. 230 * 231 * @return the limit 232 */ 233 public final synchronized int getLimit() { 234 return limit; 235 } 236 237 /** 238 * Sets the limit. This is the number of times the {@link #acquire()} method 239 * can be called within the time period specified. If this limit is reached, 240 * further invocations of {@link #acquire()} will block. Setting the limit 241 * to a value <= {@link #NO_LIMIT} will cause the limit to be disabled, 242 * i.e. an arbitrary number of{@link #acquire()} invocations is allowed in 243 * the time period. 244 * 245 * @param limit the limit 246 */ 247 public final synchronized void setLimit(final int limit) { 248 this.limit = limit; 249 } 250 251 /** 252 * Initializes a shutdown. After that the object cannot be used any more. 253 * This method can be invoked an arbitrary number of times. All invocations 254 * after the first one do not have any effect. 255 */ 256 public synchronized void shutdown() { 257 if (!shutdown) { 258 259 if (ownExecutor) { 260 // if the executor was created by this instance, it has 261 // to be shutdown 262 getExecutorService().shutdownNow(); 263 } 264 if (task != null) { 265 task.cancel(false); 266 } 267 268 shutdown = true; 269 } 270 } 271 272 /** 273 * Tests whether the {@link #shutdown()} method has been called on this 274 * object. If this method returns <b>true</b>, this instance cannot be used 275 * any longer. 276 * 277 * @return a flag whether a shutdown has been performed 278 */ 279 public synchronized boolean isShutdown() { 280 return shutdown; 281 } 282 283 /** 284 * Tries to acquire a permit from this semaphore. This method will block if 285 * the limit for the current period has already been reached. If 286 * {@link #shutdown()} has already been invoked, calling this method will 287 * cause an exception. The very first call of this method starts the timer 288 * task which monitors the time period set for this {@code TimedSemaphore}. 289 * From now on the semaphore is active. 290 * 291 * @throws InterruptedException if the thread gets interrupted 292 * @throws IllegalStateException if this semaphore is already shut down 293 */ 294 public synchronized void acquire() throws InterruptedException { 295 if (isShutdown()) { 296 throw new IllegalStateException("TimedSemaphore is shut down!"); 297 } 298 299 if (task == null) { 300 task = startTimer(); 301 } 302 303 boolean canPass = false; 304 do { 305 canPass = getLimit() <= NO_LIMIT || acquireCount < getLimit(); 306 if (!canPass) { 307 wait(); 308 } else { 309 acquireCount++; 310 } 311 } while (!canPass); 312 } 313 314 /** 315 * Returns the number of (successful) acquire invocations during the last 316 * period. This is the number of times the {@link #acquire()} method was 317 * called without blocking. This can be useful for testing or debugging 318 * purposes or to determine a meaningful threshold value. If a limit is set, 319 * the value returned by this method won't be greater than this limit. 320 * 321 * @return the number of non-blocking invocations of the {@link #acquire()} 322 * method 323 */ 324 public synchronized int getLastAcquiresPerPeriod() { 325 return lastCallsPerPeriod; 326 } 327 328 /** 329 * Returns the number of invocations of the {@link #acquire()} method for 330 * the current period. This may be useful for testing or debugging purposes. 331 * 332 * @return the current number of {@link #acquire()} invocations 333 */ 334 public synchronized int getAcquireCount() { 335 return acquireCount; 336 } 337 338 /** 339 * Returns the number of calls to the {@link #acquire()} method that can 340 * still be performed in the current period without blocking. This method 341 * can give an indication whether it is safe to call the {@link #acquire()} 342 * method without risking to be suspended. However, there is no guarantee 343 * that a subsequent call to {@link #acquire()} actually is not-blocking 344 * because in the mean time other threads may have invoked the semaphore. 345 * 346 * @return the current number of available {@link #acquire()} calls in the 347 * current period 348 */ 349 public synchronized int getAvailablePermits() { 350 return getLimit() - getAcquireCount(); 351 } 352 353 /** 354 * Returns the average number of successful (i.e. non-blocking) 355 * {@link #acquire()} invocations for the entire life-time of this {@code 356 * TimedSemaphore}. This method can be used for instance for statistical 357 * calculations. 358 * 359 * @return the average number of {@link #acquire()} invocations per time 360 * unit 361 */ 362 public synchronized double getAverageCallsPerPeriod() { 363 return periodCount == 0 ? 0 : (double) totalAcquireCount 364 / (double) periodCount; 365 } 366 367 /** 368 * Returns the time period. This is the time monitored by this semaphore. 369 * Only a given number of invocations of the {@link #acquire()} method is 370 * possible in this period. 371 * 372 * @return the time period 373 */ 374 public long getPeriod() { 375 return period; 376 } 377 378 /** 379 * Returns the time unit. This is the unit used by {@link #getPeriod()}. 380 * 381 * @return the time unit 382 */ 383 public TimeUnit getUnit() { 384 return unit; 385 } 386 387 /** 388 * Returns the executor service used by this instance. 389 * 390 * @return the executor service 391 */ 392 protected ScheduledExecutorService getExecutorService() { 393 return executorService; 394 } 395 396 /** 397 * Starts the timer. This method is called when {@link #acquire()} is called 398 * for the first time. It schedules a task to be executed at fixed rate to 399 * monitor the time period specified. 400 * 401 * @return a future object representing the task scheduled 402 */ 403 protected ScheduledFuture<?> startTimer() { 404 return getExecutorService().scheduleAtFixedRate(new Runnable() { 405 @Override 406 public void run() { 407 endOfPeriod(); 408 } 409 }, getPeriod(), getPeriod(), getUnit()); 410 } 411 412 /** 413 * The current time period is finished. This method is called by the timer 414 * used internally to monitor the time period. It resets the counter and 415 * releases the threads waiting for this barrier. 416 */ 417 synchronized void endOfPeriod() { 418 lastCallsPerPeriod = acquireCount; 419 totalAcquireCount += acquireCount; 420 periodCount++; 421 acquireCount = 0; 422 notifyAll(); 423 } 424}