001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      https://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.commons.lang3.concurrent;
019
020import java.util.concurrent.ScheduledExecutorService;
021import java.util.concurrent.ScheduledFuture;
022import java.util.concurrent.ScheduledThreadPoolExecutor;
023import java.util.concurrent.TimeUnit;
024import java.util.function.Supplier;
025
026import org.apache.commons.lang3.Validate;
027
028/**
029 * A specialized <em>semaphore</em> implementation that provides a number of permits in a given time frame.
030 *
031 * <p>
032 * This class is similar to the {@link java.util.concurrent.Semaphore} class provided by the JDK in that it manages a configurable number of permits. Using the
033 * {@link #acquire()} method a permit can be requested by a thread. However, there is an additional timing dimension: there is no {@code
034 * release()} method for freeing a permit, but all permits are automatically released at the end of a configurable time frame. If a thread calls
035 * {@link #acquire()} and the available permits are already exhausted for this time frame, the thread is blocked. When the time frame ends all permits requested
 * so far are restored, and blocked threads are woken up again, so that they can try to acquire a new permit. This basically means that in the specified time
037 * frame only the given number of operations is possible.
038 * </p>
039 * <p>
040 * A use case for this class is to artificially limit the load produced by a process. As an example consider an application that issues database queries on a
041 * production system in a background process to gather statistical information. This background processing should not produce so much database load that the
042 * functionality and the performance of the production system are impacted. Here a {@link TimedSemaphore} could be installed to guarantee that only a given
043 * number of database queries are issued per second.
044 * </p>
045 * <p>
046 * A thread class for performing database queries could look as follows:
047 * </p>
048 *
049 * <pre>
050 * public class StatisticsThread extends Thread {
051 *     // The semaphore for limiting database load.
052 *     private final TimedSemaphore semaphore;
053 *     // Create an instance and set the semaphore
054 *     public StatisticsThread(TimedSemaphore timedSemaphore) {
055 *         semaphore = timedSemaphore;
056 *     }
057 *     // Gather statistics
058 *     public void run() {
059 *         try {
060 *             while (true) {
061 *                 semaphore.acquire();   // limit database load
062 *                 performQuery();        // issue a query
063 *             }
 *         } catch (InterruptedException e) {
065 *             // fall through
066 *         }
067 *     }
068 *     ...
069 * }
070 * </pre>
071 *
072 * <p>
073 * The following code fragment shows how a {@link TimedSemaphore} is created that allows only 10 operations per second and passed to the statistics thread:
074 * </p>
075 *
076 * <pre>
 * TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECONDS, 10);
078 * StatisticsThread thread = new StatisticsThread(sem);
079 * thread.start();
080 * </pre>
081 *
082 * <p>
083 * When creating an instance the time period for the semaphore must be specified. {@link TimedSemaphore} uses an executor service with a corresponding period to
084 * monitor this interval. The {@code
085 * ScheduledExecutorService} to be used for this purpose can be provided at construction time. Alternatively the class creates an internal executor service.
086 * </p>
087 * <p>
088 * Client code that uses {@link TimedSemaphore} has to call the {@link #acquire()} method in each processing step. {@link TimedSemaphore} keeps track of the
089 * number of invocations of the {@link #acquire()} method and blocks the calling thread if the counter exceeds the limit specified. When the timer signals the
090 * end of the time period the counter is reset and all waiting threads are released. Then another cycle can start.
091 * </p>
092 * <p>
093 * An alternative to {@code acquire()} is the {@link #tryAcquire()} method. This method checks whether the semaphore is under the specified limit and increases
094 * the internal counter if this is the case. The return value is then <strong>true</strong>, and the calling thread can continue with its action. If the
095 * semaphore is already at its limit, {@code tryAcquire()} immediately returns <strong>false</strong> without blocking; the calling thread must then abort its
096 * action. This usage scenario prevents blocking of threads.
097 * </p>
098 * <p>
099 * It is possible to modify the limit at any time using the {@link #setLimit(int)} method. This is useful if the load produced by an operation has to be adapted
100 * dynamically. In the example scenario with the thread collecting statistics it may make sense to specify a low limit during day time while allowing a higher
101 * load in the night time. Reducing the limit takes effect immediately by blocking incoming callers. If the limit is increased, waiting threads are not released
102 * immediately, but wake up when the timer runs out. Then, in the next period more processing steps can be performed without blocking. By setting the limit to 0
103 * the semaphore can be switched off: in this mode the {@link #acquire()} method never blocks, but lets all callers pass directly.
104 * </p>
105 * <p>
 * When the {@link TimedSemaphore} is no longer needed its {@link #shutdown()} method should be called. This causes the periodic task that monitors the time
 * interval to be canceled. If the {@link ScheduledExecutorService} has been created by the semaphore at construction time, it is also shut down.
 * After that {@link #acquire()} must not be called any more.
109 * </p>
110 *
111 * @since 3.0
112 */
public class TimedSemaphore {

    /**
     * Builds new {@link TimedSemaphore}.
     *
     * <p>
     * A freshly created builder has a period of {@code 0}, a limit of {@code 0} (which equals {@link TimedSemaphore#NO_LIMIT}), no time unit and no executor
     * service. A period greater than 0 must be set before {@link #get()} is called. The time unit is not validated at construction time; it is only handed
     * to the executor service when the timer is started by the first acquire operation, so it should always be set. If no executor service is set, the
     * semaphore creates (and later shuts down) its own.
     * </p>
     *
     * @since 3.20.0
     */
    public static class Builder implements Supplier<TimedSemaphore> {

        /** The executor service running the timer task; {@code null} lets the semaphore create its own. */
        private ScheduledExecutorService service;

        /** The length of the monitored period, expressed in {@link #timeUnit}. */
        private long period;

        /** The time unit of {@link #period}. */
        private TimeUnit timeUnit;

        /** The number of permits per period; values &lt;= {@link TimedSemaphore#NO_LIMIT} disable the limit. */
        private int limit;

        /**
         * Constructs a new Builder.
         */
        public Builder() {
            // empty
        }

        /**
         * Creates a new {@link TimedSemaphore} from the current state of this builder.
         *
         * @return a new semaphore instance.
         * @throws IllegalArgumentException if the period is less or equals 0.
         */
        @Override
        public TimedSemaphore get() {
            return new TimedSemaphore(this);
        }

        /**
         * Sets the limit.
         *
         * @param limit The limit.
         * @return {@code this} instance.
         */
        public Builder setLimit(final int limit) {
            this.limit = limit;
            return this;
        }

        /**
         * Sets the time period.
         *
         * @param period The time period.
         * @return {@code this} instance.
         */
        public Builder setPeriod(final long period) {
            this.period = period;
            return this;
        }

        /**
         * Sets the executor service.
         *
         * @param service The executor service, may be {@code null} to let the semaphore create an internal one.
         * @return {@code this} instance.
         */
        public Builder setService(final ScheduledExecutorService service) {
            this.service = service;
            return this;
        }

        /**
         * Sets the time unit for the period.
         *
         * @param timeUnit The time unit for the period.
         * @return {@code this} instance.
         */
        public Builder setTimeUnit(final TimeUnit timeUnit) {
            this.timeUnit = timeUnit;
            return this;
        }
    }

    /**
     * Constant for a value representing no limit. If the limit is set to a value less or equal this constant, the {@link TimedSemaphore} will be effectively
     * switched off.
     */
    public static final int NO_LIMIT = 0;

    /** Constant for the thread pool size for the executor. */
    private static final int THREAD_POOL_SIZE = 1;

    /**
     * Constructs a new Builder.
     *
     * @return a new Builder.
     * @since 3.20.0
     */
    public static Builder builder() {
        return new Builder();
    }

    /** The executor service for managing the timer thread. */
    private final ScheduledExecutorService executorService;

    /** The period for this timed semaphore. */
    private final long period;

    /** The time unit for the period. */
    private final TimeUnit unit;

    /** A flag whether the executor service was created by this object. */
    private final boolean ownExecutor;

    /** A future object representing the timer task. */
    private ScheduledFuture<?> task; // @GuardedBy("this")

    /** Stores the total number of permits acquired in all periods that have already ended. */
    private long totalAcquireCount; // @GuardedBy("this")

    /**
     * The counter for the periods. This counter is increased every time a period ends.
     */
    private long periodCount; // @GuardedBy("this")

    /** The limit. */
    private int limit; // @GuardedBy("this")

    /** The number of permits acquired in the current period. */
    private int acquireCount; // @GuardedBy("this")

    /** The number of permits acquired in the last period. */
    private int lastCallsPerPeriod; // @GuardedBy("this")

    /** A flag whether shutdown() was called. */
    private boolean shutdown; // @GuardedBy("this")

    /**
     * Constructs a new instance from the settings collected by a builder.
     *
     * @param builder the builder providing period, time unit, limit and (optionally) the executor service.
     * @throws IllegalArgumentException if the period is less or equals 0.
     */
    private TimedSemaphore(final Builder builder) {
        // Fail fast on a non-positive period instead of failing when the timer is scheduled on first use.
        if (builder.period < 1) {
            throw new IllegalArgumentException("Time period must be greater than 0.");
        }
        period = builder.period;
        unit = builder.timeUnit;
        if (builder.service != null) {
            // A caller-provided executor is only borrowed; shutdown() leaves it running.
            executorService = builder.service;
            ownExecutor = false;
        } else {
            final ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(THREAD_POOL_SIZE);
            // Make sure no timer task survives a shutdown of the internally created executor.
            stpe.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
            stpe.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
            executorService = stpe;
            ownExecutor = true;
        }
        setLimit(builder.limit);
    }

    /**
     * Constructs a new instance of {@link TimedSemaphore} and initializes it with the given time period and the limit.
     *
     * @param timePeriod the time period.
     * @param timeUnit   the unit for the period.
     * @param limit      the limit for the semaphore.
     * @throws IllegalArgumentException if the period is less or equals 0.
     * @deprecated Use {@link #builder()} and {@link Builder}.
     */
    @Deprecated
    public TimedSemaphore(final long timePeriod, final TimeUnit timeUnit, final int limit) {
        this(null, timePeriod, timeUnit, limit);
    }

    /**
     * Constructs a new instance of {@link TimedSemaphore} and initializes it with an executor service, the given time period, and the limit. The executor service
     * will be used for creating a periodic task for monitoring the time period. It can be <strong>null</strong>, then a default service will be created.
     *
     * @param service    the executor service.
     * @param timePeriod the time period.
     * @param timeUnit   the unit for the period.
     * @param limit      the limit for the semaphore.
     * @throws IllegalArgumentException if the period is less or equals 0.
     * @deprecated Use {@link #builder()} and {@link Builder}.
     */
    @Deprecated
    public TimedSemaphore(final ScheduledExecutorService service, final long timePeriod, final TimeUnit timeUnit, final int limit) {
        this(builder().setService(service).setPeriod(timePeriod).setTimeUnit(timeUnit).setLimit(limit));
    }

    /**
     * Acquires a permit from this semaphore. This method will block if the limit for the current period has already been reached. If {@link #shutdown()} has
     * already been invoked, calling this method will cause an exception. The very first call of this method starts the timer task which monitors the time
     * period set for this {@link TimedSemaphore}. From now on the semaphore is active.
     *
     * <p>
     * If {@link #shutdown()} is invoked while a thread is blocked in this method, the thread is woken up and the call fails with an
     * {@link IllegalStateException}; once the timer has been canceled no further period would end, so the thread could never obtain a permit.
     * </p>
     *
     * @throws InterruptedException  if the thread gets interrupted.
     * @throws IllegalStateException if this semaphore is already shut down, or is shut down while the calling thread is waiting for a permit.
     */
    public synchronized void acquire() throws InterruptedException {
        prepareAcquire();
        while (!acquirePermit()) {
            // wait() releases the monitor; the thread is woken by endOfPeriod() or shutdown().
            wait();
            // Looping also covers spurious wake-ups; a shutdown in the meantime ends the wait for good.
            requireNotShutdown();
        }
    }

    /**
     * Internal helper method for acquiring a permit. This method checks whether currently a permit can be acquired and - if so - increases the internal
     * counter. The return value indicates whether a permit could be acquired. This method must be called with the lock of this object held.
     *
     * @return a flag whether a permit could be acquired.
     */
    private boolean acquirePermit() {
        final int currentLimit = getLimit();
        if (currentLimit <= NO_LIMIT || acquireCount < currentLimit) {
            acquireCount++;
            return true;
        }
        return false;
    }

    /**
     * The current time period is finished. This method is called by the timer used internally to monitor the time period. It updates the statistics, resets
     * the counter and releases the threads waiting for this barrier.
     */
    synchronized void endOfPeriod() {
        lastCallsPerPeriod = acquireCount;
        totalAcquireCount += acquireCount;
        periodCount++;
        acquireCount = 0;
        notifyAll();
    }

    /**
     * Gets the number of permits acquired so far in the current period. This may be useful for testing or debugging purposes.
     *
     * @return the current number of successful {@link #acquire()} or {@link #tryAcquire()} invocations in this period.
     */
    public synchronized int getAcquireCount() {
        return acquireCount;
    }

    /**
     * Gets the number of calls to the {@link #acquire()} method that can still be performed in the current period without blocking. This method can give an
     * indication whether it is safe to call the {@link #acquire()} method without risking to be suspended. However, there is no guarantee that a subsequent
     * call to {@link #acquire()} actually is not-blocking because in the meantime other threads may have invoked the semaphore.
     *
     * <p>
     * The result is never negative. If the limit is disabled (i.e. it is &lt;= {@link #NO_LIMIT}), {@link #acquire()} never blocks and
     * {@link Integer#MAX_VALUE} is returned. If the limit has been lowered below the number of permits already handed out in this period, 0 is returned.
     * </p>
     *
     * @return the current number of available {@link #acquire()} calls in the current period.
     */
    public synchronized int getAvailablePermits() {
        final int currentLimit = getLimit();
        if (currentLimit <= NO_LIMIT) {
            // The semaphore is switched off: every caller passes.
            return Integer.MAX_VALUE;
        }
        return Math.max(0, currentLimit - getAcquireCount());
    }

    /**
     * Gets the average number of successful (i.e. non-blocking) {@link #acquire()} invocations per period for the entire life-time of this {@code
     * TimedSemaphore}. Only periods that have already ended are taken into account. This method can be used for instance for statistical calculations.
     *
     * @return the average number of {@link #acquire()} invocations per period, or 0 if no period has ended yet.
     */
    public synchronized double getAverageCallsPerPeriod() {
        return periodCount == 0 ? 0 : (double) totalAcquireCount / (double) periodCount;
    }

    /**
     * Gets the executor service used by this instance.
     *
     * @return the executor service.
     */
    protected ScheduledExecutorService getExecutorService() {
        return executorService;
    }

    /**
     * Gets the number of (successful) acquire invocations during the last period. This is the number of times the {@link #acquire()} method was called without
     * blocking. This can be useful for testing or debugging purposes or to determine a meaningful threshold value. If a limit is set and was not lowered
     * during that period, the value returned by this method won't be greater than this limit.
     *
     * @return the number of non-blocking invocations of the {@link #acquire()} method.
     */
    public synchronized int getLastAcquiresPerPeriod() {
        return lastCallsPerPeriod;
    }

    /**
     * Gets the limit enforced by this semaphore. The limit determines how many invocations of {@link #acquire()} are allowed within the monitored period.
     *
     * @return the limit.
     */
    public final synchronized int getLimit() {
        return limit;
    }

    /**
     * Gets the time period. This is the time monitored by this semaphore. Only a given number of invocations of the {@link #acquire()} method is possible in
     * this period.
     *
     * @return the time period.
     */
    public long getPeriod() {
        return period;
    }

    /**
     * Gets the time unit. This is the unit used by {@link #getPeriod()}.
     *
     * @return the time unit.
     */
    public TimeUnit getUnit() {
        return unit;
    }

    /**
     * Tests whether the {@link #shutdown()} method has been called on this object. If this method returns <strong>true</strong>, this instance cannot be used
     * any longer.
     *
     * @return a flag whether a shutdown has been performed.
     */
    public synchronized boolean isShutdown() {
        return shutdown;
    }

    /**
     * Prepares an acquire operation. Checks for the current state and starts the internal timer if necessary. This method must be called with the lock of this
     * object held.
     *
     * @throws IllegalStateException if this semaphore is already shut down.
     */
    private void prepareAcquire() {
        requireNotShutdown();
        if (task == null) {
            // Lazily start the timer: the first period begins with the first acquire operation.
            task = startTimer();
        }
    }

    /**
     * Fails if this semaphore has been shut down. This method must be called with the lock of this object held.
     *
     * @throws IllegalStateException if this semaphore is already shut down.
     */
    private void requireNotShutdown() {
        if (isShutdown()) {
            throw new IllegalStateException("TimedSemaphore is shut down!");
        }
    }

    /**
     * Sets the limit. This is the number of times the {@link #acquire()} method can be called within the time period specified. If this limit is reached,
     * further invocations of {@link #acquire()} will block. Setting the limit to a value &lt;= {@link #NO_LIMIT} will cause the limit to be disabled, i.e. an
     * arbitrary number of {@link #acquire()} invocations is allowed in the time period.
     *
     * @param limit the limit.
     */
    public final synchronized void setLimit(final int limit) {
        this.limit = limit;
    }

    /**
     * Initializes a shutdown. After that the object cannot be used anymore. This method can be invoked an arbitrary number of times. All invocations after the
     * first one do not have any effect.
     *
     * <p>
     * The timer task is canceled and, if the executor service was created by this instance, the executor service is shut down as well. Threads currently
     * blocked in {@link #acquire()} are woken up and fail with an {@link IllegalStateException}.
     * </p>
     */
    public synchronized void shutdown() {
        if (!shutdown) {
            if (ownExecutor) {
                // if the executor was created by this instance, it has
                // to be shutdown
                getExecutorService().shutdownNow();
            }
            if (task != null) {
                task.cancel(false);
            }
            shutdown = true;
            // With the timer canceled no period will end anymore, so release the waiting threads here;
            // they re-check the shutdown flag and give up.
            notifyAll();
        }
    }

    /**
     * Starts the timer. This method is called when {@link #acquire()} is called for the first time. It schedules a task to be executed at fixed rate to monitor
     * the time period specified.
     *
     * @return a future object representing the task scheduled.
     */
    protected ScheduledFuture<?> startTimer() {
        return getExecutorService().scheduleAtFixedRate(this::endOfPeriod, getPeriod(), getPeriod(), getUnit());
    }

    /**
     * Tries to acquire a permit from this semaphore. If the limit of this semaphore has not yet been reached, a permit is acquired, and this method returns
     * <strong>true</strong>. Otherwise, this method returns immediately with the result <strong>false</strong>.
     *
     * @return <strong>true</strong> if a permit could be acquired; <strong>false</strong> otherwise.
     * @throws IllegalStateException if this semaphore is already shut down.
     * @since 3.5
     */
    public synchronized boolean tryAcquire() {
        prepareAcquire();
        return acquirePermit();
    }
}