View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      https://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.commons.lang3.concurrent;
19  
20  import java.util.concurrent.ScheduledExecutorService;
21  import java.util.concurrent.ScheduledFuture;
22  import java.util.concurrent.ScheduledThreadPoolExecutor;
23  import java.util.concurrent.TimeUnit;
24  import java.util.function.Supplier;
25  
26  import org.apache.commons.lang3.Validate;
27  
28  /**
29   * A specialized <em>semaphore</em> implementation that provides a number of permits in a given time frame.
30   *
31   * <p>
32   * This class is similar to the {@link java.util.concurrent.Semaphore} class provided by the JDK in that it manages a configurable number of permits. Using the
33   * {@link #acquire()} method a permit can be requested by a thread. However, there is an additional timing dimension: there is no {@code
34   * release()} method for freeing a permit, but all permits are automatically released at the end of a configurable time frame. If a thread calls
35   * {@link #acquire()} and the available permits are already exhausted for this time frame, the thread is blocked. When the time frame ends all permits requested
36   * so far are restored, and blocking threads are waked up again, so that they can try to acquire a new permit. This basically means that in the specified time
37   * frame only the given number of operations is possible.
38   * </p>
39   * <p>
40   * A use case for this class is to artificially limit the load produced by a process. As an example consider an application that issues database queries on a
41   * production system in a background process to gather statistical information. This background processing should not produce so much database load that the
42   * functionality and the performance of the production system are impacted. Here a {@link TimedSemaphore} could be installed to guarantee that only a given
43   * number of database queries are issued per second.
44   * </p>
45   * <p>
46   * A thread class for performing database queries could look as follows:
47   * </p>
48   *
49   * <pre>
50   * public class StatisticsThread extends Thread {
51   *     // The semaphore for limiting database load.
52   *     private final TimedSemaphore semaphore;
53   *     // Create an instance and set the semaphore
54   *     public StatisticsThread(TimedSemaphore timedSemaphore) {
55   *         semaphore = timedSemaphore;
56   *     }
57   *     // Gather statistics
58   *     public void run() {
59   *         try {
60   *             while (true) {
61   *                 semaphore.acquire();   // limit database load
62   *                 performQuery();        // issue a query
63   *             }
64   *         } catch (InterruptedException) {
65   *             // fall through
66   *         }
67   *     }
68   *     ...
69   * }
70   * </pre>
71   *
72   * <p>
73   * The following code fragment shows how a {@link TimedSemaphore} is created that allows only 10 operations per second and passed to the statistics thread:
74   * </p>
75   *
76   * <pre>
77   * TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECOND, 10);
78   * StatisticsThread thread = new StatisticsThread(sem);
79   * thread.start();
80   * </pre>
81   *
82   * <p>
83   * When creating an instance the time period for the semaphore must be specified. {@link TimedSemaphore} uses an executor service with a corresponding period to
84   * monitor this interval. The {@code
85   * ScheduledExecutorService} to be used for this purpose can be provided at construction time. Alternatively the class creates an internal executor service.
86   * </p>
87   * <p>
88   * Client code that uses {@link TimedSemaphore} has to call the {@link #acquire()} method in each processing step. {@link TimedSemaphore} keeps track of the
89   * number of invocations of the {@link #acquire()} method and blocks the calling thread if the counter exceeds the limit specified. When the timer signals the
90   * end of the time period the counter is reset and all waiting threads are released. Then another cycle can start.
91   * </p>
92   * <p>
93   * An alternative to {@code acquire()} is the {@link #tryAcquire()} method. This method checks whether the semaphore is under the specified limit and increases
94   * the internal counter if this is the case. The return value is then <strong>true</strong>, and the calling thread can continue with its action. If the
95   * semaphore is already at its limit, {@code tryAcquire()} immediately returns <strong>false</strong> without blocking; the calling thread must then abort its
96   * action. This usage scenario prevents blocking of threads.
97   * </p>
98   * <p>
99   * It is possible to modify the limit at any time using the {@link #setLimit(int)} method. This is useful if the load produced by an operation has to be adapted
100  * dynamically. In the example scenario with the thread collecting statistics it may make sense to specify a low limit during day time while allowing a higher
101  * load in the night time. Reducing the limit takes effect immediately by blocking incoming callers. If the limit is increased, waiting threads are not released
102  * immediately, but wake up when the timer runs out. Then, in the next period more processing steps can be performed without blocking. By setting the limit to 0
103  * the semaphore can be switched off: in this mode the {@link #acquire()} method never blocks, but lets all callers pass directly.
104  * </p>
105  * <p>
106  * When the {@link TimedSemaphore} is no more needed its {@link #shutdown()} method should be called. This causes the periodic task that monitors the time
107  * interval to be canceled. If the {@link ScheduledExecutorService} has been created by the semaphore at construction time, it is also shut down. resources.
108  * After that {@link #acquire()} must not be called any more.
109  * </p>
110  *
111  * @since 3.0
112  */
113 public class TimedSemaphore {
114 
115     /**
116      * Builds new {@link TimedSemaphore}.
117      *
118      * @since 3.20.0
119      */
120     public static class Builder implements Supplier<TimedSemaphore> {
121 
122         private ScheduledExecutorService service;
123         private long period;
124         private TimeUnit timeUnit;
125         private int limit;
126 
127         /**
128          * Constructs a new Builder.
129          */
130         public Builder() {
131             // empty
132         }
133 
134         @Override
135         public TimedSemaphore get() {
136             return new TimedSemaphore(this);
137         }
138 
139         /**
140          * Sets the limit.
141          *
142          * @param limit The limit.
143          * @return {@code this} instance.
144          */
145         public Builder setLimit(final int limit) {
146             this.limit = limit;
147             return this;
148         }
149 
150         /**
151          * Sets the time period.
152          *
153          * @param period The time period.
154          * @return {@code this} instance.
155          */
156         public Builder setPeriod(final long period) {
157             this.period = period;
158             return this;
159         }
160 
161         /**
162          * Sets the executor service.
163          *
164          * @param service The executor service.
165          * @return {@code this} instance.
166          */
167         public Builder setService(final ScheduledExecutorService service) {
168             this.service = service;
169             return this;
170         }
171 
172         /**
173          * Sets the time unit for the period.
174          *
175          * @param timeUnit The time unit for the period.
176          * @return {@code this} instance.
177          */
178         public Builder setTimeUnit(final TimeUnit timeUnit) {
179             this.timeUnit = timeUnit;
180             return this;
181         }
182     }
183 
184     /**
185      * Constant for a value representing no limit. If the limit is set to a value less or equal this constant, the {@link TimedSemaphore} will be effectively
186      * switched off.
187      */
188     public static final int NO_LIMIT = 0;
189 
190     /** Constant for the thread pool size for the executor. */
191     private static final int THREAD_POOL_SIZE = 1;
192 
193     /**
194      * Constructs a new Builder.
195      *
196      * @return a new Builder.
197      * @since 3.20.0
198      */
199     public static Builder builder() {
200         return new Builder();
201     }
202 
203     /** The executor service for managing the timer thread. */
204     private final ScheduledExecutorService executorService;
205 
206     /** The period for this timed semaphore. */
207     private final long period;
208 
209     /** The time unit for the period. */
210     private final TimeUnit unit;
211 
212     /** A flag whether the executor service was created by this object. */
213     private final boolean ownExecutor;
214 
215     /** A future object representing the timer task. */
216     private ScheduledFuture<?> task; // @GuardedBy("this")
217 
218     /** Stores the total number of invocations of the acquire() method. */
219     private long totalAcquireCount; // @GuardedBy("this")
220 
221     /**
222      * The counter for the periods. This counter is increased every time a period ends.
223      */
224     private long periodCount; // @GuardedBy("this")
225 
226     /** The limit. */
227     private int limit; // @GuardedBy("this")
228 
229     /** The current counter. */
230     private int acquireCount; // @GuardedBy("this")
231 
232     /** The number of invocations of acquire() in the last period. */
233     private int lastCallsPerPeriod; // @GuardedBy("this")
234 
235     /** A flag whether shutdown() was called. */
236     private boolean shutdown; // @GuardedBy("this")
237 
238     private TimedSemaphore(final Builder builder) {
239         Validate.inclusiveBetween(1, Long.MAX_VALUE, builder.period, "Time period must be greater than 0.");
240         period = builder.period;
241         unit = builder.timeUnit;
242         if (builder.service != null) {
243             executorService = builder.service;
244             ownExecutor = false;
245         } else {
246             final ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(THREAD_POOL_SIZE);
247             stpe.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
248             stpe.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
249             executorService = stpe;
250             ownExecutor = true;
251         }
252         setLimit(builder.limit);
253     }
254 
255     /**
256      * Constructs a new instance of {@link TimedSemaphore} and initializes it with the given time period and the limit.
257      *
258      * @param timePeriod the time period.
259      * @param timeUnit   the unit for the period.
260      * @param limit      the limit for the semaphore.
261      * @throws IllegalArgumentException if the period is less or equals 0.
262      * @deprecated Use {@link #builder()} and {@link Builder}.
263      */
264     @Deprecated
265     public TimedSemaphore(final long timePeriod, final TimeUnit timeUnit, final int limit) {
266         this(null, timePeriod, timeUnit, limit);
267     }
268 
269     /**
270      * Constructs a new instance of {@link TimedSemaphore} and initializes it with an executor service, the given time period, and the limit. The executor service
271      * will be used for creating a periodic task for monitoring the time period. It can be <strong>null</strong>, then a default service will be created.
272      *
273      * @param service    the executor service.
274      * @param timePeriod the time period.
275      * @param timeUnit   the unit for the period.
276      * @param limit      the limit for the semaphore.
277      * @throws IllegalArgumentException if the period is less or equals 0.
278      * @deprecated Use {@link #builder()} and {@link Builder}.
279      */
280     @Deprecated
281     public TimedSemaphore(final ScheduledExecutorService service, final long timePeriod, final TimeUnit timeUnit, final int limit) {
282         this(builder().setService(service).setPeriod(timePeriod).setTimeUnit(timeUnit).setLimit(limit));
283     }
284 
285     /**
286      * Acquires a permit from this semaphore. This method will block if the limit for the current period has already been reached. If {@link #shutdown()} has
287      * already been invoked, calling this method will cause an exception. The very first call of this method starts the timer task which monitors the time
288      * period set for this {@link TimedSemaphore}. From now on the semaphore is active.
289      *
290      * @throws InterruptedException  if the thread gets interrupted.
291      * @throws IllegalStateException if this semaphore is already shut down.
292      */
293     public synchronized void acquire() throws InterruptedException {
294         prepareAcquire();
295         boolean canPass;
296         do {
297             canPass = acquirePermit();
298             if (!canPass) {
299                 wait();
300             }
301         } while (!canPass);
302     }
303 
304     /**
305      * Internal helper method for acquiring a permit. This method checks whether currently a permit can be acquired and - if so - increases the internal
306      * counter. The return value indicates whether a permit could be acquired. This method must be called with the lock of this object held.
307      *
308      * @return a flag whether a permit could be acquired.
309      */
310     private boolean acquirePermit() {
311         if (getLimit() <= NO_LIMIT || acquireCount < getLimit()) {
312             acquireCount++;
313             return true;
314         }
315         return false;
316     }
317 
318     /**
319      * The current time period is finished. This method is called by the timer used internally to monitor the time period. It resets the counter and releases
320      * the threads waiting for this barrier.
321      */
322     synchronized void endOfPeriod() {
323         lastCallsPerPeriod = acquireCount;
324         totalAcquireCount += acquireCount;
325         periodCount++;
326         acquireCount = 0;
327         notifyAll();
328     }
329 
330     /**
331      * Gets the number of invocations of the {@link #acquire()} method for the current period. This may be useful for testing or debugging purposes.
332      *
333      * @return the current number of {@link #acquire()} invocations.
334      */
335     public synchronized int getAcquireCount() {
336         return acquireCount;
337     }
338 
339     /**
340      * Gets the number of calls to the {@link #acquire()} method that can still be performed in the current period without blocking. This method can give an
341      * indication whether it is safe to call the {@link #acquire()} method without risking to be suspended. However, there is no guarantee that a subsequent
342      * call to {@link #acquire()} actually is not-blocking because in the meantime other threads may have invoked the semaphore.
343      *
344      * @return the current number of available {@link #acquire()} calls in the current period.
345      */
346     public synchronized int getAvailablePermits() {
347         return getLimit() - getAcquireCount();
348     }
349 
350     /**
351      * Gets the average number of successful (i.e. non-blocking) {@link #acquire()} invocations for the entire life-time of this {@code
352      * TimedSemaphore}. This method can be used for instance for statistical calculations.
353      *
354      * @return the average number of {@link #acquire()} invocations per time unit.
355      */
356     public synchronized double getAverageCallsPerPeriod() {
357         return periodCount == 0 ? 0 : (double) totalAcquireCount / (double) periodCount;
358     }
359 
360     /**
361      * Gets the executor service used by this instance.
362      *
363      * @return the executor service.
364      */
365     protected ScheduledExecutorService getExecutorService() {
366         return executorService;
367     }
368 
369     /**
370      * Gets the number of (successful) acquire invocations during the last period. This is the number of times the {@link #acquire()} method was called without
371      * blocking. This can be useful for testing or debugging purposes or to determine a meaningful threshold value. If a limit is set, the value returned by
372      * this method won't be greater than this limit.
373      *
374      * @return the number of non-blocking invocations of the {@link #acquire()} method.
375      */
376     public synchronized int getLastAcquiresPerPeriod() {
377         return lastCallsPerPeriod;
378     }
379 
380     /**
381      * Gets the limit enforced by this semaphore. The limit determines how many invocations of {@link #acquire()} are allowed within the monitored period.
382      *
383      * @return the limit.
384      */
385     public final synchronized int getLimit() {
386         return limit;
387     }
388 
389     /**
390      * Gets the time period. This is the time monitored by this semaphore. Only a given number of invocations of the {@link #acquire()} method is possible in
391      * this period.
392      *
393      * @return the time period.
394      */
395     public long getPeriod() {
396         return period;
397     }
398 
399     /**
400      * Gets the time unit. This is the unit used by {@link #getPeriod()}.
401      *
402      * @return the time unit.
403      */
404     public TimeUnit getUnit() {
405         return unit;
406     }
407 
408     /**
409      * Tests whether the {@link #shutdown()} method has been called on this object. If this method returns <strong>true</strong>, this instance cannot be used
410      * any longer.
411      *
412      * @return a flag whether a shutdown has been performed.
413      */
414     public synchronized boolean isShutdown() {
415         return shutdown;
416     }
417 
418     /**
419      * Prepares an acquire operation. Checks for the current state and starts the internal timer if necessary. This method must be called with the lock of this
420      * object held.
421      */
422     private void prepareAcquire() {
423         if (isShutdown()) {
424             throw new IllegalStateException("TimedSemaphore is shut down!");
425         }
426         if (task == null) {
427             task = startTimer();
428         }
429     }
430 
431     /**
432      * Sets the limit. This is the number of times the {@link #acquire()} method can be called within the time period specified. If this limit is reached,
433      * further invocations of {@link #acquire()} will block. Setting the limit to a value &lt;= {@link #NO_LIMIT} will cause the limit to be disabled, i.e. an
434      * arbitrary number of{@link #acquire()} invocations is allowed in the time period.
435      *
436      * @param limit the limit.
437      */
438     public final synchronized void setLimit(final int limit) {
439         this.limit = limit;
440     }
441 
442     /**
443      * Initializes a shutdown. After that the object cannot be used anymore. This method can be invoked an arbitrary number of times. All invocations after the
444      * first one do not have any effect.
445      */
446     public synchronized void shutdown() {
447         if (!shutdown) {
448             if (ownExecutor) {
449                 // if the executor was created by this instance, it has
450                 // to be shutdown
451                 getExecutorService().shutdownNow();
452             }
453             if (task != null) {
454                 task.cancel(false);
455             }
456             shutdown = true;
457         }
458     }
459 
460     /**
461      * Starts the timer. This method is called when {@link #acquire()} is called for the first time. It schedules a task to be executed at fixed rate to monitor
462      * the time period specified.
463      *
464      * @return a future object representing the task scheduled.
465      */
466     protected ScheduledFuture<?> startTimer() {
467         return getExecutorService().scheduleAtFixedRate(this::endOfPeriod, getPeriod(), getPeriod(), getUnit());
468     }
469 
470     /**
471      * Tries to acquire a permit from this semaphore. If the limit of this semaphore has not yet been reached, a permit is acquired, and this method returns
472      * <strong>true</strong>. Otherwise, this method returns immediately with the result <strong>false</strong>.
473      *
474      * @return <strong>true</strong> if a permit could be acquired; <strong>false</strong> otherwise.
475      * @throws IllegalStateException if this semaphore is already shut down.
476      * @since 3.5
477      */
478     public synchronized boolean tryAcquire() {
479         prepareAcquire();
480         return acquirePermit();
481     }
482 }