View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.lang3.concurrent;
18  
19  import org.apache.commons.lang3.Validate;
20  
21  import java.util.concurrent.ScheduledExecutorService;
22  import java.util.concurrent.ScheduledFuture;
23  import java.util.concurrent.ScheduledThreadPoolExecutor;
24  import java.util.concurrent.TimeUnit;
25  
/**
 * <p>
 * A specialized <em>semaphore</em> implementation that provides a number of
 * permits in a given time frame.
 * </p>
 * <p>
 * This class is similar to the {@code java.util.concurrent.Semaphore} class
 * provided by the JDK in that it manages a configurable number of permits.
 * Using the {@link #acquire()} method a permit can be requested by a thread.
 * However, there is an additional timing dimension: there is no {@code
 * release()} method for freeing a permit, but all permits are automatically
 * released at the end of a configurable time frame. If a thread calls
 * {@link #acquire()} and the available permits are already exhausted for this
 * time frame, the thread is blocked. When the time frame ends all permits
 * requested so far are restored, and blocking threads are woken up again, so
 * that they can try to acquire a new permit. This basically means that in the
 * specified time frame only the given number of operations is possible.
 * </p>
 * <p>
 * A use case for this class is to artificially limit the load produced by a
 * process. As an example consider an application that issues database queries
 * on a production system in a background process to gather statistical
 * information. This background processing should not produce so much database
 * load that the functionality and the performance of the production system are
 * impacted. Here a {@code TimedSemaphore} could be installed to guarantee that
 * only a given number of database queries are issued per second.
 * </p>
 * <p>
 * A thread class for performing database queries could look as follows:
 * </p>
 *
 * <pre>
 * public class StatisticsThread extends Thread {
 *     // The semaphore for limiting database load.
 *     private final TimedSemaphore semaphore;
 *     // Create an instance and set the semaphore
 *     public StatisticsThread(TimedSemaphore timedSemaphore) {
 *         semaphore = timedSemaphore;
 *     }
 *     // Gather statistics
 *     public void run() {
 *         try {
 *             while(true) {
 *                 semaphore.acquire();   // limit database load
 *                 performQuery();        // issue a query
 *             }
 *         } catch (InterruptedException e) {
 *             // restore the interrupt flag and terminate
 *             Thread.currentThread().interrupt();
 *         }
 *     }
 *     ...
 * }
 * </pre>
 *
 * <p>
 * The following code fragment shows how a {@code TimedSemaphore} is created
 * that allows only 10 operations per second and passed to the statistics
 * thread:
 * </p>
 *
 * <pre>
 * TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECONDS, 10);
 * StatisticsThread thread = new StatisticsThread(sem);
 * thread.start();
 * </pre>
 *
 * <p>
 * When creating an instance the time period for the semaphore must be
 * specified. {@code TimedSemaphore} uses an executor service with a
 * corresponding period to monitor this interval. The {@code
 * ScheduledExecutorService} to be used for this purpose can be provided at
 * construction time. Alternatively the class creates an internal executor
 * service.
 * </p>
 * <p>
 * Client code that uses {@code TimedSemaphore} has to call the
 * {@link #acquire()} method in each processing step. {@code TimedSemaphore}
 * keeps track of the number of invocations of the {@link #acquire()} method and
 * blocks the calling thread if the counter exceeds the limit specified. When
 * the timer signals the end of the time period the counter is reset and all
 * waiting threads are released. Then another cycle can start.
 * </p>
 * <p>
 * An alternative to {@code acquire()} is the {@link #tryAcquire()} method. This
 * method checks whether the semaphore is under the specified limit and
 * increases the internal counter if this is the case. The return value is then
 * <strong>true</strong>, and the calling thread can continue with its action.
 * If the semaphore is already at its limit, {@code tryAcquire()} immediately
 * returns <strong>false</strong> without blocking; the calling thread must
 * then abort its action. This usage scenario prevents blocking of threads.
 * </p>
 * <p>
 * It is possible to modify the limit at any time using the
 * {@link #setLimit(int)} method. This is useful if the load produced by an
 * operation has to be adapted dynamically. In the example scenario with the
 * thread collecting statistics it may make sense to specify a low limit during
 * day time while allowing a higher load in the night time. Reducing the limit
 * takes effect immediately by blocking incoming callers. If the limit is
 * increased, waiting threads are not released immediately, but wake up when the
 * timer runs out. Then, in the next period more processing steps can be
 * performed without blocking. By setting the limit to 0 the semaphore can be
 * switched off: in this mode the {@link #acquire()} method never blocks, but
 * lets all callers pass directly.
 * </p>
 * <p>
 * When the {@code TimedSemaphore} is no longer needed its {@link #shutdown()}
 * method should be called. This causes the periodic task that monitors the time
 * interval to be canceled. If the {@code ScheduledExecutorService} has been
 * created by the semaphore at construction time, it is also shut down. Threads
 * that are blocked in {@link #acquire()} at that moment are woken up and fail
 * with an {@code IllegalStateException}. After that {@link #acquire()} must
 * not be called any more.
 * </p>
 *
 * @since 3.0
 */
public class TimedSemaphore {
    /**
     * Constant for a value representing no limit. If the limit is set to a
     * value less or equal this constant, the {@code TimedSemaphore} will be
     * effectively switched off.
     */
    public static final int NO_LIMIT = 0;

    /** Constant for the thread pool size for the executor. */
    private static final int THREAD_POOL_SIZE = 1;

    /** The executor service for managing the timer thread. */
    private final ScheduledExecutorService executorService;

    /** Stores the period for this timed semaphore. */
    private final long period;

    /** The time unit for the period. */
    private final TimeUnit unit;

    /** A flag whether the executor service was created by this object. */
    private final boolean ownExecutor;

    /** A future object representing the timer task. */
    private ScheduledFuture<?> task; // @GuardedBy("this")

    /** Stores the total number of invocations of the acquire() method. */
    private long totalAcquireCount; // @GuardedBy("this")

    /**
     * The counter for the periods. This counter is increased every time a
     * period ends.
     */
    private long periodCount; // @GuardedBy("this")

    /** The limit. */
    private int limit; // @GuardedBy("this")

    /** The current counter. */
    private int acquireCount;  // @GuardedBy("this")

    /** The number of invocations of acquire() in the last period. */
    private int lastCallsPerPeriod; // @GuardedBy("this")

    /** A flag whether shutdown() was called. */
    private boolean shutdown;  // @GuardedBy("this")

    /**
     * Creates a new instance of {@link TimedSemaphore} and initializes it with
     * the given time period and the limit. An internal executor service is
     * created for monitoring the period.
     *
     * @param timePeriod the time period
     * @param timeUnit the unit for the period
     * @param limit the limit for the semaphore
     * @throws IllegalArgumentException if the period is less or equals 0
     */
    public TimedSemaphore(final long timePeriod, final TimeUnit timeUnit, final int limit) {
        this(null, timePeriod, timeUnit, limit);
    }

    /**
     * Creates a new instance of {@link TimedSemaphore} and initializes it with
     * an executor service, the given time period, and the limit. The executor
     * service will be used for creating a periodic task for monitoring the time
     * period. It can be <b>null</b>, then a default service will be created.
     *
     * @param service the executor service
     * @param timePeriod the time period
     * @param timeUnit the unit for the period
     * @param limit the limit for the semaphore
     * @throws IllegalArgumentException if the period is less or equals 0
     */
    public TimedSemaphore(final ScheduledExecutorService service, final long timePeriod,
            final TimeUnit timeUnit, final int limit) {
        // validate before any executor is created so that a failing
        // constructor does not leave a thread pool behind
        if (timePeriod < 1) {
            throw new IllegalArgumentException("Time period must be greater than 0!");
        }

        period = timePeriod;
        unit = timeUnit;

        if (service != null) {
            executorService = service;
            ownExecutor = false;
        } else {
            final ScheduledThreadPoolExecutor s = new ScheduledThreadPoolExecutor(
                    THREAD_POOL_SIZE);
            // make sure that no timer task survives a shutdown of the pool
            s.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
            s.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
            executorService = s;
            ownExecutor = true;
        }

        setLimit(limit);
    }

    /**
     * Returns the limit enforced by this semaphore. The limit determines how
     * many invocations of {@link #acquire()} are allowed within the monitored
     * period.
     *
     * @return the limit
     */
    public final synchronized int getLimit() {
        return limit;
    }

    /**
     * Sets the limit. This is the number of times the {@link #acquire()} method
     * can be called within the time period specified. If this limit is reached,
     * further invocations of {@link #acquire()} will block. Setting the limit
     * to a value &lt;= {@link #NO_LIMIT} will cause the limit to be disabled,
     * i.e. an arbitrary number of {@link #acquire()} invocations is allowed in
     * the time period.
     *
     * @param limit the limit
     */
    public final synchronized void setLimit(final int limit) {
        this.limit = limit;
    }

    /**
     * Initializes a shutdown. After that the object cannot be used any more.
     * The timer task is canceled and, if the executor service was created by
     * this instance, the executor is shut down as well. Threads that are
     * currently blocked in {@link #acquire()} are woken up and leave that
     * method with an {@code IllegalStateException}. This method can be invoked
     * an arbitrary number of times. All invocations after the first one do not
     * have any effect.
     */
    public synchronized void shutdown() {
        if (!shutdown) {

            if (ownExecutor) {
                // if the executor was created by this instance, it has
                // to be shutdown
                getExecutorService().shutdownNow();
            }
            if (task != null) {
                task.cancel(false);
            }

            shutdown = true;

            // With the timer gone endOfPeriod() is never called again, so
            // this is the last chance to release threads waiting in acquire().
            notifyAll();
        }
    }

    /**
     * Tests whether the {@link #shutdown()} method has been called on this
     * object. If this method returns <b>true</b>, this instance cannot be used
     * any longer.
     *
     * @return a flag whether a shutdown has been performed
     */
    public synchronized boolean isShutdown() {
        return shutdown;
    }

    /**
     * Acquires a permit from this semaphore. This method will block if
     * the limit for the current period has already been reached. If
     * {@link #shutdown()} has already been invoked, calling this method will
     * cause an exception; the same exception is thrown if the semaphore is
     * shut down while the calling thread is blocked. The very first call of
     * this method starts the timer task which monitors the time period set for
     * this {@code TimedSemaphore}. From now on the semaphore is active.
     *
     * @throws InterruptedException if the thread gets interrupted
     * @throws IllegalStateException if this semaphore is already shut down or
     * is shut down while waiting for a permit
     */
    public synchronized void acquire() throws InterruptedException {
        prepareAcquire();

        // loop guards against spurious wake-ups and against other threads
        // grabbing the fresh permits first
        while (!acquirePermit()) {
            wait();
            // a wake-up may stem from shutdown(); in this case no further
            // period will ever start, so waiting again would block forever
            checkNotShutdown();
        }
    }

    /**
     * Tries to acquire a permit from this semaphore. If the limit of this semaphore has
     * not yet been reached, a permit is acquired, and this method returns
     * <strong>true</strong>. Otherwise, this method returns immediately with the result
     * <strong>false</strong>. Like {@link #acquire()}, the first call starts the timer task.
     *
     * @return <strong>true</strong> if a permit could be acquired; <strong>false</strong>
     * otherwise
     * @throws IllegalStateException if this semaphore is already shut down
     * @since 3.5
     */
    public synchronized boolean tryAcquire() {
        prepareAcquire();
        return acquirePermit();
    }

    /**
     * Returns the number of (successful) acquire invocations during the last
     * period. This is the number of times the {@link #acquire()} method was
     * called without blocking. This can be useful for testing or debugging
     * purposes or to determine a meaningful threshold value. If a limit is set,
     * the value returned by this method won't be greater than this limit.
     *
     * @return the number of non-blocking invocations of the {@link #acquire()}
     * method
     */
    public synchronized int getLastAcquiresPerPeriod() {
        return lastCallsPerPeriod;
    }

    /**
     * Returns the number of invocations of the {@link #acquire()} method for
     * the current period. This may be useful for testing or debugging purposes.
     *
     * @return the current number of {@link #acquire()} invocations
     */
    public synchronized int getAcquireCount() {
        return acquireCount;
    }

    /**
     * Returns the number of calls to the {@link #acquire()} method that can
     * still be performed in the current period without blocking. This method
     * can give an indication whether it is safe to call the {@link #acquire()}
     * method without risking to be suspended. However, there is no guarantee
     * that a subsequent call to {@link #acquire()} actually is not-blocking
     * because in the mean time other threads may have invoked the semaphore.
     * The result is simply the difference between the limit and the current
     * acquire count; if the limit is disabled, or has been lowered below the
     * number of permits already handed out in the current period, the result
     * is zero or negative.
     *
     * @return the current number of available {@link #acquire()} calls in the
     * current period
     */
    public synchronized int getAvailablePermits() {
        return getLimit() - getAcquireCount();
    }

    /**
     * Returns the average number of successful (i.e. non-blocking)
     * {@link #acquire()} invocations for the entire life-time of this {@code
     * TimedSemaphore}. This method can be used for instance for statistical
     * calculations. Only completed periods are taken into account; before the
     * first period has ended the result is 0.
     *
     * @return the average number of {@link #acquire()} invocations per time
     * unit
     */
    public synchronized double getAverageCallsPerPeriod() {
        return periodCount == 0 ? 0 : (double) totalAcquireCount
                / (double) periodCount;
    }

    /**
     * Returns the time period. This is the time monitored by this semaphore.
     * Only a given number of invocations of the {@link #acquire()} method is
     * possible in this period.
     *
     * @return the time period
     */
    public long getPeriod() {
        return period;
    }

    /**
     * Returns the time unit. This is the unit used by {@link #getPeriod()}.
     *
     * @return the time unit
     */
    public TimeUnit getUnit() {
        return unit;
    }

    /**
     * Returns the executor service used by this instance.
     *
     * @return the executor service
     */
    protected ScheduledExecutorService getExecutorService() {
        return executorService;
    }

    /**
     * Starts the timer. This method is called when {@link #acquire()} is called
     * for the first time. It schedules a task to be executed at fixed rate to
     * monitor the time period specified.
     *
     * @return a future object representing the task scheduled
     */
    protected ScheduledFuture<?> startTimer() {
        return getExecutorService().scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                endOfPeriod();
            }
        }, getPeriod(), getPeriod(), getUnit());
    }

    /**
     * The current time period is finished. This method is called by the timer
     * used internally to monitor the time period. It updates the statistics,
     * resets the counter and releases the threads waiting for this barrier.
     */
    synchronized void endOfPeriod() {
        lastCallsPerPeriod = acquireCount;
        totalAcquireCount += acquireCount;
        periodCount++;
        acquireCount = 0;
        notifyAll();
    }

    /**
     * Prepares an acquire operation. Checks for the current state and starts the internal
     * timer if necessary. This method must be called with the lock of this object held.
     *
     * @throws IllegalStateException if this semaphore is already shut down
     */
    private void prepareAcquire() {
        checkNotShutdown();

        if (task == null) {
            task = startTimer();
        }
    }

    /**
     * Verifies that this semaphore is still usable. This method must be called with
     * the lock of this object held.
     *
     * @throws IllegalStateException if {@link #shutdown()} has been called
     */
    private void checkNotShutdown() {
        if (isShutdown()) {
            throw new IllegalStateException("TimedSemaphore is shut down!");
        }
    }

    /**
     * Internal helper method for acquiring a permit. This method checks whether currently
     * a permit can be acquired and - if so - increases the internal counter. The return
     * value indicates whether a permit could be acquired. This method must be called with
     * the lock of this object held.
     *
     * @return a flag whether a permit could be acquired
     */
    private boolean acquirePermit() {
        final int currentLimit = getLimit();
        if (currentLimit <= NO_LIMIT || acquireCount < currentLimit) {
            acquireCount++;
            return true;
        }
        return false;
    }
}
470 }