View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.lang3.concurrent;
18  
19  import java.util.concurrent.ScheduledExecutorService;
20  import java.util.concurrent.ScheduledFuture;
21  import java.util.concurrent.ScheduledThreadPoolExecutor;
22  import java.util.concurrent.TimeUnit;
23  
24  /**
25   * <p>
26   * A specialized <em>semaphore</em> implementation that provides a number of
27   * permits in a given time frame.
28   * </p>
29   * <p>
30   * This class is similar to the {@code java.util.concurrent.Semaphore} class
31   * provided by the JDK in that it manages a configurable number of permits.
32   * Using the {@link #acquire()} method a permit can be requested by a thread.
33   * However, there is an additional timing dimension: there is no {@code
34   * release()} method for freeing a permit, but all permits are automatically
35   * released at the end of a configurable time frame. If a thread calls
36   * {@link #acquire()} and the available permits are already exhausted for this
37   * time frame, the thread is blocked. When the time frame ends all permits
38   * requested so far are restored, and blocked threads are woken up again, so
39   * that they can try to acquire a new permit. This basically means that in the
40   * specified time frame only the given number of operations is possible.
41   * </p>
42   * <p>
43   * A use case for this class is to artificially limit the load produced by a
44   * process. As an example consider an application that issues database queries
45   * on a production system in a background process to gather statistical
46   * information. This background processing should not produce so much database
47   * load that the functionality and the performance of the production system are
48   * impacted. Here a {@code TimedSemaphore} could be installed to guarantee that
49   * only a given number of database queries are issued per second.
50   * </p>
51   * <p>
52   * A thread class for performing database queries could look as follows:
53   *
54   * <pre>
55   * public class StatisticsThread extends Thread {
56   *     // The semaphore for limiting database load.
57   *     private final TimedSemaphore semaphore;
58   *     // Create an instance and set the semaphore
59   *     public StatisticsThread(TimedSemaphore timedSemaphore) {
60   *         semaphore = timedSemaphore;
61   *     }
62   *     // Gather statistics
63   *     public void run() {
64   *         try {
65   *             while(true) {
66   *                 semaphore.acquire();   // limit database load
67   *                 performQuery();        // issue a query
68   *             }
69   *         } catch(InterruptedException e) {
70   *             // fall through
71   *         }
72   *     }
73   *     ...
74   * }
75   * </pre>
76   *
77   * The following code fragment shows how a {@code TimedSemaphore} is created
78   * that allows only 10 operations per second and passed to the statistics
79   * thread:
80   *
81   * <pre>
82   * TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECONDS, 10);
83   * StatisticsThread thread = new StatisticsThread(sem);
84   * thread.start();
85   * </pre>
86   *
87   * </p>
88   * <p>
89   * When creating an instance the time period for the semaphore must be
90   * specified. {@code TimedSemaphore} uses an executor service with a
91   * corresponding period to monitor this interval. The {@code
92   * ScheduledExecutorService} to be used for this purpose can be provided at
93   * construction time. Alternatively the class creates an internal executor
94   * service.
95   * </p>
96   * <p>
97   * Client code that uses {@code TimedSemaphore} has to call the
98   * {@link #acquire()} method in each processing step. {@code TimedSemaphore}
99   * keeps track of the number of invocations of the {@link #acquire()} method and
100  * blocks the calling thread if the counter exceeds the limit specified. When
101  * the timer signals the end of the time period the counter is reset and all
102  * waiting threads are released. Then another cycle can start.
103  * </p>
104  * <p>
105  * It is possible to modify the limit at any time using the
106  * {@link #setLimit(int)} method. This is useful if the load produced by an
107  * operation has to be adapted dynamically. In the example scenario with the
108  * thread collecting statistics it may make sense to specify a low limit during
109  * day time while allowing a higher load in the night time. Reducing the limit
110  * takes effect immediately by blocking incoming callers. If the limit is
111  * increased, waiting threads are not released immediately, but wake up when the
112  * timer runs out. Then, in the next period more processing steps can be
113  * performed without blocking. By setting the limit to 0 the semaphore can be
114  * switched off: in this mode the {@link #acquire()} method never blocks, but
115  * lets all callers pass directly.
116  * </p>
117  * <p>
118  * When the {@code TimedSemaphore} is no longer needed its {@link #shutdown()}
119  * method should be called. This causes the periodic task that monitors the time
120  * interval to be canceled. If the {@code ScheduledExecutorService} has been
121  * created by the semaphore at construction time, it is also shut down to free
122  * its resources. After that {@link #acquire()} must not be called any more.
123  * </p>
124  *
125  * @since 3.0
126  * @version $Id: TimedSemaphore.java 1436770 2013-01-22 07:09:45Z ggregory $
127  */
public class TimedSemaphore {
    /**
     * Constant for a value representing no limit. If the limit is set to a
     * value less or equal this constant, the {@code TimedSemaphore} will be
     * effectively switched off.
     */
    public static final int NO_LIMIT = 0;

    /** Constant for the thread pool size for the executor. */
    private static final int THREAD_POOL_SIZE = 1;

    /** The executor service for managing the timer thread. */
    private final ScheduledExecutorService executorService;

    /** Stores the period for this timed semaphore. */
    private final long period;

    /** The time unit for the period. */
    private final TimeUnit unit;

    /** A flag whether the executor service was created by this object. */
    private final boolean ownExecutor;

    /** A future object representing the timer task. */
    private ScheduledFuture<?> task; // @GuardedBy("this")

    /** Stores the total number of invocations of the acquire() method. */
    private long totalAcquireCount; // @GuardedBy("this")

    /**
     * The counter for the periods. This counter is increased every time a
     * period ends.
     */
    private long periodCount; // @GuardedBy("this")

    /** The limit. */
    private int limit; // @GuardedBy("this")

    /** The current counter. */
    private int acquireCount;  // @GuardedBy("this")

    /** The number of invocations of acquire() in the last period. */
    private int lastCallsPerPeriod; // @GuardedBy("this")

    /** A flag whether shutdown() was called. */
    private boolean shutdown;  // @GuardedBy("this")

    /**
     * Creates a new instance of {@link TimedSemaphore} and initializes it with
     * the given time period and the limit. An internal executor service is
     * created for running the timer task.
     *
     * @param timePeriod the time period
     * @param timeUnit the unit for the period
     * @param limit the limit for the semaphore
     * @throws IllegalArgumentException if the period is less or equals 0
     */
    public TimedSemaphore(final long timePeriod, final TimeUnit timeUnit, final int limit) {
        this(null, timePeriod, timeUnit, limit);
    }

    /**
     * Creates a new instance of {@link TimedSemaphore} and initializes it with
     * an executor service, the given time period, and the limit. The executor
     * service will be used for creating a periodic task for monitoring the time
     * period. It can be <b>null</b>, then a default service will be created.
     *
     * @param service the executor service
     * @param timePeriod the time period
     * @param timeUnit the unit for the period (not validated by this
     *        constructor)
     * @param limit the limit for the semaphore
     * @throws IllegalArgumentException if the period is less or equals 0
     */
    public TimedSemaphore(final ScheduledExecutorService service, final long timePeriod,
            final TimeUnit timeUnit, final int limit) {
        if (timePeriod <= 0) {
            throw new IllegalArgumentException("Time period must be greater 0!");
        }

        period = timePeriod;
        unit = timeUnit;

        if (service != null) {
            executorService = service;
            ownExecutor = false;
        } else {
            // No service supplied: create a private single-threaded scheduler
            // whose pending and periodic tasks are dropped once it is shut down.
            final ScheduledThreadPoolExecutor s = new ScheduledThreadPoolExecutor(
                    THREAD_POOL_SIZE);
            s.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
            s.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
            executorService = s;
            ownExecutor = true;
        }

        // setLimit() is final, so calling it from the constructor is safe.
        setLimit(limit);
    }

    /**
     * Returns the limit enforced by this semaphore. The limit determines how
     * many invocations of {@link #acquire()} are allowed within the monitored
     * period.
     *
     * @return the limit
     */
    public final synchronized int getLimit() {
        return limit;
    }

    /**
     * Sets the limit. This is the number of times the {@link #acquire()} method
     * can be called within the time period specified. If this limit is reached,
     * further invocations of {@link #acquire()} will block. Setting the limit
     * to a value &lt;= {@link #NO_LIMIT} will cause the limit to be disabled,
     * i.e. an arbitrary number of {@link #acquire()} invocations is allowed in
     * the time period.
     *
     * @param limit the limit
     */
    public final synchronized void setLimit(final int limit) {
        this.limit = limit;
    }

    /**
     * Initializes a shutdown. After that the object cannot be used any more.
     * This method can be invoked an arbitrary number of times. All invocations
     * after the first one do not have any effect. Threads that are currently
     * blocked in {@link #acquire()} are woken up and leave that method with an
     * {@code IllegalStateException}.
     */
    public synchronized void shutdown() {
        if (!shutdown) {

            if (ownExecutor) {
                // if the executor was created by this instance, it has
                // to be shutdown
                getExecutorService().shutdownNow();
            }
            if (task != null) {
                task.cancel(false);
            }

            shutdown = true;

            // The timer has just been canceled, so endOfPeriod() will never
            // notify again. Wake blocked acquirers now; otherwise they would
            // wait forever for a period end that cannot happen.
            notifyAll();
        }
    }

    /**
     * Tests whether the {@link #shutdown()} method has been called on this
     * object. If this method returns <b>true</b>, this instance cannot be used
     * any longer.
     *
     * @return a flag whether a shutdown has been performed
     */
    public synchronized boolean isShutdown() {
        return shutdown;
    }

    /**
     * Tries to acquire a permit from this semaphore. This method will block if
     * the limit for the current period has already been reached. If
     * {@link #shutdown()} has already been invoked, calling this method will
     * cause an exception; the same exception is thrown if the semaphore is
     * shut down while the calling thread is blocked in this method. The very
     * first call of this method starts the timer task which monitors the time
     * period set for this {@code TimedSemaphore}. From now on the semaphore is
     * active.
     *
     * @throws InterruptedException if the thread gets interrupted
     * @throws IllegalStateException if this semaphore is already shut down or
     *         is shut down while the caller is waiting for a permit
     */
    public synchronized void acquire() throws InterruptedException {
        ensureNotShutdown();

        // Lazily start the timer on first use.
        if (task == null) {
            task = startTimer();
        }

        while (!acquirePermit()) {
            wait();
            // Woken either by endOfPeriod() or by shutdown(); in the latter
            // case no permit will ever become available again.
            ensureNotShutdown();
        }
    }

    /**
     * Returns the number of (successful) acquire invocations during the last
     * period. This is the number of times the {@link #acquire()} method was
     * called without blocking. This can be useful for testing or debugging
     * purposes or to determine a meaningful threshold value. If a limit is set,
     * the value returned by this method won't be greater than this limit.
     *
     * @return the number of non-blocking invocations of the {@link #acquire()}
     * method
     */
    public synchronized int getLastAcquiresPerPeriod() {
        return lastCallsPerPeriod;
    }

    /**
     * Returns the number of invocations of the {@link #acquire()} method for
     * the current period. This may be useful for testing or debugging purposes.
     *
     * @return the current number of {@link #acquire()} invocations
     */
    public synchronized int getAcquireCount() {
        return acquireCount;
    }

    /**
     * Returns the number of calls to the {@link #acquire()} method that can
     * still be performed in the current period without blocking. This method
     * can give an indication whether it is safe to call the {@link #acquire()}
     * method without risking to be suspended. However, there is no guarantee
     * that a subsequent call to {@link #acquire()} actually is not-blocking
     * because in the mean time other threads may have invoked the semaphore.
     * <p>
     * The value is computed as the current limit minus the current acquire
     * count. It can therefore be zero or negative, for instance when the limit
     * is disabled (see {@link #NO_LIMIT}) or has been reduced below the number
     * of permits already handed out in this period.
     * </p>
     *
     * @return the current number of available {@link #acquire()} calls in the
     * current period
     */
    public synchronized int getAvailablePermits() {
        return getLimit() - getAcquireCount();
    }

    /**
     * Returns the average number of successful (i.e. non-blocking)
     * {@link #acquire()} invocations for the entire life-time of this {@code
     * TimedSemaphore}. This method can be used for instance for statistical
     * calculations. Before the first period has ended the result is 0.
     *
     * @return the average number of {@link #acquire()} invocations per time
     * unit
     */
    public synchronized double getAverageCallsPerPeriod() {
        return periodCount == 0 ? 0 : (double) totalAcquireCount
                / (double) periodCount;
    }

    /**
     * Returns the time period. This is the time monitored by this semaphore.
     * Only a given number of invocations of the {@link #acquire()} method is
     * possible in this period.
     *
     * @return the time period
     */
    public long getPeriod() {
        return period;
    }

    /**
     * Returns the time unit. This is the unit used by {@link #getPeriod()}.
     *
     * @return the time unit
     */
    public TimeUnit getUnit() {
        return unit;
    }

    /**
     * Returns the executor service used by this instance.
     *
     * @return the executor service
     */
    protected ScheduledExecutorService getExecutorService() {
        return executorService;
    }

    /**
     * Starts the timer. This method is called when {@link #acquire()} is called
     * for the first time. It schedules a task to be executed at fixed rate to
     * monitor the time period specified.
     *
     * @return a future object representing the task scheduled
     */
    protected ScheduledFuture<?> startTimer() {
        return getExecutorService().scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                endOfPeriod();
            }
        }, getPeriod(), getPeriod(), getUnit());
    }

    /**
     * The current time period is finished. This method is called by the timer
     * used internally to monitor the time period. It updates the statistics,
     * resets the counter and releases the threads waiting for this barrier.
     */
    synchronized void endOfPeriod() {
        lastCallsPerPeriod = acquireCount;
        totalAcquireCount += acquireCount;
        periodCount++;
        acquireCount = 0;
        notifyAll();
    }

    /**
     * Throws an exception if this semaphore has been shut down.
     *
     * @throws IllegalStateException if {@link #isShutdown()} returns true
     */
    private void ensureNotShutdown() {
        if (isShutdown()) {
            throw new IllegalStateException("TimedSemaphore is shut down!");
        }
    }

    /**
     * Takes a permit for the current period if one is available. Must be
     * called while holding this object's monitor.
     *
     * @return true if the limit is disabled or not yet reached, in which case
     * the acquire counter has been incremented; false otherwise
     */
    private boolean acquirePermit() {
        final int currentLimit = getLimit();
        if (currentLimit <= NO_LIMIT || acquireCount < currentLimit) {
            acquireCount++;
            return true;
        }
        return false;
    }
}
421     }
422 }