001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.lang3.concurrent;
018
019import java.util.concurrent.ScheduledExecutorService;
020import java.util.concurrent.ScheduledFuture;
021import java.util.concurrent.ScheduledThreadPoolExecutor;
022import java.util.concurrent.TimeUnit;
023
024import org.apache.commons.lang3.Validate;
025
026/**
027 * <p>
028 * A specialized <em>semaphore</em> implementation that provides a number of
029 * permits in a given time frame.
030 * </p>
031 * <p>
032 * This class is similar to the {@code java.util.concurrent.Semaphore} class
033 * provided by the JDK in that it manages a configurable number of permits.
034 * Using the {@link #acquire()} method a permit can be requested by a thread.
035 * However, there is an additional timing dimension: there is no {@code
036 * release()} method for freeing a permit, but all permits are automatically
037 * released at the end of a configurable time frame. If a thread calls
038 * {@link #acquire()} and the available permits are already exhausted for this
039 * time frame, the thread is blocked. When the time frame ends all permits
040 * requested so far are restored, and blocking threads are waked up again, so
041 * that they can try to acquire a new permit. This basically means that in the
042 * specified time frame only the given number of operations is possible.
043 * </p>
044 * <p>
045 * A use case for this class is to artificially limit the load produced by a
046 * process. As an example consider an application that issues database queries
047 * on a production system in a background process to gather statistical
048 * information. This background processing should not produce so much database
049 * load that the functionality and the performance of the production system are
050 * impacted. Here a {@code TimedSemaphore} could be installed to guarantee that
051 * only a given number of database queries are issued per second.
052 * </p>
053 * <p>
054 * A thread class for performing database queries could look as follows:
055 * </p>
056 *
057 * <pre>
058 * public class StatisticsThread extends Thread {
059 *     // The semaphore for limiting database load.
060 *     private final TimedSemaphore semaphore;
061 *     // Create an instance and set the semaphore
062 *     public StatisticsThread(TimedSemaphore timedSemaphore) {
063 *         semaphore = timedSemaphore;
064 *     }
065 *     // Gather statistics
066 *     public void run() {
067 *         try {
068 *             while (true) {
069 *                 semaphore.acquire();   // limit database load
070 *                 performQuery();        // issue a query
071 *             }
072 *         } catch(InterruptedException) {
073 *             // fall through
074 *         }
075 *     }
076 *     ...
077 * }
078 * </pre>
079 *
080 * <p>
081 * The following code fragment shows how a {@code TimedSemaphore} is created
082 * that allows only 10 operations per second and passed to the statistics
083 * thread:
084 * </p>
085 *
086 * <pre>
087 * TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECOND, 10);
088 * StatisticsThread thread = new StatisticsThread(sem);
089 * thread.start();
090 * </pre>
091 *
092 * <p>
093 * When creating an instance the time period for the semaphore must be
094 * specified. {@code TimedSemaphore} uses an executor service with a
095 * corresponding period to monitor this interval. The {@code
096 * ScheduledExecutorService} to be used for this purpose can be provided at
097 * construction time. Alternatively the class creates an internal executor
098 * service.
099 * </p>
100 * <p>
101 * Client code that uses {@code TimedSemaphore} has to call the
102 * {@link #acquire()} method in each processing step. {@code TimedSemaphore}
103 * keeps track of the number of invocations of the {@link #acquire()} method and
104 * blocks the calling thread if the counter exceeds the limit specified. When
105 * the timer signals the end of the time period the counter is reset and all
106 * waiting threads are released. Then another cycle can start.
107 * </p>
108 * <p>
109 * An alternative to {@code acquire()} is the {@link #tryAcquire()} method. This
110 * method checks whether the semaphore is under the specified limit and
111 * increases the internal counter if this is the case. The return value is then
112 * <strong>true</strong>, and the calling thread can continue with its action.
113 * If the semaphore is already at its limit, {@code tryAcquire()} immediately
114 * returns <strong>false</strong> without blocking; the calling thread must
115 * then abort its action. This usage scenario prevents blocking of threads.
116 * </p>
117 * <p>
118 * It is possible to modify the limit at any time using the
119 * {@link #setLimit(int)} method. This is useful if the load produced by an
120 * operation has to be adapted dynamically. In the example scenario with the
121 * thread collecting statistics it may make sense to specify a low limit during
122 * day time while allowing a higher load in the night time. Reducing the limit
123 * takes effect immediately by blocking incoming callers. If the limit is
124 * increased, waiting threads are not released immediately, but wake up when the
125 * timer runs out. Then, in the next period more processing steps can be
126 * performed without blocking. By setting the limit to 0 the semaphore can be
127 * switched off: in this mode the {@link #acquire()} method never blocks, but
128 * lets all callers pass directly.
129 * </p>
130 * <p>
131 * When the {@code TimedSemaphore} is no more needed its {@link #shutdown()}
132 * method should be called. This causes the periodic task that monitors the time
133 * interval to be canceled. If the {@code ScheduledExecutorService} has been
134 * created by the semaphore at construction time, it is also shut down.
135 * resources. After that {@link #acquire()} must not be called any more.
136 * </p>
137 *
138 * @since 3.0
139 */
140public class TimedSemaphore {
141    /**
142     * Constant for a value representing no limit. If the limit is set to a
143     * value less or equal this constant, the {@code TimedSemaphore} will be
144     * effectively switched off.
145     */
146    public static final int NO_LIMIT = 0;
147
148    /** Constant for the thread pool size for the executor. */
149    private static final int THREAD_POOL_SIZE = 1;
150
151    /** The executor service for managing the timer thread. */
152    private final ScheduledExecutorService executorService;
153
154    /** Stores the period for this timed semaphore. */
155    private final long period;
156
157    /** The time unit for the period. */
158    private final TimeUnit unit;
159
160    /** A flag whether the executor service was created by this object. */
161    private final boolean ownExecutor;
162
163    /** A future object representing the timer task. */
164    private ScheduledFuture<?> task; // @GuardedBy("this")
165
166    /** Stores the total number of invocations of the acquire() method. */
167    private long totalAcquireCount; // @GuardedBy("this")
168
169    /**
170     * The counter for the periods. This counter is increased every time a
171     * period ends.
172     */
173    private long periodCount; // @GuardedBy("this")
174
175    /** The limit. */
176    private int limit; // @GuardedBy("this")
177
178    /** The current counter. */
179    private int acquireCount;  // @GuardedBy("this")
180
181    /** The number of invocations of acquire() in the last period. */
182    private int lastCallsPerPeriod; // @GuardedBy("this")
183
184    /** A flag whether shutdown() was called. */
185    private boolean shutdown;  // @GuardedBy("this")
186
187    /**
188     * Creates a new instance of {@link TimedSemaphore} and initializes it with
189     * the given time period and the limit.
190     *
191     * @param timePeriod the time period
192     * @param timeUnit the unit for the period
193     * @param limit the limit for the semaphore
194     * @throws IllegalArgumentException if the period is less or equals 0
195     */
196    public TimedSemaphore(final long timePeriod, final TimeUnit timeUnit, final int limit) {
197        this(null, timePeriod, timeUnit, limit);
198    }
199
200    /**
201     * Creates a new instance of {@link TimedSemaphore} and initializes it with
202     * an executor service, the given time period, and the limit. The executor
203     * service will be used for creating a periodic task for monitoring the time
204     * period. It can be <b>null</b>, then a default service will be created.
205     *
206     * @param service the executor service
207     * @param timePeriod the time period
208     * @param timeUnit the unit for the period
209     * @param limit the limit for the semaphore
210     * @throws IllegalArgumentException if the period is less or equals 0
211     */
212    public TimedSemaphore(final ScheduledExecutorService service, final long timePeriod,
213            final TimeUnit timeUnit, final int limit) {
214        Validate.inclusiveBetween(1, Long.MAX_VALUE, timePeriod, "Time period must be greater than 0!");
215
216        period = timePeriod;
217        unit = timeUnit;
218
219        if (service != null) {
220            executorService = service;
221            ownExecutor = false;
222        } else {
223            final ScheduledThreadPoolExecutor s = new ScheduledThreadPoolExecutor(
224                    THREAD_POOL_SIZE);
225            s.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
226            s.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
227            executorService = s;
228            ownExecutor = true;
229        }
230
231        setLimit(limit);
232    }
233
234    /**
235     * Returns the limit enforced by this semaphore. The limit determines how
236     * many invocations of {@link #acquire()} are allowed within the monitored
237     * period.
238     *
239     * @return the limit
240     */
241    public final synchronized int getLimit() {
242        return limit;
243    }
244
245    /**
246     * Sets the limit. This is the number of times the {@link #acquire()} method
247     * can be called within the time period specified. If this limit is reached,
248     * further invocations of {@link #acquire()} will block. Setting the limit
249     * to a value &lt;= {@link #NO_LIMIT} will cause the limit to be disabled,
250     * i.e. an arbitrary number of{@link #acquire()} invocations is allowed in
251     * the time period.
252     *
253     * @param limit the limit
254     */
255    public final synchronized void setLimit(final int limit) {
256        this.limit = limit;
257    }
258
259    /**
260     * Initializes a shutdown. After that the object cannot be used any more.
261     * This method can be invoked an arbitrary number of times. All invocations
262     * after the first one do not have any effect.
263     */
264    public synchronized void shutdown() {
265        if (!shutdown) {
266
267            if (ownExecutor) {
268                // if the executor was created by this instance, it has
269                // to be shutdown
270                getExecutorService().shutdownNow();
271            }
272            if (task != null) {
273                task.cancel(false);
274            }
275
276            shutdown = true;
277        }
278    }
279
280    /**
281     * Tests whether the {@link #shutdown()} method has been called on this
282     * object. If this method returns <b>true</b>, this instance cannot be used
283     * any longer.
284     *
285     * @return a flag whether a shutdown has been performed
286     */
287    public synchronized boolean isShutdown() {
288        return shutdown;
289    }
290
291    /**
292     * Acquires a permit from this semaphore. This method will block if
293     * the limit for the current period has already been reached. If
294     * {@link #shutdown()} has already been invoked, calling this method will
295     * cause an exception. The very first call of this method starts the timer
296     * task which monitors the time period set for this {@code TimedSemaphore}.
297     * From now on the semaphore is active.
298     *
299     * @throws InterruptedException if the thread gets interrupted
300     * @throws IllegalStateException if this semaphore is already shut down
301     */
302    public synchronized void acquire() throws InterruptedException {
303        prepareAcquire();
304
305        boolean canPass;
306        do {
307            canPass = acquirePermit();
308            if (!canPass) {
309                wait();
310            }
311        } while (!canPass);
312    }
313
314    /**
315     * Tries to acquire a permit from this semaphore. If the limit of this semaphore has
316     * not yet been reached, a permit is acquired, and this method returns
317     * <strong>true</strong>. Otherwise, this method returns immediately with the result
318     * <strong>false</strong>.
319     *
320     * @return <strong>true</strong> if a permit could be acquired; <strong>false</strong>
321     * otherwise
322     * @throws IllegalStateException if this semaphore is already shut down
323     * @since 3.5
324     */
325    public synchronized boolean tryAcquire() {
326        prepareAcquire();
327        return acquirePermit();
328    }
329
330    /**
331     * Returns the number of (successful) acquire invocations during the last
332     * period. This is the number of times the {@link #acquire()} method was
333     * called without blocking. This can be useful for testing or debugging
334     * purposes or to determine a meaningful threshold value. If a limit is set,
335     * the value returned by this method won't be greater than this limit.
336     *
337     * @return the number of non-blocking invocations of the {@link #acquire()}
338     * method
339     */
340    public synchronized int getLastAcquiresPerPeriod() {
341        return lastCallsPerPeriod;
342    }
343
344    /**
345     * Returns the number of invocations of the {@link #acquire()} method for
346     * the current period. This may be useful for testing or debugging purposes.
347     *
348     * @return the current number of {@link #acquire()} invocations
349     */
350    public synchronized int getAcquireCount() {
351        return acquireCount;
352    }
353
354    /**
355     * Returns the number of calls to the {@link #acquire()} method that can
356     * still be performed in the current period without blocking. This method
357     * can give an indication whether it is safe to call the {@link #acquire()}
358     * method without risking to be suspended. However, there is no guarantee
359     * that a subsequent call to {@link #acquire()} actually is not-blocking
360     * because in the mean time other threads may have invoked the semaphore.
361     *
362     * @return the current number of available {@link #acquire()} calls in the
363     * current period
364     */
365    public synchronized int getAvailablePermits() {
366        return getLimit() - getAcquireCount();
367    }
368
369    /**
370     * Returns the average number of successful (i.e. non-blocking)
371     * {@link #acquire()} invocations for the entire life-time of this {@code
372     * TimedSemaphore}. This method can be used for instance for statistical
373     * calculations.
374     *
375     * @return the average number of {@link #acquire()} invocations per time
376     * unit
377     */
378    public synchronized double getAverageCallsPerPeriod() {
379        return periodCount == 0 ? 0 : (double) totalAcquireCount
380                / (double) periodCount;
381    }
382
383    /**
384     * Returns the time period. This is the time monitored by this semaphore.
385     * Only a given number of invocations of the {@link #acquire()} method is
386     * possible in this period.
387     *
388     * @return the time period
389     */
390    public long getPeriod() {
391        return period;
392    }
393
394    /**
395     * Returns the time unit. This is the unit used by {@link #getPeriod()}.
396     *
397     * @return the time unit
398     */
399    public TimeUnit getUnit() {
400        return unit;
401    }
402
403    /**
404     * Returns the executor service used by this instance.
405     *
406     * @return the executor service
407     */
408    protected ScheduledExecutorService getExecutorService() {
409        return executorService;
410    }
411
412    /**
413     * Starts the timer. This method is called when {@link #acquire()} is called
414     * for the first time. It schedules a task to be executed at fixed rate to
415     * monitor the time period specified.
416     *
417     * @return a future object representing the task scheduled
418     */
419    protected ScheduledFuture<?> startTimer() {
420        return getExecutorService().scheduleAtFixedRate(this::endOfPeriod, getPeriod(), getPeriod(), getUnit());
421    }
422
423    /**
424     * The current time period is finished. This method is called by the timer
425     * used internally to monitor the time period. It resets the counter and
426     * releases the threads waiting for this barrier.
427     */
428    synchronized void endOfPeriod() {
429        lastCallsPerPeriod = acquireCount;
430        totalAcquireCount += acquireCount;
431        periodCount++;
432        acquireCount = 0;
433        notifyAll();
434    }
435
436    /**
437     * Prepares an acquire operation. Checks for the current state and starts the internal
438     * timer if necessary. This method must be called with the lock of this object held.
439     */
440    private void prepareAcquire() {
441        if (isShutdown()) {
442            throw new IllegalStateException("TimedSemaphore is shut down!");
443        }
444
445        if (task == null) {
446            task = startTimer();
447        }
448    }
449
450    /**
451     * Internal helper method for acquiring a permit. This method checks whether currently
452     * a permit can be acquired and - if so - increases the internal counter. The return
453     * value indicates whether a permit could be acquired. This method must be called with
454     * the lock of this object held.
455     *
456     * @return a flag whether a permit could be acquired
457     */
458    private boolean acquirePermit() {
459        if (getLimit() <= NO_LIMIT || acquireCount < getLimit()) {
460            acquireCount++;
461            return true;
462        }
463        return false;
464    }
465}