1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.commons.lang3.concurrent;
18
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.ScheduledThreadPoolExecutor;
22 import java.util.concurrent.TimeUnit;
23
24 /**
25 * <p>
26 * A specialized <em>semaphore</em> implementation that provides a number of
27 * permits in a given time frame.
28 * </p>
29 * <p>
30 * This class is similar to the {@code java.util.concurrent.Semaphore} class
31 * provided by the JDK in that it manages a configurable number of permits.
32 * Using the {@link #acquire()} method a permit can be requested by a thread.
33 * However, there is an additional timing dimension: there is no {@code
34 * release()} method for freeing a permit, but all permits are automatically
35 * released at the end of a configurable time frame. If a thread calls
36 * {@link #acquire()} and the available permits are already exhausted for this
37 * time frame, the thread is blocked. When the time frame ends all permits
38 * requested so far are restored, and blocking threads are waked up again, so
39 * that they can try to acquire a new permit. This basically means that in the
40 * specified time frame only the given number of operations is possible.
41 * </p>
42 * <p>
43 * A use case for this class is to artificially limit the load produced by a
44 * process. As an example consider an application that issues database queries
45 * on a production system in a background process to gather statistical
46 * information. This background processing should not produce so much database
47 * load that the functionality and the performance of the production system are
48 * impacted. Here a {@code TimedSemaphore} could be installed to guarantee that
49 * only a given number of database queries are issued per second.
50 * </p>
51 * <p>
52 * A thread class for performing database queries could look as follows:
53 *
54 * <pre>
55 * public class StatisticsThread extends Thread {
56 * // The semaphore for limiting database load.
57 * private final TimedSemaphore semaphore;
58 * // Create an instance and set the semaphore
59 * public StatisticsThread(TimedSemaphore timedSemaphore) {
60 * semaphore = timedSemaphore;
61 * }
62 * // Gather statistics
63 * public void run() {
64 * try {
65 * while(true) {
66 * semaphore.acquire(); // limit database load
67 * performQuery(); // issue a query
68 * }
69 * } catch(InterruptedException) {
70 * // fall through
71 * }
72 * }
73 * ...
74 * }
75 * </pre>
76 *
77 * The following code fragment shows how a {@code TimedSemaphore} is created
78 * that allows only 10 operations per second and passed to the statistics
79 * thread:
80 *
81 * <pre>
82 * TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECOND, 10);
83 * StatisticsThread thread = new StatisticsThread(sem);
84 * thread.start();
85 * </pre>
86 *
87 * </p>
88 * <p>
89 * When creating an instance the time period for the semaphore must be
90 * specified. {@code TimedSemaphore} uses an executor service with a
91 * corresponding period to monitor this interval. The {@code
92 * ScheduledExecutorService} to be used for this purpose can be provided at
93 * construction time. Alternatively the class creates an internal executor
94 * service.
95 * </p>
96 * <p>
97 * Client code that uses {@code TimedSemaphore} has to call the
98 * {@link #acquire()} method in aach processing step. {@code TimedSemaphore}
99 * keeps track of the number of invocations of the {@link #acquire()} method and
100 * blocks the calling thread if the counter exceeds the limit specified. When
101 * the timer signals the end of the time period the counter is reset and all
102 * waiting threads are released. Then another cycle can start.
103 * </p>
104 * <p>
105 * It is possible to modify the limit at any time using the
106 * {@link #setLimit(int)} method. This is useful if the load produced by an
107 * operation has to be adapted dynamically. In the example scenario with the
108 * thread collecting statistics it may make sense to specify a low limit during
109 * day time while allowing a higher load in the night time. Reducing the limit
110 * takes effect immediately by blocking incoming callers. If the limit is
111 * increased, waiting threads are not released immediately, but wake up when the
112 * timer runs out. Then, in the next period more processing steps can be
113 * performed without blocking. By setting the limit to 0 the semaphore can be
114 * switched off: in this mode the {@link #acquire()} method never blocks, but
115 * lets all callers pass directly.
116 * </p>
117 * <p>
118 * When the {@code TimedSemaphore} is no more needed its {@link #shutdown()}
119 * method should be called. This causes the periodic task that monitors the time
120 * interval to be canceled. If the {@code ScheduledExecutorService} has been
121 * created by the semaphore at construction time, it is also shut down.
122 * resources. After that {@link #acquire()} must not be called any more.
123 * </p>
124 *
125 * @since 3.0
126 * @version $Id: TimedSemaphore.java 1436770 2013-01-22 07:09:45Z ggregory $
127 */
128 public class TimedSemaphore {
129 /**
130 * Constant for a value representing no limit. If the limit is set to a
131 * value less or equal this constant, the {@code TimedSemaphore} will be
132 * effectively switched off.
133 */
134 public static final int NO_LIMIT = 0;
135
136 /** Constant for the thread pool size for the executor. */
137 private static final int THREAD_POOL_SIZE = 1;
138
139 /** The executor service for managing the timer thread. */
140 private final ScheduledExecutorService executorService;
141
142 /** Stores the period for this timed semaphore. */
143 private final long period;
144
145 /** The time unit for the period. */
146 private final TimeUnit unit;
147
148 /** A flag whether the executor service was created by this object. */
149 private final boolean ownExecutor;
150
151 /** A future object representing the timer task. */
152 private ScheduledFuture<?> task; // @GuardedBy("this")
153
154 /** Stores the total number of invocations of the acquire() method. */
155 private long totalAcquireCount; // @GuardedBy("this")
156
157 /**
158 * The counter for the periods. This counter is increased every time a
159 * period ends.
160 */
161 private long periodCount; // @GuardedBy("this")
162
163 /** The limit. */
164 private int limit; // @GuardedBy("this")
165
166 /** The current counter. */
167 private int acquireCount; // @GuardedBy("this")
168
169 /** The number of invocations of acquire() in the last period. */
170 private int lastCallsPerPeriod; // @GuardedBy("this")
171
172 /** A flag whether shutdown() was called. */
173 private boolean shutdown; // @GuardedBy("this")
174
175 /**
176 * Creates a new instance of {@link TimedSemaphore} and initializes it with
177 * the given time period and the limit.
178 *
179 * @param timePeriod the time period
180 * @param timeUnit the unit for the period
181 * @param limit the limit for the semaphore
182 * @throws IllegalArgumentException if the period is less or equals 0
183 */
184 public TimedSemaphore(final long timePeriod, final TimeUnit timeUnit, final int limit) {
185 this(null, timePeriod, timeUnit, limit);
186 }
187
188 /**
189 * Creates a new instance of {@link TimedSemaphore} and initializes it with
190 * an executor service, the given time period, and the limit. The executor
191 * service will be used for creating a periodic task for monitoring the time
192 * period. It can be <b>null</b>, then a default service will be created.
193 *
194 * @param service the executor service
195 * @param timePeriod the time period
196 * @param timeUnit the unit for the period
197 * @param limit the limit for the semaphore
198 * @throws IllegalArgumentException if the period is less or equals 0
199 */
200 public TimedSemaphore(final ScheduledExecutorService service, final long timePeriod,
201 final TimeUnit timeUnit, final int limit) {
202 if (timePeriod <= 0) {
203 throw new IllegalArgumentException("Time period must be greater 0!");
204 }
205
206 period = timePeriod;
207 unit = timeUnit;
208
209 if (service != null) {
210 executorService = service;
211 ownExecutor = false;
212 } else {
213 final ScheduledThreadPoolExecutor s = new ScheduledThreadPoolExecutor(
214 THREAD_POOL_SIZE);
215 s.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
216 s.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
217 executorService = s;
218 ownExecutor = true;
219 }
220
221 setLimit(limit);
222 }
223
224 /**
225 * Returns the limit enforced by this semaphore. The limit determines how
226 * many invocations of {@link #acquire()} are allowed within the monitored
227 * period.
228 *
229 * @return the limit
230 */
231 public final synchronized int getLimit() {
232 return limit;
233 }
234
235 /**
236 * Sets the limit. This is the number of times the {@link #acquire()} method
237 * can be called within the time period specified. If this limit is reached,
238 * further invocations of {@link #acquire()} will block. Setting the limit
239 * to a value <= {@link #NO_LIMIT} will cause the limit to be disabled,
240 * i.e. an arbitrary number of{@link #acquire()} invocations is allowed in
241 * the time period.
242 *
243 * @param limit the limit
244 */
245 public final synchronized void setLimit(final int limit) {
246 this.limit = limit;
247 }
248
249 /**
250 * Initializes a shutdown. After that the object cannot be used any more.
251 * This method can be invoked an arbitrary number of times. All invocations
252 * after the first one do not have any effect.
253 */
254 public synchronized void shutdown() {
255 if (!shutdown) {
256
257 if (ownExecutor) {
258 // if the executor was created by this instance, it has
259 // to be shutdown
260 getExecutorService().shutdownNow();
261 }
262 if (task != null) {
263 task.cancel(false);
264 }
265
266 shutdown = true;
267 }
268 }
269
270 /**
271 * Tests whether the {@link #shutdown()} method has been called on this
272 * object. If this method returns <b>true</b>, this instance cannot be used
273 * any longer.
274 *
275 * @return a flag whether a shutdown has been performed
276 */
277 public synchronized boolean isShutdown() {
278 return shutdown;
279 }
280
281 /**
282 * Tries to acquire a permit from this semaphore. This method will block if
283 * the limit for the current period has already been reached. If
284 * {@link #shutdown()} has already been invoked, calling this method will
285 * cause an exception. The very first call of this method starts the timer
286 * task which monitors the time period set for this {@code TimedSemaphore}.
287 * From now on the semaphore is active.
288 *
289 * @throws InterruptedException if the thread gets interrupted
290 * @throws IllegalStateException if this semaphore is already shut down
291 */
292 public synchronized void acquire() throws InterruptedException {
293 if (isShutdown()) {
294 throw new IllegalStateException("TimedSemaphore is shut down!");
295 }
296
297 if (task == null) {
298 task = startTimer();
299 }
300
301 boolean canPass = false;
302 do {
303 canPass = getLimit() <= NO_LIMIT || acquireCount < getLimit();
304 if (!canPass) {
305 wait();
306 } else {
307 acquireCount++;
308 }
309 } while (!canPass);
310 }
311
312 /**
313 * Returns the number of (successful) acquire invocations during the last
314 * period. This is the number of times the {@link #acquire()} method was
315 * called without blocking. This can be useful for testing or debugging
316 * purposes or to determine a meaningful threshold value. If a limit is set,
317 * the value returned by this method won't be greater than this limit.
318 *
319 * @return the number of non-blocking invocations of the {@link #acquire()}
320 * method
321 */
322 public synchronized int getLastAcquiresPerPeriod() {
323 return lastCallsPerPeriod;
324 }
325
326 /**
327 * Returns the number of invocations of the {@link #acquire()} method for
328 * the current period. This may be useful for testing or debugging purposes.
329 *
330 * @return the current number of {@link #acquire()} invocations
331 */
332 public synchronized int getAcquireCount() {
333 return acquireCount;
334 }
335
336 /**
337 * Returns the number of calls to the {@link #acquire()} method that can
338 * still be performed in the current period without blocking. This method
339 * can give an indication whether it is safe to call the {@link #acquire()}
340 * method without risking to be suspended. However, there is no guarantee
341 * that a subsequent call to {@link #acquire()} actually is not-blocking
342 * because in the mean time other threads may have invoked the semaphore.
343 *
344 * @return the current number of available {@link #acquire()} calls in the
345 * current period
346 */
347 public synchronized int getAvailablePermits() {
348 return getLimit() - getAcquireCount();
349 }
350
351 /**
352 * Returns the average number of successful (i.e. non-blocking)
353 * {@link #acquire()} invocations for the entire life-time of this {@code
354 * TimedSemaphore}. This method can be used for instance for statistical
355 * calculations.
356 *
357 * @return the average number of {@link #acquire()} invocations per time
358 * unit
359 */
360 public synchronized double getAverageCallsPerPeriod() {
361 return periodCount == 0 ? 0 : (double) totalAcquireCount
362 / (double) periodCount;
363 }
364
365 /**
366 * Returns the time period. This is the time monitored by this semaphore.
367 * Only a given number of invocations of the {@link #acquire()} method is
368 * possible in this period.
369 *
370 * @return the time period
371 */
372 public long getPeriod() {
373 return period;
374 }
375
376 /**
377 * Returns the time unit. This is the unit used by {@link #getPeriod()}.
378 *
379 * @return the time unit
380 */
381 public TimeUnit getUnit() {
382 return unit;
383 }
384
385 /**
386 * Returns the executor service used by this instance.
387 *
388 * @return the executor service
389 */
390 protected ScheduledExecutorService getExecutorService() {
391 return executorService;
392 }
393
394 /**
395 * Starts the timer. This method is called when {@link #acquire()} is called
396 * for the first time. It schedules a task to be executed at fixed rate to
397 * monitor the time period specified.
398 *
399 * @return a future object representing the task scheduled
400 */
401 protected ScheduledFuture<?> startTimer() {
402 return getExecutorService().scheduleAtFixedRate(new Runnable() {
403 @Override
404 public void run() {
405 endOfPeriod();
406 }
407 }, getPeriod(), getPeriod(), getUnit());
408 }
409
410 /**
411 * The current time period is finished. This method is called by the timer
412 * used internally to monitor the time period. It resets the counter and
413 * releases the threads waiting for this barrier.
414 */
415 synchronized void endOfPeriod() {
416 lastCallsPerPeriod = acquireCount;
417 totalAcquireCount += acquireCount;
418 periodCount++;
419 acquireCount = 0;
420 notifyAll();
421 }
422 }