TimedSemaphore.java

  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one or more
  3.  * contributor license agreements.  See the NOTICE file distributed with
  4.  * this work for additional information regarding copyright ownership.
  5.  * The ASF licenses this file to You under the Apache License, Version 2.0
  6.  * (the "License"); you may not use this file except in compliance with
  7.  * the License.  You may obtain a copy of the License at
  8.  *
  9.  *      http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package org.apache.commons.lang3.concurrent;

  18. import java.util.concurrent.ScheduledExecutorService;
  19. import java.util.concurrent.ScheduledFuture;
  20. import java.util.concurrent.ScheduledThreadPoolExecutor;
  21. import java.util.concurrent.TimeUnit;

  22. import org.apache.commons.lang3.Validate;

  23. /**
  24.  * A specialized <em>semaphore</em> implementation that provides a number of
  25.  * permits in a given time frame.
  26.  *
  27.  * <p>
  28.  * This class is similar to the {@link java.util.concurrent.Semaphore} class
  29.  * provided by the JDK in that it manages a configurable number of permits.
  30.  * Using the {@link #acquire()} method a permit can be requested by a thread.
  31.  * However, there is an additional timing dimension: there is no {@code
  32.  * release()} method for freeing a permit, but all permits are automatically
  33.  * released at the end of a configurable time frame. If a thread calls
  34.  * {@link #acquire()} and the available permits are already exhausted for this
  35.  * time frame, the thread is blocked. When the time frame ends all permits
  36.  * requested so far are restored, and blocking threads are waked up again, so
  37.  * that they can try to acquire a new permit. This basically means that in the
  38.  * specified time frame only the given number of operations is possible.
  39.  * </p>
  40.  * <p>
  41.  * A use case for this class is to artificially limit the load produced by a
  42.  * process. As an example consider an application that issues database queries
  43.  * on a production system in a background process to gather statistical
  44.  * information. This background processing should not produce so much database
  45.  * load that the functionality and the performance of the production system are
  46.  * impacted. Here a {@link TimedSemaphore} could be installed to guarantee that
  47.  * only a given number of database queries are issued per second.
  48.  * </p>
  49.  * <p>
  50.  * A thread class for performing database queries could look as follows:
  51.  * </p>
  52.  *
  53.  * <pre>
  54.  * public class StatisticsThread extends Thread {
  55.  *     // The semaphore for limiting database load.
  56.  *     private final TimedSemaphore semaphore;
  57.  *     // Create an instance and set the semaphore
  58.  *     public StatisticsThread(TimedSemaphore timedSemaphore) {
  59.  *         semaphore = timedSemaphore;
  60.  *     }
  61.  *     // Gather statistics
  62.  *     public void run() {
  63.  *         try {
  64.  *             while (true) {
  65.  *                 semaphore.acquire();   // limit database load
  66.  *                 performQuery();        // issue a query
  67.  *             }
  68.  *         } catch (InterruptedException) {
  69.  *             // fall through
  70.  *         }
  71.  *     }
  72.  *     ...
  73.  * }
  74.  * </pre>
  75.  *
  76.  * <p>
  77.  * The following code fragment shows how a {@link TimedSemaphore} is created
  78.  * that allows only 10 operations per second and passed to the statistics
  79.  * thread:
  80.  * </p>
  81.  *
  82.  * <pre>
  83.  * TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECOND, 10);
  84.  * StatisticsThread thread = new StatisticsThread(sem);
  85.  * thread.start();
  86.  * </pre>
  87.  *
  88.  * <p>
  89.  * When creating an instance the time period for the semaphore must be
  90.  * specified. {@link TimedSemaphore} uses an executor service with a
  91.  * corresponding period to monitor this interval. The {@code
  92.  * ScheduledExecutorService} to be used for this purpose can be provided at
  93.  * construction time. Alternatively the class creates an internal executor
  94.  * service.
  95.  * </p>
  96.  * <p>
  97.  * Client code that uses {@link TimedSemaphore} has to call the
  98.  * {@link #acquire()} method in each processing step. {@link TimedSemaphore}
  99.  * keeps track of the number of invocations of the {@link #acquire()} method and
  100.  * blocks the calling thread if the counter exceeds the limit specified. When
  101.  * the timer signals the end of the time period the counter is reset and all
  102.  * waiting threads are released. Then another cycle can start.
  103.  * </p>
  104.  * <p>
  105.  * An alternative to {@code acquire()} is the {@link #tryAcquire()} method. This
  106.  * method checks whether the semaphore is under the specified limit and
  107.  * increases the internal counter if this is the case. The return value is then
  108.  * <strong>true</strong>, and the calling thread can continue with its action.
  109.  * If the semaphore is already at its limit, {@code tryAcquire()} immediately
  110.  * returns <strong>false</strong> without blocking; the calling thread must
  111.  * then abort its action. This usage scenario prevents blocking of threads.
  112.  * </p>
  113.  * <p>
  114.  * It is possible to modify the limit at any time using the
  115.  * {@link #setLimit(int)} method. This is useful if the load produced by an
  116.  * operation has to be adapted dynamically. In the example scenario with the
  117.  * thread collecting statistics it may make sense to specify a low limit during
  118.  * day time while allowing a higher load in the night time. Reducing the limit
  119.  * takes effect immediately by blocking incoming callers. If the limit is
  120.  * increased, waiting threads are not released immediately, but wake up when the
  121.  * timer runs out. Then, in the next period more processing steps can be
  122.  * performed without blocking. By setting the limit to 0 the semaphore can be
  123.  * switched off: in this mode the {@link #acquire()} method never blocks, but
  124.  * lets all callers pass directly.
  125.  * </p>
  126.  * <p>
  127.  * When the {@link TimedSemaphore} is no more needed its {@link #shutdown()}
  128.  * method should be called. This causes the periodic task that monitors the time
  129.  * interval to be canceled. If the {@link ScheduledExecutorService} has been
  130.  * created by the semaphore at construction time, it is also shut down.
  131.  * resources. After that {@link #acquire()} must not be called any more.
  132.  * </p>
  133.  *
  134.  * @since 3.0
  135.  */
  136. public class TimedSemaphore {
  137.     /**
  138.      * Constant for a value representing no limit. If the limit is set to a
  139.      * value less or equal this constant, the {@link TimedSemaphore} will be
  140.      * effectively switched off.
  141.      */
  142.     public static final int NO_LIMIT = 0;

  143.     /** Constant for the thread pool size for the executor. */
  144.     private static final int THREAD_POOL_SIZE = 1;

  145.     /** The executor service for managing the timer thread. */
  146.     private final ScheduledExecutorService executorService;

  147.     /** Stores the period for this timed semaphore. */
  148.     private final long period;

  149.     /** The time unit for the period. */
  150.     private final TimeUnit unit;

  151.     /** A flag whether the executor service was created by this object. */
  152.     private final boolean ownExecutor;

  153.     /** A future object representing the timer task. */
  154.     private ScheduledFuture<?> task; // @GuardedBy("this")

  155.     /** Stores the total number of invocations of the acquire() method. */
  156.     private long totalAcquireCount; // @GuardedBy("this")

  157.     /**
  158.      * The counter for the periods. This counter is increased every time a
  159.      * period ends.
  160.      */
  161.     private long periodCount; // @GuardedBy("this")

  162.     /** The limit. */
  163.     private int limit; // @GuardedBy("this")

  164.     /** The current counter. */
  165.     private int acquireCount;  // @GuardedBy("this")

  166.     /** The number of invocations of acquire() in the last period. */
  167.     private int lastCallsPerPeriod; // @GuardedBy("this")

  168.     /** A flag whether shutdown() was called. */
  169.     private boolean shutdown;  // @GuardedBy("this")

  170.     /**
  171.      * Creates a new instance of {@link TimedSemaphore} and initializes it with
  172.      * the given time period and the limit.
  173.      *
  174.      * @param timePeriod the time period
  175.      * @param timeUnit the unit for the period
  176.      * @param limit the limit for the semaphore
  177.      * @throws IllegalArgumentException if the period is less or equals 0
  178.      */
  179.     public TimedSemaphore(final long timePeriod, final TimeUnit timeUnit, final int limit) {
  180.         this(null, timePeriod, timeUnit, limit);
  181.     }

  182.     /**
  183.      * Creates a new instance of {@link TimedSemaphore} and initializes it with
  184.      * an executor service, the given time period, and the limit. The executor
  185.      * service will be used for creating a periodic task for monitoring the time
  186.      * period. It can be <b>null</b>, then a default service will be created.
  187.      *
  188.      * @param service the executor service
  189.      * @param timePeriod the time period
  190.      * @param timeUnit the unit for the period
  191.      * @param limit the limit for the semaphore
  192.      * @throws IllegalArgumentException if the period is less or equals 0
  193.      */
  194.     public TimedSemaphore(final ScheduledExecutorService service, final long timePeriod,
  195.             final TimeUnit timeUnit, final int limit) {
  196.         Validate.inclusiveBetween(1, Long.MAX_VALUE, timePeriod, "Time period must be greater than 0!");

  197.         period = timePeriod;
  198.         unit = timeUnit;

  199.         if (service != null) {
  200.             executorService = service;
  201.             ownExecutor = false;
  202.         } else {
  203.             final ScheduledThreadPoolExecutor s = new ScheduledThreadPoolExecutor(
  204.                     THREAD_POOL_SIZE);
  205.             s.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
  206.             s.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
  207.             executorService = s;
  208.             ownExecutor = true;
  209.         }

  210.         setLimit(limit);
  211.     }

  212.     /**
  213.      * Acquires a permit from this semaphore. This method will block if
  214.      * the limit for the current period has already been reached. If
  215.      * {@link #shutdown()} has already been invoked, calling this method will
  216.      * cause an exception. The very first call of this method starts the timer
  217.      * task which monitors the time period set for this {@link TimedSemaphore}.
  218.      * From now on the semaphore is active.
  219.      *
  220.      * @throws InterruptedException if the thread gets interrupted
  221.      * @throws IllegalStateException if this semaphore is already shut down
  222.      */
  223.     public synchronized void acquire() throws InterruptedException {
  224.         prepareAcquire();

  225.         boolean canPass;
  226.         do {
  227.             canPass = acquirePermit();
  228.             if (!canPass) {
  229.                 wait();
  230.             }
  231.         } while (!canPass);
  232.     }

  233.     /**
  234.      * Internal helper method for acquiring a permit. This method checks whether currently
  235.      * a permit can be acquired and - if so - increases the internal counter. The return
  236.      * value indicates whether a permit could be acquired. This method must be called with
  237.      * the lock of this object held.
  238.      *
  239.      * @return a flag whether a permit could be acquired
  240.      */
  241.     private boolean acquirePermit() {
  242.         if (getLimit() <= NO_LIMIT || acquireCount < getLimit()) {
  243.             acquireCount++;
  244.             return true;
  245.         }
  246.         return false;
  247.     }

  248.     /**
  249.      * The current time period is finished. This method is called by the timer
  250.      * used internally to monitor the time period. It resets the counter and
  251.      * releases the threads waiting for this barrier.
  252.      */
  253.     synchronized void endOfPeriod() {
  254.         lastCallsPerPeriod = acquireCount;
  255.         totalAcquireCount += acquireCount;
  256.         periodCount++;
  257.         acquireCount = 0;
  258.         notifyAll();
  259.     }

  260.     /**
  261.      * Returns the number of invocations of the {@link #acquire()} method for
  262.      * the current period. This may be useful for testing or debugging purposes.
  263.      *
  264.      * @return the current number of {@link #acquire()} invocations
  265.      */
  266.     public synchronized int getAcquireCount() {
  267.         return acquireCount;
  268.     }

  269.     /**
  270.      * Returns the number of calls to the {@link #acquire()} method that can
  271.      * still be performed in the current period without blocking. This method
  272.      * can give an indication whether it is safe to call the {@link #acquire()}
  273.      * method without risking to be suspended. However, there is no guarantee
  274.      * that a subsequent call to {@link #acquire()} actually is not-blocking
  275.      * because in the meantime other threads may have invoked the semaphore.
  276.      *
  277.      * @return the current number of available {@link #acquire()} calls in the
  278.      * current period
  279.      */
  280.     public synchronized int getAvailablePermits() {
  281.         return getLimit() - getAcquireCount();
  282.     }

  283.     /**
  284.      * Returns the average number of successful (i.e. non-blocking)
  285.      * {@link #acquire()} invocations for the entire life-time of this {@code
  286.      * TimedSemaphore}. This method can be used for instance for statistical
  287.      * calculations.
  288.      *
  289.      * @return the average number of {@link #acquire()} invocations per time
  290.      * unit
  291.      */
  292.     public synchronized double getAverageCallsPerPeriod() {
  293.         return periodCount == 0 ? 0 : (double) totalAcquireCount
  294.                 / (double) periodCount;
  295.     }

  296.     /**
  297.      * Returns the executor service used by this instance.
  298.      *
  299.      * @return the executor service
  300.      */
  301.     protected ScheduledExecutorService getExecutorService() {
  302.         return executorService;
  303.     }

  304.     /**
  305.      * Returns the number of (successful) acquire invocations during the last
  306.      * period. This is the number of times the {@link #acquire()} method was
  307.      * called without blocking. This can be useful for testing or debugging
  308.      * purposes or to determine a meaningful threshold value. If a limit is set,
  309.      * the value returned by this method won't be greater than this limit.
  310.      *
  311.      * @return the number of non-blocking invocations of the {@link #acquire()}
  312.      * method
  313.      */
  314.     public synchronized int getLastAcquiresPerPeriod() {
  315.         return lastCallsPerPeriod;
  316.     }

  317.     /**
  318.      * Returns the limit enforced by this semaphore. The limit determines how
  319.      * many invocations of {@link #acquire()} are allowed within the monitored
  320.      * period.
  321.      *
  322.      * @return the limit
  323.      */
  324.     public final synchronized int getLimit() {
  325.         return limit;
  326.     }

  327.     /**
  328.      * Returns the time period. This is the time monitored by this semaphore.
  329.      * Only a given number of invocations of the {@link #acquire()} method is
  330.      * possible in this period.
  331.      *
  332.      * @return the time period
  333.      */
  334.     public long getPeriod() {
  335.         return period;
  336.     }

  337.     /**
  338.      * Returns the time unit. This is the unit used by {@link #getPeriod()}.
  339.      *
  340.      * @return the time unit
  341.      */
  342.     public TimeUnit getUnit() {
  343.         return unit;
  344.     }

  345.     /**
  346.      * Tests whether the {@link #shutdown()} method has been called on this
  347.      * object. If this method returns <b>true</b>, this instance cannot be used
  348.      * any longer.
  349.      *
  350.      * @return a flag whether a shutdown has been performed
  351.      */
  352.     public synchronized boolean isShutdown() {
  353.         return shutdown;
  354.     }

  355.     /**
  356.      * Prepares an acquire operation. Checks for the current state and starts the internal
  357.      * timer if necessary. This method must be called with the lock of this object held.
  358.      */
  359.     private void prepareAcquire() {
  360.         if (isShutdown()) {
  361.             throw new IllegalStateException("TimedSemaphore is shut down!");
  362.         }

  363.         if (task == null) {
  364.             task = startTimer();
  365.         }
  366.     }

  367.     /**
  368.      * Sets the limit. This is the number of times the {@link #acquire()} method
  369.      * can be called within the time period specified. If this limit is reached,
  370.      * further invocations of {@link #acquire()} will block. Setting the limit
  371.      * to a value &lt;= {@link #NO_LIMIT} will cause the limit to be disabled,
  372.      * i.e. an arbitrary number of{@link #acquire()} invocations is allowed in
  373.      * the time period.
  374.      *
  375.      * @param limit the limit
  376.      */
  377.     public final synchronized void setLimit(final int limit) {
  378.         this.limit = limit;
  379.     }

  380.     /**
  381.      * Initializes a shutdown. After that the object cannot be used anymore.
  382.      * This method can be invoked an arbitrary number of times. All invocations
  383.      * after the first one do not have any effect.
  384.      */
  385.     public synchronized void shutdown() {
  386.         if (!shutdown) {

  387.             if (ownExecutor) {
  388.                 // if the executor was created by this instance, it has
  389.                 // to be shutdown
  390.                 getExecutorService().shutdownNow();
  391.             }
  392.             if (task != null) {
  393.                 task.cancel(false);
  394.             }

  395.             shutdown = true;
  396.         }
  397.     }

  398.     /**
  399.      * Starts the timer. This method is called when {@link #acquire()} is called
  400.      * for the first time. It schedules a task to be executed at fixed rate to
  401.      * monitor the time period specified.
  402.      *
  403.      * @return a future object representing the task scheduled
  404.      */
  405.     protected ScheduledFuture<?> startTimer() {
  406.         return getExecutorService().scheduleAtFixedRate(this::endOfPeriod, getPeriod(), getPeriod(), getUnit());
  407.     }

  408.     /**
  409.      * Tries to acquire a permit from this semaphore. If the limit of this semaphore has
  410.      * not yet been reached, a permit is acquired, and this method returns
  411.      * <strong>true</strong>. Otherwise, this method returns immediately with the result
  412.      * <strong>false</strong>.
  413.      *
  414.      * @return <strong>true</strong> if a permit could be acquired; <strong>false</strong>
  415.      * otherwise
  416.      * @throws IllegalStateException if this semaphore is already shut down
  417.      * @since 3.5
  418.      */
  419.     public synchronized boolean tryAcquire() {
  420.         prepareAcquire();
  421.         return acquirePermit();
  422.     }
  423. }