001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.commons.lang3.concurrent;
018    
019    import java.util.concurrent.ScheduledExecutorService;
020    import java.util.concurrent.ScheduledFuture;
021    import java.util.concurrent.ScheduledThreadPoolExecutor;
022    import java.util.concurrent.TimeUnit;
023    
024    /**
025     * <p>
026     * A specialized <em>semaphore</em> implementation that provides a number of
027     * permits in a given time frame.
028     * </p>
029     * <p>
030     * This class is similar to the {@code java.util.concurrent.Semaphore} class
031     * provided by the JDK in that it manages a configurable number of permits.
032     * Using the {@link #acquire()} method a permit can be requested by a thread.
033     * However, there is an additional timing dimension: there is no {@code
034     * release()} method for freeing a permit, but all permits are automatically
035     * released at the end of a configurable time frame. If a thread calls
036     * {@link #acquire()} and the available permits are already exhausted for this
037     * time frame, the thread is blocked. When the time frame ends all permits
038     * requested so far are restored, and blocking threads are waked up again, so
039     * that they can try to acquire a new permit. This basically means that in the
040     * specified time frame only the given number of operations is possible.
041     * </p>
042     * <p>
043     * A use case for this class is to artificially limit the load produced by a
044     * process. As an example consider an application that issues database queries
045     * on a production system in a background process to gather statistical
046     * information. This background processing should not produce so much database
047     * load that the functionality and the performance of the production system are
048     * impacted. Here a {@code TimedSemaphore} could be installed to guarantee that
049     * only a given number of database queries are issued per second.
050     * </p>
051     * <p>
052     * A thread class for performing database queries could look as follows:
053     *
054     * <pre>
055     * public class StatisticsThread extends Thread {
056     *     // The semaphore for limiting database load.
057     *     private final TimedSemaphore semaphore;
058     *     // Create an instance and set the semaphore
059     *     public StatisticsThread(TimedSemaphore timedSemaphore) {
060     *         semaphore = timedSemaphore;
061     *     }
062     *     // Gather statistics
063     *     public void run() {
064     *         try {
065     *             while(true) {
066     *                 semaphore.acquire();   // limit database load
067     *                 performQuery();        // issue a query
068     *             }
069     *         } catch(InterruptedException) {
070     *             // fall through
071     *         }
072     *     }
073     *     ...
074     * }
075     * </pre>
076     *
077     * The following code fragment shows how a {@code TimedSemaphore} is created
078     * that allows only 10 operations per second and passed to the statistics
079     * thread:
080     *
081     * <pre>
082     * TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECOND, 10);
083     * StatisticsThread thread = new StatisticsThread(sem);
084     * thread.start();
085     * </pre>
086     *
087     * </p>
088     * <p>
089     * When creating an instance the time period for the semaphore must be
090     * specified. {@code TimedSemaphore} uses an executor service with a
091     * corresponding period to monitor this interval. The {@code
092     * ScheduledExecutorService} to be used for this purpose can be provided at
093     * construction time. Alternatively the class creates an internal executor
094     * service.
095     * </p>
096     * <p>
097     * Client code that uses {@code TimedSemaphore} has to call the
098     * {@link #acquire()} method in aach processing step. {@code TimedSemaphore}
099     * keeps track of the number of invocations of the {@link #acquire()} method and
100     * blocks the calling thread if the counter exceeds the limit specified. When
101     * the timer signals the end of the time period the counter is reset and all
102     * waiting threads are released. Then another cycle can start.
103     * </p>
104     * <p>
105     * It is possible to modify the limit at any time using the
106     * {@link #setLimit(int)} method. This is useful if the load produced by an
107     * operation has to be adapted dynamically. In the example scenario with the
108     * thread collecting statistics it may make sense to specify a low limit during
109     * day time while allowing a higher load in the night time. Reducing the limit
110     * takes effect immediately by blocking incoming callers. If the limit is
111     * increased, waiting threads are not released immediately, but wake up when the
112     * timer runs out. Then, in the next period more processing steps can be
113     * performed without blocking. By setting the limit to 0 the semaphore can be
114     * switched off: in this mode the {@link #acquire()} method never blocks, but
115     * lets all callers pass directly.
116     * </p>
117     * <p>
118     * When the {@code TimedSemaphore} is no more needed its {@link #shutdown()}
119     * method should be called. This causes the periodic task that monitors the time
120     * interval to be canceled. If the {@code ScheduledExecutorService} has been
121     * created by the semaphore at construction time, it is also shut down.
122     * resources. After that {@link #acquire()} must not be called any more.
123     * </p>
124     *
125     * @since 3.0
126     * @version $Id: TimedSemaphore.java 1199894 2011-11-09 17:53:59Z ggregory $
127     */
128    public class TimedSemaphore {
129        /**
130         * Constant for a value representing no limit. If the limit is set to a
131         * value less or equal this constant, the {@code TimedSemaphore} will be
132         * effectively switched off.
133         */
134        public static final int NO_LIMIT = 0;
135    
136        /** Constant for the thread pool size for the executor. */
137        private static final int THREAD_POOL_SIZE = 1;
138    
139        /** The executor service for managing the timer thread. */
140        private final ScheduledExecutorService executorService;
141    
142        /** Stores the period for this timed semaphore. */
143        private final long period;
144    
145        /** The time unit for the period. */
146        private final TimeUnit unit;
147    
148        /** A flag whether the executor service was created by this object. */
149        private final boolean ownExecutor;
150    
151        /** A future object representing the timer task. */
152        private ScheduledFuture<?> task;
153    
154        /** Stores the total number of invocations of the acquire() method. */
155        private long totalAcquireCount;
156    
157        /**
158         * The counter for the periods. This counter is increased every time a
159         * period ends.
160         */
161        private long periodCount;
162    
163        /** The limit. */
164        private int limit;
165    
166        /** The current counter. */
167        private int acquireCount;
168    
169        /** The number of invocations of acquire() in the last period. */
170        private int lastCallsPerPeriod;
171    
172        /** A flag whether shutdown() was called. */
173        private boolean shutdown;
174    
175        /**
176         * Creates a new instance of {@link TimedSemaphore} and initializes it with
177         * the given time period and the limit.
178         *
179         * @param timePeriod the time period
180         * @param timeUnit the unit for the period
181         * @param limit the limit for the semaphore
182         * @throws IllegalArgumentException if the period is less or equals 0
183         */
184        public TimedSemaphore(long timePeriod, TimeUnit timeUnit, int limit) {
185            this(null, timePeriod, timeUnit, limit);
186        }
187    
188        /**
189         * Creates a new instance of {@link TimedSemaphore} and initializes it with
190         * an executor service, the given time period, and the limit. The executor
191         * service will be used for creating a periodic task for monitoring the time
192         * period. It can be <b>null</b>, then a default service will be created.
193         *
194         * @param service the executor service
195         * @param timePeriod the time period
196         * @param timeUnit the unit for the period
197         * @param limit the limit for the semaphore
198         * @throws IllegalArgumentException if the period is less or equals 0
199         */
200        public TimedSemaphore(ScheduledExecutorService service, long timePeriod,
201                TimeUnit timeUnit, int limit) {
202            if (timePeriod <= 0) {
203                throw new IllegalArgumentException("Time period must be greater 0!");
204            }
205    
206            period = timePeriod;
207            unit = timeUnit;
208    
209            if (service != null) {
210                executorService = service;
211                ownExecutor = false;
212            } else {
213                ScheduledThreadPoolExecutor s = new ScheduledThreadPoolExecutor(
214                        THREAD_POOL_SIZE);
215                s.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
216                s.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
217                executorService = s;
218                ownExecutor = true;
219            }
220    
221            setLimit(limit);
222        }
223    
224        /**
225         * Returns the limit enforced by this semaphore. The limit determines how
226         * many invocations of {@link #acquire()} are allowed within the monitored
227         * period.
228         *
229         * @return the limit
230         */
231        public final synchronized int getLimit() {
232            return limit;
233        }
234    
235        /**
236         * Sets the limit. This is the number of times the {@link #acquire()} method
237         * can be called within the time period specified. If this limit is reached,
238         * further invocations of {@link #acquire()} will block. Setting the limit
239         * to a value &lt;= {@link #NO_LIMIT} will cause the limit to be disabled,
240         * i.e. an arbitrary number of{@link #acquire()} invocations is allowed in
241         * the time period.
242         *
243         * @param limit the limit
244         */
245        public final synchronized void setLimit(int limit) {
246            this.limit = limit;
247        }
248    
249        /**
250         * Initializes a shutdown. After that the object cannot be used any more.
251         * This method can be invoked an arbitrary number of times. All invocations
252         * after the first one do not have any effect.
253         */
254        public synchronized void shutdown() {
255            if (!shutdown) {
256    
257                if (ownExecutor) {
258                    // if the executor was created by this instance, it has
259                    // to be shutdown
260                    getExecutorService().shutdownNow();
261                }
262                if (task != null) {
263                    task.cancel(false);
264                }
265    
266                shutdown = true;
267            }
268        }
269    
270        /**
271         * Tests whether the {@link #shutdown()} method has been called on this
272         * object. If this method returns <b>true</b>, this instance cannot be used
273         * any longer.
274         *
275         * @return a flag whether a shutdown has been performed
276         */
277        public synchronized boolean isShutdown() {
278            return shutdown;
279        }
280    
281        /**
282         * Tries to acquire a permit from this semaphore. This method will block if
283         * the limit for the current period has already been reached. If
284         * {@link #shutdown()} has already been invoked, calling this method will
285         * cause an exception. The very first call of this method starts the timer
286         * task which monitors the time period set for this {@code TimedSemaphore}.
287         * From now on the semaphore is active.
288         *
289         * @throws InterruptedException if the thread gets interrupted
290         * @throws IllegalStateException if this semaphore is already shut down
291         */
292        public synchronized void acquire() throws InterruptedException {
293            if (isShutdown()) {
294                throw new IllegalStateException("TimedSemaphore is shut down!");
295            }
296    
297            if (task == null) {
298                task = startTimer();
299            }
300    
301            boolean canPass = false;
302            do {
303                canPass = getLimit() <= NO_LIMIT || acquireCount < getLimit();
304                if (!canPass) {
305                    wait();
306                } else {
307                    acquireCount++;
308                }
309            } while (!canPass);
310        }
311    
312        /**
313         * Returns the number of (successful) acquire invocations during the last
314         * period. This is the number of times the {@link #acquire()} method was
315         * called without blocking. This can be useful for testing or debugging
316         * purposes or to determine a meaningful threshold value. If a limit is set,
317         * the value returned by this method won't be greater than this limit.
318         *
319         * @return the number of non-blocking invocations of the {@link #acquire()}
320         * method
321         */
322        public synchronized int getLastAcquiresPerPeriod() {
323            return lastCallsPerPeriod;
324        }
325    
326        /**
327         * Returns the number of invocations of the {@link #acquire()} method for
328         * the current period. This may be useful for testing or debugging purposes.
329         *
330         * @return the current number of {@link #acquire()} invocations
331         */
332        public synchronized int getAcquireCount() {
333            return acquireCount;
334        }
335    
336        /**
337         * Returns the number of calls to the {@link #acquire()} method that can
338         * still be performed in the current period without blocking. This method
339         * can give an indication whether it is safe to call the {@link #acquire()}
340         * method without risking to be suspended. However, there is no guarantee
341         * that a subsequent call to {@link #acquire()} actually is not-blocking
342         * because in the mean time other threads may have invoked the semaphore.
343         *
344         * @return the current number of available {@link #acquire()} calls in the
345         * current period
346         */
347        public synchronized int getAvailablePermits() {
348            return getLimit() - getAcquireCount();
349        }
350    
351        /**
352         * Returns the average number of successful (i.e. non-blocking)
353         * {@link #acquire()} invocations for the entire life-time of this {@code
354         * TimedSemaphore}. This method can be used for instance for statistical
355         * calculations.
356         *
357         * @return the average number of {@link #acquire()} invocations per time
358         * unit
359         */
360        public synchronized double getAverageCallsPerPeriod() {
361            return periodCount == 0 ? 0 : (double) totalAcquireCount
362                    / (double) periodCount;
363        }
364    
365        /**
366         * Returns the time period. This is the time monitored by this semaphore.
367         * Only a given number of invocations of the {@link #acquire()} method is
368         * possible in this period.
369         *
370         * @return the time period
371         */
372        public long getPeriod() {
373            return period;
374        }
375    
376        /**
377         * Returns the time unit. This is the unit used by {@link #getPeriod()}.
378         *
379         * @return the time unit
380         */
381        public TimeUnit getUnit() {
382            return unit;
383        }
384    
385        /**
386         * Returns the executor service used by this instance.
387         *
388         * @return the executor service
389         */
390        protected ScheduledExecutorService getExecutorService() {
391            return executorService;
392        }
393    
394        /**
395         * Starts the timer. This method is called when {@link #acquire()} is called
396         * for the first time. It schedules a task to be executed at fixed rate to
397         * monitor the time period specified.
398         *
399         * @return a future object representing the task scheduled
400         */
401        protected ScheduledFuture<?> startTimer() {
402            return getExecutorService().scheduleAtFixedRate(new Runnable() {
403                public void run() {
404                    endOfPeriod();
405                }
406            }, getPeriod(), getPeriod(), getUnit());
407        }
408    
409        /**
410         * The current time period is finished. This method is called by the timer
411         * used internally to monitor the time period. It resets the counter and
412         * releases the threads waiting for this barrier.
413         */
414        synchronized void endOfPeriod() {
415            lastCallsPerPeriod = acquireCount;
416            totalAcquireCount += acquireCount;
417            periodCount++;
418            acquireCount = 0;
419            notifyAll();
420        }
421    }