1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.commons.lang3.concurrent;
19
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.ScheduledFuture;
22 import java.util.concurrent.ScheduledThreadPoolExecutor;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.Supplier;
25
26 import org.apache.commons.lang3.Validate;
27
28 /**
29 * A specialized <em>semaphore</em> implementation that provides a number of permits in a given time frame.
30 *
31 * <p>
32 * This class is similar to the {@link java.util.concurrent.Semaphore} class provided by the JDK in that it manages a configurable number of permits. Using the
33 * {@link #acquire()} method a permit can be requested by a thread. However, there is an additional timing dimension: there is no {@code
34 * release()} method for freeing a permit, but all permits are automatically released at the end of a configurable time frame. If a thread calls
35 * {@link #acquire()} and the available permits are already exhausted for this time frame, the thread is blocked. When the time frame ends all permits requested
36 * so far are restored, and blocking threads are waked up again, so that they can try to acquire a new permit. This basically means that in the specified time
37 * frame only the given number of operations is possible.
38 * </p>
39 * <p>
40 * A use case for this class is to artificially limit the load produced by a process. As an example consider an application that issues database queries on a
41 * production system in a background process to gather statistical information. This background processing should not produce so much database load that the
42 * functionality and the performance of the production system are impacted. Here a {@link TimedSemaphore} could be installed to guarantee that only a given
43 * number of database queries are issued per second.
44 * </p>
45 * <p>
46 * A thread class for performing database queries could look as follows:
47 * </p>
48 *
49 * <pre>
50 * public class StatisticsThread extends Thread {
51 * // The semaphore for limiting database load.
52 * private final TimedSemaphore semaphore;
53 * // Create an instance and set the semaphore
54 * public StatisticsThread(TimedSemaphore timedSemaphore) {
55 * semaphore = timedSemaphore;
56 * }
57 * // Gather statistics
58 * public void run() {
59 * try {
60 * while (true) {
61 * semaphore.acquire(); // limit database load
62 * performQuery(); // issue a query
63 * }
64 * } catch (InterruptedException) {
65 * // fall through
66 * }
67 * }
68 * ...
69 * }
70 * </pre>
71 *
72 * <p>
73 * The following code fragment shows how a {@link TimedSemaphore} is created that allows only 10 operations per second and passed to the statistics thread:
74 * </p>
75 *
76 * <pre>
77 * TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECOND, 10);
78 * StatisticsThread thread = new StatisticsThread(sem);
79 * thread.start();
80 * </pre>
81 *
82 * <p>
83 * When creating an instance the time period for the semaphore must be specified. {@link TimedSemaphore} uses an executor service with a corresponding period to
84 * monitor this interval. The {@code
85 * ScheduledExecutorService} to be used for this purpose can be provided at construction time. Alternatively the class creates an internal executor service.
86 * </p>
87 * <p>
88 * Client code that uses {@link TimedSemaphore} has to call the {@link #acquire()} method in each processing step. {@link TimedSemaphore} keeps track of the
89 * number of invocations of the {@link #acquire()} method and blocks the calling thread if the counter exceeds the limit specified. When the timer signals the
90 * end of the time period the counter is reset and all waiting threads are released. Then another cycle can start.
91 * </p>
92 * <p>
93 * An alternative to {@code acquire()} is the {@link #tryAcquire()} method. This method checks whether the semaphore is under the specified limit and increases
94 * the internal counter if this is the case. The return value is then <strong>true</strong>, and the calling thread can continue with its action. If the
95 * semaphore is already at its limit, {@code tryAcquire()} immediately returns <strong>false</strong> without blocking; the calling thread must then abort its
96 * action. This usage scenario prevents blocking of threads.
97 * </p>
98 * <p>
99 * It is possible to modify the limit at any time using the {@link #setLimit(int)} method. This is useful if the load produced by an operation has to be adapted
100 * dynamically. In the example scenario with the thread collecting statistics it may make sense to specify a low limit during day time while allowing a higher
101 * load in the night time. Reducing the limit takes effect immediately by blocking incoming callers. If the limit is increased, waiting threads are not released
102 * immediately, but wake up when the timer runs out. Then, in the next period more processing steps can be performed without blocking. By setting the limit to 0
103 * the semaphore can be switched off: in this mode the {@link #acquire()} method never blocks, but lets all callers pass directly.
104 * </p>
105 * <p>
106 * When the {@link TimedSemaphore} is no more needed its {@link #shutdown()} method should be called. This causes the periodic task that monitors the time
107 * interval to be canceled. If the {@link ScheduledExecutorService} has been created by the semaphore at construction time, it is also shut down. resources.
108 * After that {@link #acquire()} must not be called any more.
109 * </p>
110 *
111 * @since 3.0
112 */
113 public class TimedSemaphore {
114
115 /**
116 * Builds new {@link TimedSemaphore}.
117 *
118 * @since 3.20.0
119 */
120 public static class Builder implements Supplier<TimedSemaphore> {
121
122 private ScheduledExecutorService service;
123 private long period;
124 private TimeUnit timeUnit;
125 private int limit;
126
127 /**
128 * Constructs a new Builder.
129 */
130 public Builder() {
131 // empty
132 }
133
134 @Override
135 public TimedSemaphore get() {
136 return new TimedSemaphore(this);
137 }
138
139 /**
140 * Sets the limit.
141 *
142 * @param limit The limit.
143 * @return {@code this} instance.
144 */
145 public Builder setLimit(final int limit) {
146 this.limit = limit;
147 return this;
148 }
149
150 /**
151 * Sets the time period.
152 *
153 * @param period The time period.
154 * @return {@code this} instance.
155 */
156 public Builder setPeriod(final long period) {
157 this.period = period;
158 return this;
159 }
160
161 /**
162 * Sets the executor service.
163 *
164 * @param service The executor service.
165 * @return {@code this} instance.
166 */
167 public Builder setService(final ScheduledExecutorService service) {
168 this.service = service;
169 return this;
170 }
171
172 /**
173 * Sets the time unit for the period.
174 *
175 * @param timeUnit The time unit for the period.
176 * @return {@code this} instance.
177 */
178 public Builder setTimeUnit(final TimeUnit timeUnit) {
179 this.timeUnit = timeUnit;
180 return this;
181 }
182 }
183
184 /**
185 * Constant for a value representing no limit. If the limit is set to a value less or equal this constant, the {@link TimedSemaphore} will be effectively
186 * switched off.
187 */
188 public static final int NO_LIMIT = 0;
189
190 /** Constant for the thread pool size for the executor. */
191 private static final int THREAD_POOL_SIZE = 1;
192
193 /**
194 * Constructs a new Builder.
195 *
196 * @return a new Builder.
197 * @since 3.20.0
198 */
199 public static Builder builder() {
200 return new Builder();
201 }
202
203 /** The executor service for managing the timer thread. */
204 private final ScheduledExecutorService executorService;
205
206 /** The period for this timed semaphore. */
207 private final long period;
208
209 /** The time unit for the period. */
210 private final TimeUnit unit;
211
212 /** A flag whether the executor service was created by this object. */
213 private final boolean ownExecutor;
214
215 /** A future object representing the timer task. */
216 private ScheduledFuture<?> task; // @GuardedBy("this")
217
218 /** Stores the total number of invocations of the acquire() method. */
219 private long totalAcquireCount; // @GuardedBy("this")
220
221 /**
222 * The counter for the periods. This counter is increased every time a period ends.
223 */
224 private long periodCount; // @GuardedBy("this")
225
226 /** The limit. */
227 private int limit; // @GuardedBy("this")
228
229 /** The current counter. */
230 private int acquireCount; // @GuardedBy("this")
231
232 /** The number of invocations of acquire() in the last period. */
233 private int lastCallsPerPeriod; // @GuardedBy("this")
234
235 /** A flag whether shutdown() was called. */
236 private boolean shutdown; // @GuardedBy("this")
237
238 private TimedSemaphore(final Builder builder) {
239 Validate.inclusiveBetween(1, Long.MAX_VALUE, builder.period, "Time period must be greater than 0.");
240 period = builder.period;
241 unit = builder.timeUnit;
242 if (builder.service != null) {
243 executorService = builder.service;
244 ownExecutor = false;
245 } else {
246 final ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(THREAD_POOL_SIZE);
247 stpe.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
248 stpe.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
249 executorService = stpe;
250 ownExecutor = true;
251 }
252 setLimit(builder.limit);
253 }
254
255 /**
256 * Constructs a new instance of {@link TimedSemaphore} and initializes it with the given time period and the limit.
257 *
258 * @param timePeriod the time period.
259 * @param timeUnit the unit for the period.
260 * @param limit the limit for the semaphore.
261 * @throws IllegalArgumentException if the period is less or equals 0.
262 * @deprecated Use {@link #builder()} and {@link Builder}.
263 */
264 @Deprecated
265 public TimedSemaphore(final long timePeriod, final TimeUnit timeUnit, final int limit) {
266 this(null, timePeriod, timeUnit, limit);
267 }
268
269 /**
270 * Constructs a new instance of {@link TimedSemaphore} and initializes it with an executor service, the given time period, and the limit. The executor service
271 * will be used for creating a periodic task for monitoring the time period. It can be <strong>null</strong>, then a default service will be created.
272 *
273 * @param service the executor service.
274 * @param timePeriod the time period.
275 * @param timeUnit the unit for the period.
276 * @param limit the limit for the semaphore.
277 * @throws IllegalArgumentException if the period is less or equals 0.
278 * @deprecated Use {@link #builder()} and {@link Builder}.
279 */
280 @Deprecated
281 public TimedSemaphore(final ScheduledExecutorService service, final long timePeriod, final TimeUnit timeUnit, final int limit) {
282 this(builder().setService(service).setPeriod(timePeriod).setTimeUnit(timeUnit).setLimit(limit));
283 }
284
285 /**
286 * Acquires a permit from this semaphore. This method will block if the limit for the current period has already been reached. If {@link #shutdown()} has
287 * already been invoked, calling this method will cause an exception. The very first call of this method starts the timer task which monitors the time
288 * period set for this {@link TimedSemaphore}. From now on the semaphore is active.
289 *
290 * @throws InterruptedException if the thread gets interrupted.
291 * @throws IllegalStateException if this semaphore is already shut down.
292 */
293 public synchronized void acquire() throws InterruptedException {
294 prepareAcquire();
295 boolean canPass;
296 do {
297 canPass = acquirePermit();
298 if (!canPass) {
299 wait();
300 }
301 } while (!canPass);
302 }
303
304 /**
305 * Internal helper method for acquiring a permit. This method checks whether currently a permit can be acquired and - if so - increases the internal
306 * counter. The return value indicates whether a permit could be acquired. This method must be called with the lock of this object held.
307 *
308 * @return a flag whether a permit could be acquired.
309 */
310 private boolean acquirePermit() {
311 if (getLimit() <= NO_LIMIT || acquireCount < getLimit()) {
312 acquireCount++;
313 return true;
314 }
315 return false;
316 }
317
318 /**
319 * The current time period is finished. This method is called by the timer used internally to monitor the time period. It resets the counter and releases
320 * the threads waiting for this barrier.
321 */
322 synchronized void endOfPeriod() {
323 lastCallsPerPeriod = acquireCount;
324 totalAcquireCount += acquireCount;
325 periodCount++;
326 acquireCount = 0;
327 notifyAll();
328 }
329
330 /**
331 * Gets the number of invocations of the {@link #acquire()} method for the current period. This may be useful for testing or debugging purposes.
332 *
333 * @return the current number of {@link #acquire()} invocations.
334 */
335 public synchronized int getAcquireCount() {
336 return acquireCount;
337 }
338
339 /**
340 * Gets the number of calls to the {@link #acquire()} method that can still be performed in the current period without blocking. This method can give an
341 * indication whether it is safe to call the {@link #acquire()} method without risking to be suspended. However, there is no guarantee that a subsequent
342 * call to {@link #acquire()} actually is not-blocking because in the meantime other threads may have invoked the semaphore.
343 *
344 * @return the current number of available {@link #acquire()} calls in the current period.
345 */
346 public synchronized int getAvailablePermits() {
347 return getLimit() - getAcquireCount();
348 }
349
350 /**
351 * Gets the average number of successful (i.e. non-blocking) {@link #acquire()} invocations for the entire life-time of this {@code
352 * TimedSemaphore}. This method can be used for instance for statistical calculations.
353 *
354 * @return the average number of {@link #acquire()} invocations per time unit.
355 */
356 public synchronized double getAverageCallsPerPeriod() {
357 return periodCount == 0 ? 0 : (double) totalAcquireCount / (double) periodCount;
358 }
359
360 /**
361 * Gets the executor service used by this instance.
362 *
363 * @return the executor service.
364 */
365 protected ScheduledExecutorService getExecutorService() {
366 return executorService;
367 }
368
369 /**
370 * Gets the number of (successful) acquire invocations during the last period. This is the number of times the {@link #acquire()} method was called without
371 * blocking. This can be useful for testing or debugging purposes or to determine a meaningful threshold value. If a limit is set, the value returned by
372 * this method won't be greater than this limit.
373 *
374 * @return the number of non-blocking invocations of the {@link #acquire()} method.
375 */
376 public synchronized int getLastAcquiresPerPeriod() {
377 return lastCallsPerPeriod;
378 }
379
380 /**
381 * Gets the limit enforced by this semaphore. The limit determines how many invocations of {@link #acquire()} are allowed within the monitored period.
382 *
383 * @return the limit.
384 */
385 public final synchronized int getLimit() {
386 return limit;
387 }
388
389 /**
390 * Gets the time period. This is the time monitored by this semaphore. Only a given number of invocations of the {@link #acquire()} method is possible in
391 * this period.
392 *
393 * @return the time period.
394 */
395 public long getPeriod() {
396 return period;
397 }
398
399 /**
400 * Gets the time unit. This is the unit used by {@link #getPeriod()}.
401 *
402 * @return the time unit.
403 */
404 public TimeUnit getUnit() {
405 return unit;
406 }
407
408 /**
409 * Tests whether the {@link #shutdown()} method has been called on this object. If this method returns <strong>true</strong>, this instance cannot be used
410 * any longer.
411 *
412 * @return a flag whether a shutdown has been performed.
413 */
414 public synchronized boolean isShutdown() {
415 return shutdown;
416 }
417
418 /**
419 * Prepares an acquire operation. Checks for the current state and starts the internal timer if necessary. This method must be called with the lock of this
420 * object held.
421 */
422 private void prepareAcquire() {
423 if (isShutdown()) {
424 throw new IllegalStateException("TimedSemaphore is shut down!");
425 }
426 if (task == null) {
427 task = startTimer();
428 }
429 }
430
431 /**
432 * Sets the limit. This is the number of times the {@link #acquire()} method can be called within the time period specified. If this limit is reached,
433 * further invocations of {@link #acquire()} will block. Setting the limit to a value <= {@link #NO_LIMIT} will cause the limit to be disabled, i.e. an
434 * arbitrary number of{@link #acquire()} invocations is allowed in the time period.
435 *
436 * @param limit the limit.
437 */
438 public final synchronized void setLimit(final int limit) {
439 this.limit = limit;
440 }
441
442 /**
443 * Initializes a shutdown. After that the object cannot be used anymore. This method can be invoked an arbitrary number of times. All invocations after the
444 * first one do not have any effect.
445 */
446 public synchronized void shutdown() {
447 if (!shutdown) {
448 if (ownExecutor) {
449 // if the executor was created by this instance, it has
450 // to be shutdown
451 getExecutorService().shutdownNow();
452 }
453 if (task != null) {
454 task.cancel(false);
455 }
456 shutdown = true;
457 }
458 }
459
460 /**
461 * Starts the timer. This method is called when {@link #acquire()} is called for the first time. It schedules a task to be executed at fixed rate to monitor
462 * the time period specified.
463 *
464 * @return a future object representing the task scheduled.
465 */
466 protected ScheduledFuture<?> startTimer() {
467 return getExecutorService().scheduleAtFixedRate(this::endOfPeriod, getPeriod(), getPeriod(), getUnit());
468 }
469
470 /**
471 * Tries to acquire a permit from this semaphore. If the limit of this semaphore has not yet been reached, a permit is acquired, and this method returns
472 * <strong>true</strong>. Otherwise, this method returns immediately with the result <strong>false</strong>.
473 *
474 * @return <strong>true</strong> if a permit could be acquired; <strong>false</strong> otherwise.
475 * @throws IllegalStateException if this semaphore is already shut down.
476 * @since 3.5
477 */
478 public synchronized boolean tryAcquire() {
479 prepareAcquire();
480 return acquirePermit();
481 }
482 }