001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.lang3.concurrent;
018
019import java.util.concurrent.ScheduledExecutorService;
020import java.util.concurrent.ScheduledFuture;
021import java.util.concurrent.ScheduledThreadPoolExecutor;
022import java.util.concurrent.TimeUnit;
023
024/**
025 * <p>
026 * A specialized <em>semaphore</em> implementation that provides a number of
027 * permits in a given time frame.
028 * </p>
029 * <p>
030 * This class is similar to the {@code java.util.concurrent.Semaphore} class
031 * provided by the JDK in that it manages a configurable number of permits.
032 * Using the {@link #acquire()} method a permit can be requested by a thread.
033 * However, there is an additional timing dimension: there is no {@code
034 * release()} method for freeing a permit, but all permits are automatically
035 * released at the end of a configurable time frame. If a thread calls
036 * {@link #acquire()} and the available permits are already exhausted for this
037 * time frame, the thread is blocked. When the time frame ends all permits
038 * requested so far are restored, and blocked threads are woken up again, so
039 * that they can try to acquire a new permit. This basically means that in the
040 * specified time frame only the given number of operations is possible.
041 * </p>
042 * <p>
043 * A use case for this class is to artificially limit the load produced by a
044 * process. As an example consider an application that issues database queries
045 * on a production system in a background process to gather statistical
046 * information. This background processing should not produce so much database
047 * load that the functionality and the performance of the production system are
048 * impacted. Here a {@code TimedSemaphore} could be installed to guarantee that
049 * only a given number of database queries are issued per second.
050 * </p>
051 * <p>
052 * A thread class for performing database queries could look as follows:
053 * </p>
054 *
055 * <pre>
056 * public class StatisticsThread extends Thread {
057 *     // The semaphore for limiting database load.
058 *     private final TimedSemaphore semaphore;
059 *     // Create an instance and set the semaphore
060 *     public StatisticsThread(TimedSemaphore timedSemaphore) {
061 *         semaphore = timedSemaphore;
062 *     }
063 *     // Gather statistics
064 *     public void run() {
065 *         try {
066 *             while(true) {
067 *                 semaphore.acquire();   // limit database load
068 *                 performQuery();        // issue a query
069 *             }
070 *         } catch (InterruptedException e) {
071 *             // fall through
072 *         }
073 *     }
074 *     ...
075 * }
076 * </pre>
077 *
078 * <p>
079 * The following code fragment shows how a {@code TimedSemaphore} is created
080 * that allows only 10 operations per second and passed to the statistics
081 * thread:
082 * </p>
083 *
084 * <pre>
085 * TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECONDS, 10);
086 * StatisticsThread thread = new StatisticsThread(sem);
087 * thread.start();
088 * </pre>
089 *
090 * <p>
091 * When creating an instance the time period for the semaphore must be
092 * specified. {@code TimedSemaphore} uses an executor service with a
093 * corresponding period to monitor this interval. The {@code
094 * ScheduledExecutorService} to be used for this purpose can be provided at
095 * construction time. Alternatively the class creates an internal executor
096 * service.
097 * </p>
098 * <p>
099 * Client code that uses {@code TimedSemaphore} has to call the
100 * {@link #acquire()} method in each processing step. {@code TimedSemaphore}
101 * keeps track of the number of invocations of the {@link #acquire()} method and
102 * blocks the calling thread if the counter exceeds the limit specified. When
103 * the timer signals the end of the time period the counter is reset and all
104 * waiting threads are released. Then another cycle can start.
105 * </p>
106 * <p>
107 * It is possible to modify the limit at any time using the
108 * {@link #setLimit(int)} method. This is useful if the load produced by an
109 * operation has to be adapted dynamically. In the example scenario with the
110 * thread collecting statistics it may make sense to specify a low limit during
111 * day time while allowing a higher load in the night time. Reducing the limit
112 * takes effect immediately by blocking incoming callers. If the limit is
113 * increased, waiting threads are not released immediately, but wake up when the
114 * timer runs out. Then, in the next period more processing steps can be
115 * performed without blocking. By setting the limit to 0 the semaphore can be
116 * switched off: in this mode the {@link #acquire()} method never blocks, but
117 * lets all callers pass directly.
118 * </p>
119 * <p>
120 * When the {@code TimedSemaphore} is no longer needed its {@link #shutdown()}
121 * method should be called. This causes the periodic task that monitors the time
122 * interval to be canceled. If the {@code ScheduledExecutorService} has been
123 * created by the semaphore at construction time, it is also shut down to free its
124 * resources. After that {@link #acquire()} must not be called any more.
125 * </p>
126 *
127 * @since 3.0
128 * @version $Id: TimedSemaphore.java 1583482 2014-03-31 22:54:57Z niallp $
129 */
public class TimedSemaphore {
    /**
     * Constant for a value representing no limit. If the limit is set to a
     * value less than or equal to this constant, the {@code TimedSemaphore}
     * is effectively switched off.
     */
    public static final int NO_LIMIT = 0;

    /** Constant for the thread pool size for the executor. */
    private static final int THREAD_POOL_SIZE = 1;

    /** The executor service for managing the timer thread. */
    private final ScheduledExecutorService executorService;

    /** Stores the period for this timed semaphore. */
    private final long period;

    /** The time unit for the period. */
    private final TimeUnit unit;

    /** A flag whether the executor service was created by this object. */
    private final boolean ownExecutor;

    /** A future object representing the timer task; null until the first acquire(). */
    private ScheduledFuture<?> task; // @GuardedBy("this")

    /** Stores the total number of acquire() invocations of all finished periods. */
    private long totalAcquireCount; // @GuardedBy("this")

    /**
     * The counter for the periods. This counter is increased every time a
     * period ends.
     */
    private long periodCount; // @GuardedBy("this")

    /** The limit. */
    private int limit; // @GuardedBy("this")

    /** The number of permits handed out in the current period. */
    private int acquireCount;  // @GuardedBy("this")

    /** The number of invocations of acquire() in the last period. */
    private int lastCallsPerPeriod; // @GuardedBy("this")

    /** A flag whether shutdown() was called. */
    private boolean shutdown;  // @GuardedBy("this")

    /**
     * Creates a new instance of {@link TimedSemaphore} and initializes it with
     * the given time period and the limit. An internal executor service is
     * created for monitoring the period.
     *
     * @param timePeriod the time period
     * @param timeUnit the unit for the period
     * @param limit the limit for the semaphore
     * @throws IllegalArgumentException if the period is less than or equal to 0
     */
    public TimedSemaphore(final long timePeriod, final TimeUnit timeUnit, final int limit) {
        this(null, timePeriod, timeUnit, limit);
    }

    /**
     * Creates a new instance of {@link TimedSemaphore} and initializes it with
     * an executor service, the given time period, and the limit. The executor
     * service will be used for creating a periodic task for monitoring the time
     * period. It can be <b>null</b>, then a default service will be created.
     *
     * @param service the executor service
     * @param timePeriod the time period
     * @param timeUnit the unit for the period
     * @param limit the limit for the semaphore
     * @throws IllegalArgumentException if the period is less than or equal to 0
     */
    public TimedSemaphore(final ScheduledExecutorService service, final long timePeriod,
            final TimeUnit timeUnit, final int limit) {
        if (timePeriod <= 0) {
            throw new IllegalArgumentException("Time period must be greater 0!");
        }

        period = timePeriod;
        unit = timeUnit;

        if (service != null) {
            // externally managed executor: never shut down by this object
            executorService = service;
            ownExecutor = false;
        } else {
            final ScheduledThreadPoolExecutor s = new ScheduledThreadPoolExecutor(
                    THREAD_POOL_SIZE);
            // pending timer executions must not survive a shutdown of the executor
            s.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
            s.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
            executorService = s;
            ownExecutor = true;
        }

        setLimit(limit);
    }

    /**
     * Returns the limit enforced by this semaphore. The limit determines how
     * many invocations of {@link #acquire()} are allowed within the monitored
     * period.
     *
     * @return the limit
     */
    public final synchronized int getLimit() {
        return limit;
    }

    /**
     * Sets the limit. This is the number of times the {@link #acquire()} method
     * can be called within the time period specified. If this limit is reached,
     * further invocations of {@link #acquire()} will block. Setting the limit
     * to a value &lt;= {@link #NO_LIMIT} will cause the limit to be disabled,
     * i.e. an arbitrary number of {@link #acquire()} invocations is allowed in
     * the time period.
     *
     * @param limit the limit
     */
    public final synchronized void setLimit(final int limit) {
        this.limit = limit;
    }

    /**
     * Initializes a shutdown. After that the object cannot be used any more.
     * The timer task is canceled and, if the executor service was created by
     * this instance, the executor is shut down as well. Threads that are
     * currently blocked in {@link #acquire()} are woken up and leave that
     * method with an {@code IllegalStateException}; without this wake-up they
     * would wait forever because no timer is left to release them.
     * This method can be invoked an arbitrary number of times. All invocations
     * after the first one do not have any effect.
     */
    public synchronized void shutdown() {
        if (shutdown) {
            return;
        }

        if (ownExecutor) {
            // if the executor was created by this instance, it has
            // to be shutdown
            getExecutorService().shutdownNow();
        }
        if (task != null) {
            task.cancel(false);
        }

        shutdown = true;
        // release waiters so that they can observe the shutdown flag
        notifyAll();
    }

    /**
     * Tests whether the {@link #shutdown()} method has been called on this
     * object. If this method returns <b>true</b>, this instance cannot be used
     * any longer.
     *
     * @return a flag whether a shutdown has been performed
     */
    public synchronized boolean isShutdown() {
        return shutdown;
    }

    /**
     * Tries to acquire a permit from this semaphore. This method will block if
     * the limit for the current period has already been reached. If
     * {@link #shutdown()} has already been invoked, or is invoked while the
     * calling thread is blocked here, this method throws an exception. The
     * very first call of this method starts the timer task which monitors the
     * time period set for this {@code TimedSemaphore}. From now on the
     * semaphore is active.
     *
     * @throws InterruptedException if the thread gets interrupted
     * @throws IllegalStateException if this semaphore is already shut down or
     * gets shut down while waiting for a permit
     */
    public synchronized void acquire() throws InterruptedException {
        ensureNotShutdown();

        if (task == null) {
            // lazily start the timer on first use
            task = startTimer();
        }

        // loop guards against spurious wake-ups and against other threads
        // having consumed the permits of the new period first
        while (!(limit <= NO_LIMIT || acquireCount < limit)) {
            wait();
            // the wake-up may stem from shutdown(); then no permit will ever come
            ensureNotShutdown();
        }
        acquireCount++;
    }

    /**
     * Returns the number of (successful) acquire invocations during the last
     * period. This is the value the acquire counter had when the most recent
     * period ended. This can be useful for testing or debugging purposes or
     * to determine a meaningful threshold value.
     *
     * @return the number of {@link #acquire()} invocations counted in the
     * last period
     */
    public synchronized int getLastAcquiresPerPeriod() {
        return lastCallsPerPeriod;
    }

    /**
     * Returns the number of invocations of the {@link #acquire()} method for
     * the current period. This may be useful for testing or debugging purposes.
     *
     * @return the current number of {@link #acquire()} invocations
     */
    public synchronized int getAcquireCount() {
        return acquireCount;
    }

    /**
     * Returns the number of calls to the {@link #acquire()} method that can
     * still be performed in the current period without blocking. This method
     * can give an indication whether it is safe to call the {@link #acquire()}
     * method without risking to be suspended. However, there is no guarantee
     * that a subsequent call to {@link #acquire()} actually is not-blocking
     * because in the mean time other threads may have invoked the semaphore.
     * The value is simply the limit minus the current acquire count, so it is
     * not meaningful (zero or negative) while the limit is disabled.
     *
     * @return the current number of available {@link #acquire()} calls in the
     * current period
     */
    public synchronized int getAvailablePermits() {
        return getLimit() - getAcquireCount();
    }

    /**
     * Returns the average number of successful (i.e. non-blocking)
     * {@link #acquire()} invocations per finished period for the entire
     * life-time of this {@code TimedSemaphore}. This method can be used for
     * instance for statistical calculations.
     *
     * @return the average number of {@link #acquire()} invocations per period,
     * or 0 if no period has ended yet
     */
    public synchronized double getAverageCallsPerPeriod() {
        return periodCount == 0 ? 0 : (double) totalAcquireCount
                / (double) periodCount;
    }

    /**
     * Returns the time period. This is the time monitored by this semaphore.
     * Only a given number of invocations of the {@link #acquire()} method is
     * possible in this period.
     *
     * @return the time period
     */
    public long getPeriod() {
        return period;
    }

    /**
     * Returns the time unit. This is the unit used by {@link #getPeriod()}.
     *
     * @return the time unit
     */
    public TimeUnit getUnit() {
        return unit;
    }

    /**
     * Returns the executor service used by this instance.
     *
     * @return the executor service
     */
    protected ScheduledExecutorService getExecutorService() {
        return executorService;
    }

    /**
     * Starts the timer. This method is called when {@link #acquire()} is called
     * for the first time. It schedules a task to be executed at fixed rate to
     * monitor the time period specified; the first execution happens after one
     * full period.
     *
     * @return a future object representing the task scheduled
     */
    protected ScheduledFuture<?> startTimer() {
        return getExecutorService().scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                endOfPeriod();
            }
        }, getPeriod(), getPeriod(), getUnit());
    }

    /**
     * The current time period is finished. This method is called by the timer
     * used internally to monitor the time period. It records the statistics of
     * the finished period, resets the counter and releases the threads waiting
     * for this barrier.
     */
    synchronized void endOfPeriod() {
        lastCallsPerPeriod = acquireCount;
        totalAcquireCount += acquireCount;
        periodCount++;
        acquireCount = 0;
        notifyAll();
    }

    /**
     * Throws an exception if this semaphore has been shut down. Must be called
     * while holding the monitor of this object.
     *
     * @throws IllegalStateException if {@link #isShutdown()} returns true
     */
    private void ensureNotShutdown() {
        if (isShutdown()) {
            throw new IllegalStateException("TimedSemaphore is shut down!");
        }
    }
}