View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.lang3.concurrent;
18  
19  import java.util.concurrent.ScheduledExecutorService;
20  import java.util.concurrent.ScheduledFuture;
21  import java.util.concurrent.ScheduledThreadPoolExecutor;
22  import java.util.concurrent.TimeUnit;
23  
24  import org.apache.commons.lang3.Validate;
25  
26  /**
27   * A specialized <em>semaphore</em> implementation that provides a number of
28   * permits in a given time frame.
29   *
30   * <p>
31   * This class is similar to the {@link java.util.concurrent.Semaphore} class
32   * provided by the JDK in that it manages a configurable number of permits.
33   * Using the {@link #acquire()} method a permit can be requested by a thread.
34   * However, there is an additional timing dimension: there is no {@code
35   * release()} method for freeing a permit, but all permits are automatically
36   * released at the end of a configurable time frame. If a thread calls
37   * {@link #acquire()} and the available permits are already exhausted for this
38   * time frame, the thread is blocked. When the time frame ends all permits
39   * requested so far are restored, and blocking threads are waked up again, so
40   * that they can try to acquire a new permit. This basically means that in the
41   * specified time frame only the given number of operations is possible.
42   * </p>
43   * <p>
44   * A use case for this class is to artificially limit the load produced by a
45   * process. As an example consider an application that issues database queries
46   * on a production system in a background process to gather statistical
47   * information. This background processing should not produce so much database
48   * load that the functionality and the performance of the production system are
49   * impacted. Here a {@link TimedSemaphore} could be installed to guarantee that
50   * only a given number of database queries are issued per second.
51   * </p>
52   * <p>
53   * A thread class for performing database queries could look as follows:
54   * </p>
55   *
56   * <pre>
57   * public class StatisticsThread extends Thread {
58   *     // The semaphore for limiting database load.
59   *     private final TimedSemaphore semaphore;
60   *     // Create an instance and set the semaphore
61   *     public StatisticsThread(TimedSemaphore timedSemaphore) {
62   *         semaphore = timedSemaphore;
63   *     }
64   *     // Gather statistics
65   *     public void run() {
66   *         try {
67   *             while (true) {
68   *                 semaphore.acquire();   // limit database load
69   *                 performQuery();        // issue a query
70   *             }
71   *         } catch (InterruptedException ex) {
72   *             // fall through
73   *         }
74   *     }
75   *     ...
76   * }
77   * </pre>
78   *
79   * <p>
80   * The following code fragment shows how a {@link TimedSemaphore} is created
81   * that allows only 10 operations per second and passed to the statistics
82   * thread:
83   * </p>
84   *
85   * <pre>
86   * TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECONDS, 10);
87   * StatisticsThread thread = new StatisticsThread(sem);
88   * thread.start();
89   * </pre>
90   *
91   * <p>
92   * When creating an instance the time period for the semaphore must be
93   * specified. {@link TimedSemaphore} uses an executor service with a
94   * corresponding period to monitor this interval. The {@code
95   * ScheduledExecutorService} to be used for this purpose can be provided at
96   * construction time. Alternatively the class creates an internal executor
97   * service.
98   * </p>
99   * <p>
100  * Client code that uses {@link TimedSemaphore} has to call the
101  * {@link #acquire()} method in each processing step. {@link TimedSemaphore}
102  * keeps track of the number of invocations of the {@link #acquire()} method and
103  * blocks the calling thread if the counter exceeds the limit specified. When
104  * the timer signals the end of the time period the counter is reset and all
105  * waiting threads are released. Then another cycle can start.
106  * </p>
107  * <p>
108  * An alternative to {@code acquire()} is the {@link #tryAcquire()} method. This
109  * method checks whether the semaphore is under the specified limit and
110  * increases the internal counter if this is the case. The return value is then
111  * <strong>true</strong>, and the calling thread can continue with its action.
112  * If the semaphore is already at its limit, {@code tryAcquire()} immediately
113  * returns <strong>false</strong> without blocking; the calling thread must
114  * then abort its action. This usage scenario prevents blocking of threads.
115  * </p>
116  * <p>
117  * It is possible to modify the limit at any time using the
118  * {@link #setLimit(int)} method. This is useful if the load produced by an
119  * operation has to be adapted dynamically. In the example scenario with the
120  * thread collecting statistics it may make sense to specify a low limit during
121  * day time while allowing a higher load in the night time. Reducing the limit
122  * takes effect immediately by blocking incoming callers. If the limit is
123  * increased, waiting threads are not released immediately, but wake up when the
124  * timer runs out. Then, in the next period more processing steps can be
125  * performed without blocking. By setting the limit to 0 the semaphore can be
126  * switched off: in this mode the {@link #acquire()} method never blocks, but
127  * lets all callers pass directly.
128  * </p>
129  * <p>
130  * When the {@link TimedSemaphore} is no longer needed its {@link #shutdown()}
131  * method should be called. This causes the periodic task that monitors the time
132  * interval to be canceled. If the {@link ScheduledExecutorService} has been
133  * created by the semaphore at construction time, it is also shut down.
134  * After that {@link #acquire()} must not be called any more.
135  * </p>
136  *
137  * @since 3.0
138  */
public class TimedSemaphore {
    /**
     * Constant for a value representing no limit. If the limit is set to a
     * value less or equal this constant, the {@link TimedSemaphore} will be
     * effectively switched off.
     */
    public static final int NO_LIMIT = 0;

    /** Constant for the thread pool size for the executor. */
    private static final int THREAD_POOL_SIZE = 1;

    /** Message of the exception thrown when the semaphore is used after shutdown. */
    private static final String SHUTDOWN_MESSAGE = "TimedSemaphore is shut down!";

    /** The executor service for managing the timer thread. */
    private final ScheduledExecutorService executorService;

    /** Stores the period for this timed semaphore. */
    private final long period;

    /** The time unit for the period. */
    private final TimeUnit unit;

    /** A flag whether the executor service was created by this object. */
    private final boolean ownExecutor;

    /** A future object representing the timer task. */
    private ScheduledFuture<?> task; // @GuardedBy("this")

    /** Stores the total number of permits acquired in all completed periods. */
    private long totalAcquireCount; // @GuardedBy("this")

    /**
     * The counter for the periods. This counter is increased every time a
     * period ends.
     */
    private long periodCount; // @GuardedBy("this")

    /** The limit. */
    private int limit; // @GuardedBy("this")

    /** The number of permits acquired in the current period. */
    private int acquireCount; // @GuardedBy("this")

    /** The number of permits acquired in the last completed period. */
    private int lastCallsPerPeriod; // @GuardedBy("this")

    /** A flag whether shutdown() was called. */
    private boolean shutdown; // @GuardedBy("this")

    /**
     * Creates a new instance of {@link TimedSemaphore} and initializes it with
     * the given time period and the limit. An internal executor service is
     * created for monitoring the time period.
     *
     * @param timePeriod the time period
     * @param timeUnit the unit for the period
     * @param limit the limit for the semaphore
     * @throws IllegalArgumentException if the period is less or equals 0
     */
    public TimedSemaphore(final long timePeriod, final TimeUnit timeUnit, final int limit) {
        this(null, timePeriod, timeUnit, limit);
    }

    /**
     * Creates a new instance of {@link TimedSemaphore} and initializes it with
     * an executor service, the given time period, and the limit. The executor
     * service will be used for creating a periodic task for monitoring the time
     * period. It can be <b>null</b>, then a default service will be created.
     *
     * @param service the executor service
     * @param timePeriod the time period
     * @param timeUnit the unit for the period
     * @param limit the limit for the semaphore
     * @throws IllegalArgumentException if the period is less or equals 0
     */
    public TimedSemaphore(final ScheduledExecutorService service, final long timePeriod,
            final TimeUnit timeUnit, final int limit) {
        // Any long below 1 is rejected; the upper bound is Long.MAX_VALUE and
        // therefore needs no explicit check.
        if (timePeriod < 1) {
            throw new IllegalArgumentException("Time period must be greater than 0!");
        }

        period = timePeriod;
        unit = timeUnit;

        if (service != null) {
            executorService = service;
            ownExecutor = false;
        } else {
            // The internal executor must not keep running the timer task once
            // it has been shut down.
            final ScheduledThreadPoolExecutor s = new ScheduledThreadPoolExecutor(THREAD_POOL_SIZE);
            s.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
            s.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
            executorService = s;
            ownExecutor = true;
        }

        setLimit(limit);
    }

    /**
     * Acquires a permit from this semaphore. This method will block if
     * the limit for the current period has already been reached. If
     * {@link #shutdown()} has already been invoked, calling this method will
     * cause an exception. The same exception is thrown if {@link #shutdown()}
     * is invoked while the calling thread is blocked in this method, because
     * after a shutdown no further period will end and release the thread.
     * The very first call of this method starts the timer task which monitors
     * the time period set for this {@link TimedSemaphore}. From now on the
     * semaphore is active.
     *
     * @throws InterruptedException if the thread gets interrupted
     * @throws IllegalStateException if this semaphore is already shut down or
     *         is shut down while the calling thread is waiting for a permit
     */
    public synchronized void acquire() throws InterruptedException {
        prepareAcquire();

        while (!acquirePermit()) {
            wait();
            // shutdown() cancels the timer, so endOfPeriod() will never wake
            // this thread again; fail instead of waiting forever.
            ensureNotShutdown();
        }
    }

    /**
     * Internal helper method for acquiring a permit. This method checks whether currently
     * a permit can be acquired and - if so - increases the internal counter. The return
     * value indicates whether a permit could be acquired. This method must be called with
     * the lock of this object held.
     *
     * @return a flag whether a permit could be acquired
     */
    private boolean acquirePermit() {
        if (limit <= NO_LIMIT || acquireCount < limit) {
            acquireCount++;
            return true;
        }
        return false;
    }

    /**
     * Throws an exception if {@link #shutdown()} has been called. This method
     * must be called with the lock of this object held.
     *
     * @throws IllegalStateException if this semaphore is already shut down
     */
    private void ensureNotShutdown() {
        if (shutdown) {
            throw new IllegalStateException(SHUTDOWN_MESSAGE);
        }
    }

    /**
     * The current time period is finished. This method is called by the timer
     * used internally to monitor the time period. It updates the statistics,
     * resets the counter and releases the threads waiting for this barrier.
     */
    synchronized void endOfPeriod() {
        lastCallsPerPeriod = acquireCount;
        totalAcquireCount += acquireCount;
        periodCount++;
        acquireCount = 0;
        notifyAll();
    }

    /**
     * Returns the number of invocations of the {@link #acquire()} method for
     * the current period. This may be useful for testing or debugging purposes.
     *
     * @return the current number of {@link #acquire()} invocations
     */
    public synchronized int getAcquireCount() {
        return acquireCount;
    }

    /**
     * Returns the number of calls to the {@link #acquire()} method that can
     * still be performed in the current period without blocking. This method
     * can give an indication whether it is safe to call the {@link #acquire()}
     * method without risking to be suspended. However, there is no guarantee
     * that a subsequent call to {@link #acquire()} actually is not-blocking
     * because in the meantime other threads may have invoked the semaphore.
     *
     * <p>
     * The result is simply the difference between the limit and the current
     * acquire count. It can therefore be zero or negative if the limit is
     * disabled (see {@link #NO_LIMIT}) or has been reduced below the number of
     * permits already acquired in the current period.
     * </p>
     *
     * @return the current number of available {@link #acquire()} calls in the
     * current period
     */
    public synchronized int getAvailablePermits() {
        return limit - acquireCount;
    }

    /**
     * Returns the average number of successful (i.e. non-blocking)
     * {@link #acquire()} invocations for the entire life-time of this {@code
     * TimedSemaphore}. This method can be used for instance for statistical
     * calculations. Only completed periods are taken into account; as long as
     * no period has ended the result is 0.
     *
     * @return the average number of {@link #acquire()} invocations per time
     * unit
     */
    public synchronized double getAverageCallsPerPeriod() {
        if (periodCount == 0) {
            return 0;
        }
        return (double) totalAcquireCount / (double) periodCount;
    }

    /**
     * Returns the executor service used by this instance.
     *
     * @return the executor service
     */
    protected ScheduledExecutorService getExecutorService() {
        return executorService;
    }

    /**
     * Returns the number of (successful) acquire invocations during the last
     * period. This is the number of times the {@link #acquire()} method was
     * called without blocking. This can be useful for testing or debugging
     * purposes or to determine a meaningful threshold value. If a limit is set,
     * the value returned by this method won't be greater than this limit.
     *
     * @return the number of non-blocking invocations of the {@link #acquire()}
     * method
     */
    public synchronized int getLastAcquiresPerPeriod() {
        return lastCallsPerPeriod;
    }

    /**
     * Returns the limit enforced by this semaphore. The limit determines how
     * many invocations of {@link #acquire()} are allowed within the monitored
     * period.
     *
     * @return the limit
     */
    public final synchronized int getLimit() {
        return limit;
    }

    /**
     * Returns the time period. This is the time monitored by this semaphore.
     * Only a given number of invocations of the {@link #acquire()} method is
     * possible in this period.
     *
     * @return the time period
     */
    public long getPeriod() {
        return period;
    }

    /**
     * Returns the time unit. This is the unit used by {@link #getPeriod()}.
     *
     * @return the time unit
     */
    public TimeUnit getUnit() {
        return unit;
    }

    /**
     * Tests whether the {@link #shutdown()} method has been called on this
     * object. If this method returns <b>true</b>, this instance cannot be used
     * any longer.
     *
     * @return a flag whether a shutdown has been performed
     */
    public synchronized boolean isShutdown() {
        return shutdown;
    }

    /**
     * Prepares an acquire operation. Checks for the current state and starts the internal
     * timer if necessary. This method must be called with the lock of this object held.
     *
     * @throws IllegalStateException if this semaphore is already shut down
     */
    private void prepareAcquire() {
        ensureNotShutdown();

        // The timer is started lazily by the first acquire attempt.
        if (task == null) {
            task = startTimer();
        }
    }

    /**
     * Sets the limit. This is the number of times the {@link #acquire()} method
     * can be called within the time period specified. If this limit is reached,
     * further invocations of {@link #acquire()} will block. Setting the limit
     * to a value &lt;= {@link #NO_LIMIT} will cause the limit to be disabled,
     * i.e. an arbitrary number of {@link #acquire()} invocations is allowed in
     * the time period.
     *
     * @param limit the limit
     */
    public final synchronized void setLimit(final int limit) {
        this.limit = limit;
    }

    /**
     * Initializes a shutdown. After that the object cannot be used anymore.
     * The timer task is canceled and, if the executor service was created by
     * this instance, the executor service is shut down as well. Threads that
     * are currently blocked in {@link #acquire()} are woken up and fail with an
     * {@link IllegalStateException}. This method can be invoked an arbitrary
     * number of times. All invocations after the first one do not have any
     * effect.
     */
    public synchronized void shutdown() {
        if (shutdown) {
            return;
        }

        if (ownExecutor) {
            // if the executor was created by this instance, it has
            // to be shutdown
            getExecutorService().shutdownNow();
        }
        if (task != null) {
            task.cancel(false);
        }

        shutdown = true;

        // With the timer gone nobody else would ever notify the waiting
        // threads, so release them here; they observe the shutdown flag.
        notifyAll();
    }

    /**
     * Starts the timer. This method is called when {@link #acquire()} is called
     * for the first time. It schedules a task to be executed at fixed rate to
     * monitor the time period specified.
     *
     * @return a future object representing the task scheduled
     */
    protected ScheduledFuture<?> startTimer() {
        return getExecutorService().scheduleAtFixedRate(this::endOfPeriod, getPeriod(), getPeriod(), getUnit());
    }

    /**
     * Tries to acquire a permit from this semaphore. If the limit of this semaphore has
     * not yet been reached, a permit is acquired, and this method returns
     * <strong>true</strong>. Otherwise, this method returns immediately with the result
     * <strong>false</strong>.
     *
     * @return <strong>true</strong> if a permit could be acquired; <strong>false</strong>
     * otherwise
     * @throws IllegalStateException if this semaphore is already shut down
     * @since 3.5
     */
    public synchronized boolean tryAcquire() {
        prepareAcquire();
        return acquirePermit();
    }
}
464 }