001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.lang3.concurrent;
018
019import java.util.concurrent.ScheduledExecutorService;
020import java.util.concurrent.ScheduledFuture;
021import java.util.concurrent.ScheduledThreadPoolExecutor;
022import java.util.concurrent.TimeUnit;
023
024import org.apache.commons.lang3.Validate;
025
026/**
027 * A specialized <em>semaphore</em> implementation that provides a number of
028 * permits in a given time frame.
029 *
030 * <p>
031 * This class is similar to the {@code java.util.concurrent.Semaphore} class
032 * provided by the JDK in that it manages a configurable number of permits.
033 * Using the {@link #acquire()} method a permit can be requested by a thread.
034 * However, there is an additional timing dimension: there is no {@code
035 * release()} method for freeing a permit, but all permits are automatically
036 * released at the end of a configurable time frame. If a thread calls
037 * {@link #acquire()} and the available permits are already exhausted for this
038 * time frame, the thread is blocked. When the time frame ends all permits
039 * requested so far are restored, and blocked threads are woken up again, so
040 * that they can try to acquire a new permit. This basically means that in the
041 * specified time frame only the given number of operations is possible.
042 * </p>
043 * <p>
044 * A use case for this class is to artificially limit the load produced by a
045 * process. As an example consider an application that issues database queries
046 * on a production system in a background process to gather statistical
047 * information. This background processing should not produce so much database
048 * load that the functionality and the performance of the production system are
049 * impacted. Here a {@link TimedSemaphore} could be installed to guarantee that
050 * only a given number of database queries are issued per second.
051 * </p>
052 * <p>
053 * A thread class for performing database queries could look as follows:
054 * </p>
055 *
056 * <pre>
057 * public class StatisticsThread extends Thread {
058 *     // The semaphore for limiting database load.
059 *     private final TimedSemaphore semaphore;
060 *     // Create an instance and set the semaphore
061 *     public StatisticsThread(TimedSemaphore timedSemaphore) {
062 *         semaphore = timedSemaphore;
063 *     }
064 *     // Gather statistics
065 *     public void run() {
066 *         try {
067 *             while (true) {
068 *                 semaphore.acquire();   // limit database load
069 *                 performQuery();        // issue a query
070 *             }
071 *         } catch (InterruptedException iex) {
072 *             // fall through
073 *         }
074 *     }
075 *     ...
076 * }
077 * </pre>
078 *
079 * <p>
080 * The following code fragment shows how a {@link TimedSemaphore} is created
081 * that allows only 10 operations per second and passed to the statistics
082 * thread:
083 * </p>
084 *
085 * <pre>
086 * TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECONDS, 10);
087 * StatisticsThread thread = new StatisticsThread(sem);
088 * thread.start();
089 * </pre>
090 *
091 * <p>
092 * When creating an instance the time period for the semaphore must be
093 * specified. {@link TimedSemaphore} uses an executor service with a
094 * corresponding period to monitor this interval. The {@code
095 * ScheduledExecutorService} to be used for this purpose can be provided at
096 * construction time. Alternatively the class creates an internal executor
097 * service.
098 * </p>
099 * <p>
100 * Client code that uses {@link TimedSemaphore} has to call the
101 * {@link #acquire()} method in each processing step. {@link TimedSemaphore}
102 * keeps track of the number of invocations of the {@link #acquire()} method and
103 * blocks the calling thread if the counter exceeds the limit specified. When
104 * the timer signals the end of the time period the counter is reset and all
105 * waiting threads are released. Then another cycle can start.
106 * </p>
107 * <p>
108 * An alternative to {@code acquire()} is the {@link #tryAcquire()} method. This
109 * method checks whether the semaphore is under the specified limit and
110 * increases the internal counter if this is the case. The return value is then
111 * <strong>true</strong>, and the calling thread can continue with its action.
112 * If the semaphore is already at its limit, {@code tryAcquire()} immediately
113 * returns <strong>false</strong> without blocking; the calling thread must
114 * then abort its action. This usage scenario prevents blocking of threads.
115 * </p>
116 * <p>
117 * It is possible to modify the limit at any time using the
118 * {@link #setLimit(int)} method. This is useful if the load produced by an
119 * operation has to be adapted dynamically. In the example scenario with the
120 * thread collecting statistics it may make sense to specify a low limit during
121 * day time while allowing a higher load in the night time. Reducing the limit
122 * takes effect immediately by blocking incoming callers. If the limit is
123 * increased, waiting threads are not released immediately, but wake up when the
124 * timer runs out. Then, in the next period more processing steps can be
125 * performed without blocking. By setting the limit to 0 the semaphore can be
126 * switched off: in this mode the {@link #acquire()} method never blocks, but
127 * lets all callers pass directly.
128 * </p>
129 * <p>
130 * When the {@link TimedSemaphore} is no longer needed its {@link #shutdown()}
131 * method should be called. This causes the periodic task that monitors the time
132 * interval to be canceled. If the {@link ScheduledExecutorService} has been
133 * created by the semaphore at construction time, it is also shut down to free
134 * its resources. After that {@link #acquire()} must not be called any more.
135 * </p>
136 *
137 * @since 3.0
138 */
139public class TimedSemaphore {
140    /**
141     * Constant for a value representing no limit. If the limit is set to a
142     * value less or equal this constant, the {@link TimedSemaphore} will be
143     * effectively switched off.
144     */
145    public static final int NO_LIMIT = 0;
146
147    /** Constant for the thread pool size for the executor. */
148    private static final int THREAD_POOL_SIZE = 1;
149
150    /** The executor service for managing the timer thread. */
151    private final ScheduledExecutorService executorService;
152
153    /** Stores the period for this timed semaphore. */
154    private final long period;
155
156    /** The time unit for the period. */
157    private final TimeUnit unit;
158
159    /** A flag whether the executor service was created by this object. */
160    private final boolean ownExecutor;
161
162    /** A future object representing the timer task. */
163    private ScheduledFuture<?> task; // @GuardedBy("this")
164
165    /** Stores the total number of invocations of the acquire() method. */
166    private long totalAcquireCount; // @GuardedBy("this")
167
168    /**
169     * The counter for the periods. This counter is increased every time a
170     * period ends.
171     */
172    private long periodCount; // @GuardedBy("this")
173
174    /** The limit. */
175    private int limit; // @GuardedBy("this")
176
177    /** The current counter. */
178    private int acquireCount;  // @GuardedBy("this")
179
180    /** The number of invocations of acquire() in the last period. */
181    private int lastCallsPerPeriod; // @GuardedBy("this")
182
183    /** A flag whether shutdown() was called. */
184    private boolean shutdown;  // @GuardedBy("this")
185
186    /**
187     * Creates a new instance of {@link TimedSemaphore} and initializes it with
188     * the given time period and the limit.
189     *
190     * @param timePeriod the time period
191     * @param timeUnit the unit for the period
192     * @param limit the limit for the semaphore
193     * @throws IllegalArgumentException if the period is less or equals 0
194     */
195    public TimedSemaphore(final long timePeriod, final TimeUnit timeUnit, final int limit) {
196        this(null, timePeriod, timeUnit, limit);
197    }
198
199    /**
200     * Creates a new instance of {@link TimedSemaphore} and initializes it with
201     * an executor service, the given time period, and the limit. The executor
202     * service will be used for creating a periodic task for monitoring the time
203     * period. It can be <b>null</b>, then a default service will be created.
204     *
205     * @param service the executor service
206     * @param timePeriod the time period
207     * @param timeUnit the unit for the period
208     * @param limit the limit for the semaphore
209     * @throws IllegalArgumentException if the period is less or equals 0
210     */
211    public TimedSemaphore(final ScheduledExecutorService service, final long timePeriod,
212            final TimeUnit timeUnit, final int limit) {
213        Validate.inclusiveBetween(1, Long.MAX_VALUE, timePeriod, "Time period must be greater than 0!");
214
215        period = timePeriod;
216        unit = timeUnit;
217
218        if (service != null) {
219            executorService = service;
220            ownExecutor = false;
221        } else {
222            final ScheduledThreadPoolExecutor s = new ScheduledThreadPoolExecutor(
223                    THREAD_POOL_SIZE);
224            s.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
225            s.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
226            executorService = s;
227            ownExecutor = true;
228        }
229
230        setLimit(limit);
231    }
232
233    /**
234     * Acquires a permit from this semaphore. This method will block if
235     * the limit for the current period has already been reached. If
236     * {@link #shutdown()} has already been invoked, calling this method will
237     * cause an exception. The very first call of this method starts the timer
238     * task which monitors the time period set for this {@link TimedSemaphore}.
239     * From now on the semaphore is active.
240     *
241     * @throws InterruptedException if the thread gets interrupted
242     * @throws IllegalStateException if this semaphore is already shut down
243     */
244    public synchronized void acquire() throws InterruptedException {
245        prepareAcquire();
246
247        boolean canPass;
248        do {
249            canPass = acquirePermit();
250            if (!canPass) {
251                wait();
252            }
253        } while (!canPass);
254    }
255
256    /**
257     * Internal helper method for acquiring a permit. This method checks whether currently
258     * a permit can be acquired and - if so - increases the internal counter. The return
259     * value indicates whether a permit could be acquired. This method must be called with
260     * the lock of this object held.
261     *
262     * @return a flag whether a permit could be acquired
263     */
264    private boolean acquirePermit() {
265        if (getLimit() <= NO_LIMIT || acquireCount < getLimit()) {
266            acquireCount++;
267            return true;
268        }
269        return false;
270    }
271
272    /**
273     * The current time period is finished. This method is called by the timer
274     * used internally to monitor the time period. It resets the counter and
275     * releases the threads waiting for this barrier.
276     */
277    synchronized void endOfPeriod() {
278        lastCallsPerPeriod = acquireCount;
279        totalAcquireCount += acquireCount;
280        periodCount++;
281        acquireCount = 0;
282        notifyAll();
283    }
284
285    /**
286     * Returns the number of invocations of the {@link #acquire()} method for
287     * the current period. This may be useful for testing or debugging purposes.
288     *
289     * @return the current number of {@link #acquire()} invocations
290     */
291    public synchronized int getAcquireCount() {
292        return acquireCount;
293    }
294
295    /**
296     * Returns the number of calls to the {@link #acquire()} method that can
297     * still be performed in the current period without blocking. This method
298     * can give an indication whether it is safe to call the {@link #acquire()}
299     * method without risking to be suspended. However, there is no guarantee
300     * that a subsequent call to {@link #acquire()} actually is not-blocking
301     * because in the meantime other threads may have invoked the semaphore.
302     *
303     * @return the current number of available {@link #acquire()} calls in the
304     * current period
305     */
306    public synchronized int getAvailablePermits() {
307        return getLimit() - getAcquireCount();
308    }
309
310    /**
311     * Returns the average number of successful (i.e. non-blocking)
312     * {@link #acquire()} invocations for the entire life-time of this {@code
313     * TimedSemaphore}. This method can be used for instance for statistical
314     * calculations.
315     *
316     * @return the average number of {@link #acquire()} invocations per time
317     * unit
318     */
319    public synchronized double getAverageCallsPerPeriod() {
320        return periodCount == 0 ? 0 : (double) totalAcquireCount
321                / (double) periodCount;
322    }
323
324    /**
325     * Returns the executor service used by this instance.
326     *
327     * @return the executor service
328     */
329    protected ScheduledExecutorService getExecutorService() {
330        return executorService;
331    }
332
333    /**
334     * Returns the number of (successful) acquire invocations during the last
335     * period. This is the number of times the {@link #acquire()} method was
336     * called without blocking. This can be useful for testing or debugging
337     * purposes or to determine a meaningful threshold value. If a limit is set,
338     * the value returned by this method won't be greater than this limit.
339     *
340     * @return the number of non-blocking invocations of the {@link #acquire()}
341     * method
342     */
343    public synchronized int getLastAcquiresPerPeriod() {
344        return lastCallsPerPeriod;
345    }
346
347    /**
348     * Returns the limit enforced by this semaphore. The limit determines how
349     * many invocations of {@link #acquire()} are allowed within the monitored
350     * period.
351     *
352     * @return the limit
353     */
354    public final synchronized int getLimit() {
355        return limit;
356    }
357
358    /**
359     * Returns the time period. This is the time monitored by this semaphore.
360     * Only a given number of invocations of the {@link #acquire()} method is
361     * possible in this period.
362     *
363     * @return the time period
364     */
365    public long getPeriod() {
366        return period;
367    }
368
369    /**
370     * Returns the time unit. This is the unit used by {@link #getPeriod()}.
371     *
372     * @return the time unit
373     */
374    public TimeUnit getUnit() {
375        return unit;
376    }
377
378    /**
379     * Tests whether the {@link #shutdown()} method has been called on this
380     * object. If this method returns <b>true</b>, this instance cannot be used
381     * any longer.
382     *
383     * @return a flag whether a shutdown has been performed
384     */
385    public synchronized boolean isShutdown() {
386        return shutdown;
387    }
388
389    /**
390     * Prepares an acquire operation. Checks for the current state and starts the internal
391     * timer if necessary. This method must be called with the lock of this object held.
392     */
393    private void prepareAcquire() {
394        if (isShutdown()) {
395            throw new IllegalStateException("TimedSemaphore is shut down!");
396        }
397
398        if (task == null) {
399            task = startTimer();
400        }
401    }
402
403    /**
404     * Sets the limit. This is the number of times the {@link #acquire()} method
405     * can be called within the time period specified. If this limit is reached,
406     * further invocations of {@link #acquire()} will block. Setting the limit
407     * to a value &lt;= {@link #NO_LIMIT} will cause the limit to be disabled,
408     * i.e. an arbitrary number of{@link #acquire()} invocations is allowed in
409     * the time period.
410     *
411     * @param limit the limit
412     */
413    public final synchronized void setLimit(final int limit) {
414        this.limit = limit;
415    }
416
417    /**
418     * Initializes a shutdown. After that the object cannot be used anymore.
419     * This method can be invoked an arbitrary number of times. All invocations
420     * after the first one do not have any effect.
421     */
422    public synchronized void shutdown() {
423        if (!shutdown) {
424
425            if (ownExecutor) {
426                // if the executor was created by this instance, it has
427                // to be shutdown
428                getExecutorService().shutdownNow();
429            }
430            if (task != null) {
431                task.cancel(false);
432            }
433
434            shutdown = true;
435        }
436    }
437
438    /**
439     * Starts the timer. This method is called when {@link #acquire()} is called
440     * for the first time. It schedules a task to be executed at fixed rate to
441     * monitor the time period specified.
442     *
443     * @return a future object representing the task scheduled
444     */
445    protected ScheduledFuture<?> startTimer() {
446        return getExecutorService().scheduleAtFixedRate(this::endOfPeriod, getPeriod(), getPeriod(), getUnit());
447    }
448
449    /**
450     * Tries to acquire a permit from this semaphore. If the limit of this semaphore has
451     * not yet been reached, a permit is acquired, and this method returns
452     * <strong>true</strong>. Otherwise, this method returns immediately with the result
453     * <strong>false</strong>.
454     *
455     * @return <strong>true</strong> if a permit could be acquired; <strong>false</strong>
456     * otherwise
457     * @throws IllegalStateException if this semaphore is already shut down
458     * @since 3.5
459     */
460    public synchronized boolean tryAcquire() {
461        prepareAcquire();
462        return acquirePermit();
463    }
464}