Coverage Report - org.apache.commons.lang3.concurrent.TimedSemaphore
 
Classes in this File Line Coverage Branch Coverage Complexity
TimedSemaphore
100%
53/53
100%
24/24
1,765
TimedSemaphore$1
100%
3/3
N/A
1,765
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one or more
 3  
  * contributor license agreements.  See the NOTICE file distributed with
 4  
  * this work for additional information regarding copyright ownership.
 5  
  * The ASF licenses this file to You under the Apache License, Version 2.0
 6  
  * (the "License"); you may not use this file except in compliance with
 7  
  * the License.  You may obtain a copy of the License at
 8  
  *
 9  
  *      http://www.apache.org/licenses/LICENSE-2.0
 10  
  *
 11  
  * Unless required by applicable law or agreed to in writing, software
 12  
  * distributed under the License is distributed on an "AS IS" BASIS,
 13  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  
  * See the License for the specific language governing permissions and
 15  
  * limitations under the License.
 16  
  */
 17  
 package org.apache.commons.lang3.concurrent;
 18  
 
 19  
 import java.util.concurrent.ScheduledExecutorService;
 20  
 import java.util.concurrent.ScheduledFuture;
 21  
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 22  
 import java.util.concurrent.TimeUnit;
 23  
 
 24  
 /**
 25  
  * <p>
 26  
  * A specialized <em>semaphore</em> implementation that provides a number of
 27  
  * permits in a given time frame.
 28  
  * </p>
 29  
  * <p>
 30  
  * This class is similar to the {@code java.util.concurrent.Semaphore} class
 31  
  * provided by the JDK in that it manages a configurable number of permits.
 32  
  * Using the {@link #acquire()} method a permit can be requested by a thread.
 33  
  * However, there is an additional timing dimension: there is no {@code
 34  
  * release()} method for freeing a permit, but all permits are automatically
 35  
  * released at the end of a configurable time frame. If a thread calls
 36  
  * {@link #acquire()} and the available permits are already exhausted for this
 37  
  * time frame, the thread is blocked. When the time frame ends all permits
 38  
  * requested so far are restored, and blocked threads are woken up again, so
 39  
  * that they can try to acquire a new permit. This basically means that in the
 40  
  * specified time frame only the given number of operations is possible.
 41  
  * </p>
 42  
  * <p>
 43  
  * A use case for this class is to artificially limit the load produced by a
 44  
  * process. As an example consider an application that issues database queries
 45  
  * on a production system in a background process to gather statistical
 46  
  * information. This background processing should not produce so much database
 47  
  * load that the functionality and the performance of the production system are
 48  
  * impacted. Here a {@code TimedSemaphore} could be installed to guarantee that
 49  
  * only a given number of database queries are issued per second.
 50  
  * </p>
 51  
  * <p>
 52  
  * A thread class for performing database queries could look as follows:
 53  
  * </p>
 54  
  *
 55  
  * <pre>
 56  
  * public class StatisticsThread extends Thread {
 57  
  *     // The semaphore for limiting database load.
 58  
  *     private final TimedSemaphore semaphore;
 59  
  *     // Create an instance and set the semaphore
 60  
  *     public StatisticsThread(TimedSemaphore timedSemaphore) {
 61  
  *         semaphore = timedSemaphore;
 62  
  *     }
 63  
  *     // Gather statistics
 64  
  *     public void run() {
 65  
  *         try {
 66  
  *             while(true) {
 67  
  *                 semaphore.acquire();   // limit database load
 68  
  *                 performQuery();        // issue a query
 69  
  *             }
 70  
  *         } catch(InterruptedException ex) {
 71  
  *             // fall through
 72  
  *         }
 73  
  *     }
 74  
  *     ...
 75  
  * }
 76  
  * </pre>
 77  
  *
 78  
  * <p>
 79  
  * The following code fragment shows how a {@code TimedSemaphore} is created
 80  
  * that allows only 10 operations per second and passed to the statistics
 81  
  * thread:
 82  
  * </p>
 83  
  *
 84  
  * <pre>
 85  
  * TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECONDS, 10);
 86  
  * StatisticsThread thread = new StatisticsThread(sem);
 87  
  * thread.start();
 88  
  * </pre>
 89  
  *
 90  
  * <p>
 91  
  * When creating an instance the time period for the semaphore must be
 92  
  * specified. {@code TimedSemaphore} uses an executor service with a
 93  
  * corresponding period to monitor this interval. The {@code
 94  
  * ScheduledExecutorService} to be used for this purpose can be provided at
 95  
  * construction time. Alternatively the class creates an internal executor
 96  
  * service.
 97  
  * </p>
 98  
  * <p>
 99  
  * Client code that uses {@code TimedSemaphore} has to call the
 100  
  * {@link #acquire()} method in each processing step. {@code TimedSemaphore}
 101  
  * keeps track of the number of invocations of the {@link #acquire()} method and
 102  
  * blocks the calling thread if the counter exceeds the limit specified. When
 103  
  * the timer signals the end of the time period the counter is reset and all
 104  
  * waiting threads are released. Then another cycle can start.
 105  
  * </p>
 106  
  * <p>
 107  
  * It is possible to modify the limit at any time using the
 108  
  * {@link #setLimit(int)} method. This is useful if the load produced by an
 109  
  * operation has to be adapted dynamically. In the example scenario with the
 110  
  * thread collecting statistics it may make sense to specify a low limit during
 111  
  * day time while allowing a higher load in the night time. Reducing the limit
 112  
  * takes effect immediately by blocking incoming callers. If the limit is
 113  
  * increased, waiting threads are not released immediately, but wake up when the
 114  
  * timer runs out. Then, in the next period more processing steps can be
 115  
  * performed without blocking. By setting the limit to 0 the semaphore can be
 116  
  * switched off: in this mode the {@link #acquire()} method never blocks, but
 117  
  * lets all callers pass directly.
 118  
  * </p>
 119  
  * <p>
 120  
  * When the {@code TimedSemaphore} is no longer needed its {@link #shutdown()}
 121  
  * method should be called. This causes the periodic task that monitors the time
 122  
  * interval to be canceled. If the {@code ScheduledExecutorService} has been
 123  
  * created by the semaphore at construction time, it is also shut down.
 124  
  * After that {@link #acquire()} must not be called any more.
 125  
  * </p>
 126  
  *
 127  
  * @since 3.0
 128  
  * @version $Id: TimedSemaphore.java 1583482 2014-03-31 22:54:57Z niallp $
 129  
  */
 130  
 public class TimedSemaphore {
 131  
     /**
 132  
      * Constant for a value representing no limit. If the limit is set to a
 133  
      * value less than or equal to this constant, the {@code TimedSemaphore} will be
 134  
      * effectively switched off.
 135  
      */
 136  
     public static final int NO_LIMIT = 0;
 137  
 
 138  
     /** Constant for the thread pool size for the executor. */
 139  
     private static final int THREAD_POOL_SIZE = 1;
 140  
 
 141  
     /** The executor service for managing the timer thread. */
 142  
     private final ScheduledExecutorService executorService;
 143  
 
 144  
     /** Stores the period for this timed semaphore. */
 145  
     private final long period;
 146  
 
 147  
     /** The time unit for the period. */
 148  
     private final TimeUnit unit;
 149  
 
 150  
     /** A flag whether the executor service was created by this object. */
 151  
     private final boolean ownExecutor;
 152  
 
 153  
     /** A future object representing the timer task. */
 154  
     private ScheduledFuture<?> task; // @GuardedBy("this")
 155  
 
 156  
     /** Stores the total number of invocations of the acquire() method. */
 157  
     private long totalAcquireCount; // @GuardedBy("this")
 158  
 
 159  
     /**
 160  
      * The counter for the periods. This counter is increased every time a
 161  
      * period ends.
 162  
      */
 163  
     private long periodCount; // @GuardedBy("this")
 164  
 
 165  
     /** The limit. */
 166  
     private int limit; // @GuardedBy("this")
 167  
 
 168  
     /** The current counter. */
 169  
     private int acquireCount;  // @GuardedBy("this")
 170  
 
 171  
     /** The number of invocations of acquire() in the last period. */
 172  
     private int lastCallsPerPeriod; // @GuardedBy("this")
 173  
 
 174  
     /** A flag whether shutdown() was called. */
 175  
     private boolean shutdown;  // @GuardedBy("this")
 176  
 
 177  
     /**
 178  
      * Creates a new instance of {@link TimedSemaphore} and initializes it with
 179  
      * the given time period and the limit.
 180  
      *
 181  
      * @param timePeriod the time period
 182  
      * @param timeUnit the unit for the period
 183  
      * @param limit the limit for the semaphore
 184  
      * @throws IllegalArgumentException if the period is less than or equal to 0
 185  
      */
 186  
     public TimedSemaphore(final long timePeriod, final TimeUnit timeUnit, final int limit) {
 187  6
         this(null, timePeriod, timeUnit, limit);
 188  5
     }
 189  
 
 190  
     /**
 191  
      * Creates a new instance of {@link TimedSemaphore} and initializes it with
 192  
      * an executor service, the given time period, and the limit. The executor
 193  
      * service will be used for creating a periodic task for monitoring the time
 194  
      * period. It can be <b>null</b>, then a default service will be created.
 195  
      *
 196  
      * @param service the executor service
 197  
      * @param timePeriod the time period
 198  
      * @param timeUnit the unit for the period
 199  
      * @param limit the limit for the semaphore
 200  
      * @throws IllegalArgumentException if the period is less than or equal to 0
 201  
      */
 202  
     public TimedSemaphore(final ScheduledExecutorService service, final long timePeriod,
 203  15
             final TimeUnit timeUnit, final int limit) {
 204  15
         if (timePeriod <= 0) {
 205  1
             throw new IllegalArgumentException("Time period must be greater 0!");
 206  
         }
 207  
 
 208  14
         period = timePeriod;
 209  14
         unit = timeUnit;
 210  
 
 211  14
         if (service != null) {
 212  9
             executorService = service;
 213  9
             ownExecutor = false;
 214  
         } else {
 215  5
             final ScheduledThreadPoolExecutor s = new ScheduledThreadPoolExecutor(
 216  
                     THREAD_POOL_SIZE);
 217  5
             s.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
 218  5
             s.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
 219  5
             executorService = s;
 220  5
             ownExecutor = true;
 221  
         }
 222  
 
 223  14
         setLimit(limit);
 224  14
     }
 225  
 
 226  
     /**
 227  
      * Returns the limit enforced by this semaphore. The limit determines how
 228  
      * many invocations of {@link #acquire()} are allowed within the monitored
 229  
      * period.
 230  
      *
 231  
      * @return the limit
 232  
      */
 233  
     public final synchronized int getLimit() {
 234  3150
         return limit;
 235  
     }
 236  
 
 237  
     /**
 238  
      * Sets the limit. This is the number of times the {@link #acquire()} method
 239  
      * can be called within the time period specified. If this limit is reached,
 240  
      * further invocations of {@link #acquire()} will block. Setting the limit
 241  
      * to a value &lt;= {@link #NO_LIMIT} will cause the limit to be disabled,
 242  
      * i.e. an arbitrary number of {@link #acquire()} invocations is allowed in
 243  
      * the time period.
 244  
      *
 245  
      * @param limit the limit
 246  
      */
 247  
     public final synchronized void setLimit(final int limit) {
 248  16
         this.limit = limit;
 249  16
     }
 250  
 
 251  
     /**
 252  
      * Initiates a shutdown. After that the object cannot be used any more.
 253  
      * This method can be invoked an arbitrary number of times. All invocations
 254  
      * after the first one do not have any effect.
 255  
      */
 256  
     public synchronized void shutdown() {
 257  17
         if (!shutdown) {
 258  
 
 259  8
             if (ownExecutor) {
 260  
                 // if the executor was created by this instance, it has
 261  
                 // to be shutdown
 262  5
                 getExecutorService().shutdownNow();
 263  
             }
 264  8
             if (task != null) {
 265  3
                 task.cancel(false);
 266  
             }
 267  
 
 268  8
             shutdown = true;
 269  
         }
 270  17
     }
 271  
 
 272  
     /**
 273  
      * Tests whether the {@link #shutdown()} method has been called on this
 274  
      * object. If this method returns <b>true</b>, this instance cannot be used
 275  
      * any longer.
 276  
      *
 277  
      * @return a flag whether a shutdown has been performed
 278  
      */
 279  
     public synchronized boolean isShutdown() {
 280  2040
         return shutdown;
 281  
     }
 282  
 
 283  
     /**
 284  
      * Tries to acquire a permit from this semaphore. This method will block if
 285  
      * the limit for the current period has already been reached. If
 286  
      * {@link #shutdown()} has already been invoked, calling this method will
 287  
      * cause an exception. The very first call of this method starts the timer
 288  
      * task which monitors the time period set for this {@code TimedSemaphore}.
 289  
      * From now on the semaphore is active.
 290  
      *
 291  
      * @throws InterruptedException if the thread gets interrupted
 292  
      * @throws IllegalStateException if this semaphore is already shut down
 293  
      */
 294  
     public synchronized void acquire() throws InterruptedException {
 295  2036
         if (isShutdown()) {
 296  1
             throw new IllegalStateException("TimedSemaphore is shut down!");
 297  
         }
 298  
 
 299  2035
         if (task == null) {
 300  8
             task = startTimer();
 301  
         }
 302  
 
 303  2035
         boolean canPass = false;
 304  
         do {
 305  2069
             canPass = getLimit() <= NO_LIMIT || acquireCount < getLimit();
 306  2069
             if (!canPass) {
 307  34
                 wait();
 308  
             } else {
 309  2035
                 acquireCount++;
 310  
             }
 311  2069
         } while (!canPass);
 312  2035
     }
 313  
 
 314  
     /**
 315  
      * Returns the number of (successful) acquire invocations during the last
 316  
      * period. This is the number of times the {@link #acquire()} method was
 317  
      * called without blocking. This can be useful for testing or debugging
 318  
      * purposes or to determine a meaningful threshold value. If a limit is set,
 319  
      * the value returned by this method won't be greater than this limit.
 320  
      *
 321  
      * @return the number of non-blocking invocations of the {@link #acquire()}
 322  
      * method
 323  
      */
 324  
     public synchronized int getLastAcquiresPerPeriod() {
 325  12
         return lastCallsPerPeriod;
 326  
     }
 327  
 
 328  
     /**
 329  
      * Returns the number of invocations of the {@link #acquire()} method for
 330  
      * the current period. This may be useful for testing or debugging purposes.
 331  
      *
 332  
      * @return the current number of {@link #acquire()} invocations
 333  
      */
 334  
     public synchronized int getAcquireCount() {
 335  23
         return acquireCount;
 336  
     }
 337  
 
 338  
     /**
 339  
      * Returns the number of calls to the {@link #acquire()} method that can
 340  
      * still be performed in the current period without blocking. This method
 341  
      * can give an indication whether it is safe to call the {@link #acquire()}
 342  
      * method without risking to be suspended. However, there is no guarantee
 343  
      * that a subsequent call to {@link #acquire()} is actually non-blocking
 344  
      * because in the mean time other threads may have invoked the semaphore.
 345  
      *
 346  
      * @return the current number of available {@link #acquire()} calls in the
 347  
      * current period
 348  
      */
 349  
     public synchronized int getAvailablePermits() {
 350  11
         return getLimit() - getAcquireCount();
 351  
     }
 352  
 
 353  
     /**
 354  
      * Returns the average number of successful (i.e. non-blocking)
 355  
      * {@link #acquire()} invocations for the entire life-time of this {@code
 356  
      * TimedSemaphore}. This method can be used for instance for statistical
 357  
      * calculations.
 358  
      *
 359  
      * @return the average number of {@link #acquire()} invocations per time
 360  
      * period
 361  
      */
 362  
     public synchronized double getAverageCallsPerPeriod() {
 363  3
         return periodCount == 0 ? 0 : (double) totalAcquireCount
 364  
                 / (double) periodCount;
 365  
     }
 366  
 
 367  
     /**
 368  
      * Returns the time period. This is the time monitored by this semaphore.
 369  
      * Only a given number of invocations of the {@link #acquire()} method is
 370  
      * possible in this period.
 371  
      *
 372  
      * @return the time period
 373  
      */
 374  
     public long getPeriod() {
 375  19
         return period;
 376  
     }
 377  
 
 378  
     /**
 379  
      * Returns the time unit. This is the unit used by {@link #getPeriod()}.
 380  
      *
 381  
      * @return the time unit
 382  
      */
 383  
     public TimeUnit getUnit() {
 384  10
         return unit;
 385  
     }
 386  
 
 387  
     /**
 388  
      * Returns the executor service used by this instance.
 389  
      *
 390  
      * @return the executor service
 391  
      */
 392  
     protected ScheduledExecutorService getExecutorService() {
 393  17
         return executorService;
 394  
     }
 395  
 
 396  
     /**
 397  
      * Starts the timer. This method is called when {@link #acquire()} is called
 398  
      * for the first time. It schedules a task to be executed at fixed rate to
 399  
      * monitor the time period specified.
 400  
      *
 401  
      * @return a future object representing the task scheduled
 402  
      */
 403  
     protected ScheduledFuture<?> startTimer() {
 404  9
         return getExecutorService().scheduleAtFixedRate(new Runnable() {
 405  
             @Override
 406  
             public void run() {
 407  5
                 endOfPeriod();
 408  5
             }
 409  
         }, getPeriod(), getPeriod(), getUnit());
 410  
     }
 411  
 
 412  
     /**
 413  
      * The current time period is finished. This method is called by the timer
 414  
      * used internally to monitor the time period. It resets the counter and
 415  
      * releases the threads waiting for this semaphore.
 416  
      */
 417  
     synchronized void endOfPeriod() {
 418  19
         lastCallsPerPeriod = acquireCount;
 419  19
         totalAcquireCount += acquireCount;
 420  19
         periodCount++;
 421  19
         acquireCount = 0;
 422  19
         notifyAll();
 423  19
     }
 424  
 }