View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      https://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.lang3.concurrent;
18  
19  import static org.apache.commons.lang3.LangAssertions.assertIllegalArgumentException;
20  import static org.junit.jupiter.api.Assertions.assertEquals;
21  import static org.junit.jupiter.api.Assertions.assertFalse;
22  import static org.junit.jupiter.api.Assertions.assertNotNull;
23  import static org.junit.jupiter.api.Assertions.assertThrows;
24  import static org.junit.jupiter.api.Assertions.assertTrue;
25  
26  import java.time.Duration;
27  import java.util.concurrent.CountDownLatch;
28  import java.util.concurrent.ScheduledExecutorService;
29  import java.util.concurrent.ScheduledFuture;
30  import java.util.concurrent.ScheduledThreadPoolExecutor;
31  import java.util.concurrent.TimeUnit;
32  
33  import org.apache.commons.lang3.AbstractLangTest;
34  import org.apache.commons.lang3.ThreadUtils;
35  import org.easymock.EasyMock;
36  import org.junit.jupiter.api.Test;
37  
38  /**
39   * Test class for TimedSemaphore.
40   */
41  class TimedSemaphoreTest extends AbstractLangTest {
42      /**
43       * A test thread class that will be used by tests for triggering the
44       * semaphore. The thread calls the semaphore a configurable number of times.
45       * When this is done, it cannotify the main thread.
46       */
47      private static final class SemaphoreThread extends Thread {
48          /** The semaphore. */
49          private final TimedSemaphore semaphore;
50  
51          /** A latch for communication with the main thread. */
52          private final CountDownLatch latch;
53  
54          /** The number of acquire() calls. */
55          private final int count;
56  
57          /** The number of invocations of the latch. */
58          private final int latchCount;
59  
60          SemaphoreThread(final TimedSemaphore b, final CountDownLatch l, final int c, final int lc) {
61              semaphore = b;
62              latch = l;
63              count = c;
64              latchCount = lc;
65          }
66  
67          /**
68           * Calls acquire() on the semaphore for the specified number of times.
69           * Optionally the latch will also be triggered to synchronize with the
70           * main test thread.
71           */
72          @Override
73          public void run() {
74              try {
75                  for (int i = 0; i < count; i++) {
76                      semaphore.acquire();
77  
78                      if (i < latchCount) {
79                          latch.countDown();
80                      }
81                  }
82              } catch (final InterruptedException iex) {
83                  Thread.currentThread().interrupt();
84              }
85          }
86      }
87  
88      /**
89       * A specialized implementation of {@code TimedSemaphore} that is easier to
90       * test.
91       */
92      private static final class TimedSemaphoreTestImpl extends TimedSemaphore {
93          /** A mock scheduled future. */
94          ScheduledFuture<?> schedFuture;
95  
96          /** A latch for synchronizing with the main thread. */
97          volatile CountDownLatch latch;
98  
99          /** Counter for the endOfPeriod() invocations. */
100         private int periodEnds;
101 
102         TimedSemaphoreTestImpl(final long timePeriod, final TimeUnit timeUnit,
103                 final int limit) {
104             super(timePeriod, timeUnit, limit);
105         }
106 
107         TimedSemaphoreTestImpl(final ScheduledExecutorService service,
108                 final long timePeriod, final TimeUnit timeUnit, final int limit) {
109             super(service, timePeriod, timeUnit, limit);
110         }
111 
112         /**
113          * Invokes the latch if one is set.
114          *
115          * @throws InterruptedException because it is declared that way in TimedSemaphore
116          */
117         @Override
118         public synchronized void acquire() throws InterruptedException {
119             super.acquire();
120             if (latch != null) {
121                 latch.countDown();
122             }
123         }
124 
125         /**
126          * Counts the number of invocations.
127          */
128         @Override
129         protected synchronized void endOfPeriod() {
130             super.endOfPeriod();
131             periodEnds++;
132         }
133 
134         /**
135          * Returns the number of invocations of the endOfPeriod() method.
136          *
137          * @return the endOfPeriod() invocations
138          */
139         int getPeriodEnds() {
140             synchronized (this) {
141                 return periodEnds;
142             }
143         }
144 
145         /**
146          * Either returns the mock future or calls the super method.
147          */
148         @Override
149         protected ScheduledFuture<?> startTimer() {
150             return schedFuture != null ? schedFuture : super.startTimer();
151         }
152     }
153 
154     /**
155      * A test thread class which invokes {@code tryAcquire()} on the test semaphore and
156      * records the return value.
157      */
158     private static final class TryAcquireThread extends Thread {
159         /** The semaphore. */
160         private final TimedSemaphore semaphore;
161 
162         /** A latch for communication with the main thread. */
163         private final CountDownLatch latch;
164 
165         /** Flag whether a permit could be acquired. */
166         private boolean acquired;
167 
168         TryAcquireThread(final TimedSemaphore s, final CountDownLatch l) {
169             semaphore = s;
170             latch = l;
171         }
172 
173         @Override
174         public void run() {
175             try {
176                 if (latch.await(10, TimeUnit.SECONDS)) {
177                     acquired = semaphore.tryAcquire();
178                 }
179             } catch (final InterruptedException iex) {
180                 // ignore
181             }
182         }
183     }
184 
185     /** Constant for the time period. */
186     private static final long PERIOD_MILLIS = 500;
187 
188     private static final Duration DURATION = Duration.ofMillis(PERIOD_MILLIS);
189 
190     /** Constant for the time unit. */
191     private static final TimeUnit UNIT = TimeUnit.MILLISECONDS;
192 
193     /** Constant for the default limit. */
194     private static final int LIMIT = 10;
195 
196     /**
197      * Prepares an executor service mock to expect the start of the timer.
198      *
199      * @param service the mock
200      * @param future the future
201      */
202     private void prepareStartTimer(final ScheduledExecutorService service,
203             final ScheduledFuture<?> future) {
204         service.scheduleAtFixedRate((Runnable) EasyMock.anyObject(), EasyMock
205                 .eq(PERIOD_MILLIS), EasyMock.eq(PERIOD_MILLIS), EasyMock.eq(UNIT));
206         EasyMock.expectLastCall().andReturn(future);
207     }
208 
209     /**
210      * Tests the acquire() method if a limit is set.
211      *
212      * @throws InterruptedException so we don't have to catch it
213      */
214     @Test
215     void testAcquireLimit() throws InterruptedException {
216         final ScheduledExecutorService service = EasyMock
217                 .createMock(ScheduledExecutorService.class);
218         final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
219         prepareStartTimer(service, future);
220         EasyMock.replay(service, future);
221         final int count = 10;
222         final CountDownLatch latch = new CountDownLatch(count - 1);
223         final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT, 1);
224         final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count,
225                 count - 1);
226         semaphore.setLimit(count - 1);
227 
228         // start a thread that calls the semaphore count times
229         t.start();
230         latch.await();
231         // now the semaphore's limit should be reached and the thread blocked
232         assertEquals(count - 1, semaphore.getAcquireCount(), "Wrong semaphore count");
233 
234         // this wakes up the thread, it should call the semaphore once more
235         semaphore.endOfPeriod();
236         t.join();
237         assertEquals(1, semaphore.getAcquireCount(), "Wrong semaphore count (2)");
238         assertEquals(count - 1, semaphore.getLastAcquiresPerPeriod(), "Wrong acquire() count");
239         EasyMock.verify(service, future);
240     }
241 
242     /**
243      * Tests a bigger number of invocations that span multiple periods. The
244      * period is set to a very short time. A background thread calls the
245      * semaphore a large number of times. While it runs at last one end of a
246      * period should be reached.
247      *
248      * @throws InterruptedException so we don't have to catch it
249      */
250     @Test
251     void testAcquireMultiplePeriods() throws InterruptedException {
252         final int count = 1000;
253         final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(
254                 PERIOD_MILLIS / 10, TimeUnit.MILLISECONDS, 1);
255         semaphore.setLimit(count / 4);
256         final CountDownLatch latch = new CountDownLatch(count);
257         final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count);
258         t.start();
259         latch.await();
260         semaphore.shutdown();
261         assertTrue(semaphore.getPeriodEnds() > 0, "End of period not reached");
262     }
263 
264     /**
265      * Tests the acquire() method if more threads are involved than the limit.
266      * This method starts a number of threads that all invoke the semaphore. The
267      * semaphore's limit is set to 1, so in each period only a single thread can
268      * acquire the semaphore.
269      *
270      * @throws InterruptedException so we don't have to catch it
271      */
272     @Test
273     void testAcquireMultipleThreads() throws InterruptedException {
274         final ScheduledExecutorService service = EasyMock
275                 .createMock(ScheduledExecutorService.class);
276         final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
277         prepareStartTimer(service, future);
278         EasyMock.replay(service, future);
279         final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
280                 PERIOD_MILLIS, UNIT, 1);
281         semaphore.latch = new CountDownLatch(1);
282         final int count = 10;
283         final SemaphoreThread[] threads = new SemaphoreThread[count];
284         for (int i = 0; i < count; i++) {
285             threads[i] = new SemaphoreThread(semaphore, null, 1, 0);
286             threads[i].start();
287         }
288         for (int i = 0; i < count; i++) {
289             semaphore.latch.await();
290             assertEquals(1, semaphore.getAcquireCount(), "Wrong count");
291             semaphore.latch = new CountDownLatch(1);
292             semaphore.endOfPeriod();
293             assertEquals(1, semaphore.getLastAcquiresPerPeriod(), "Wrong acquire count");
294         }
295         for (int i = 0; i < count; i++) {
296             threads[i].join();
297         }
298         EasyMock.verify(service, future);
299     }
300 
301     /**
302      * Tests the acquire() method if no limit is set. A test thread is started
303      * that calls the semaphore a large number of times. Even if the semaphore's
304      * period does not end, the thread should never block.
305      *
306      * @throws InterruptedException so we don't have to catch it
307      */
308     @Test
309     void testAcquireNoLimit() throws InterruptedException {
310         final ScheduledExecutorService service = EasyMock
311                 .createMock(ScheduledExecutorService.class);
312         final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
313         prepareStartTimer(service, future);
314         EasyMock.replay(service, future);
315         final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
316                 PERIOD_MILLIS, UNIT, TimedSemaphore.NO_LIMIT);
317         final int count = 1000;
318         final CountDownLatch latch = new CountDownLatch(count);
319         final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count);
320         t.start();
321         latch.await();
322         EasyMock.verify(service, future);
323     }
324 
325     /**
326      * Tests whether the available non-blocking calls can be queried.
327      *
328      * @throws InterruptedException so we don't have to catch it
329      */
330     @Test
331     void testGetAvailablePermits() throws InterruptedException {
332         final ScheduledExecutorService service = EasyMock
333                 .createMock(ScheduledExecutorService.class);
334         final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
335         prepareStartTimer(service, future);
336         EasyMock.replay(service, future);
337         final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT,
338                 LIMIT);
339         for (int i = 0; i < LIMIT; i++) {
340             assertEquals(LIMIT - i, semaphore.getAvailablePermits(), "Wrong available count at " + i);
341             semaphore.acquire();
342         }
343         semaphore.endOfPeriod();
344         assertEquals(LIMIT, semaphore.getAvailablePermits(), "Wrong available count in new period");
345         EasyMock.verify(service, future);
346     }
347 
348     /**
349      * Tests the methods for statistics.
350      *
351      * @throws InterruptedException so we don't have to catch it
352      */
353     @Test
354     void testGetAverageCallsPerPeriod() throws InterruptedException {
355         final ScheduledExecutorService service = EasyMock
356                 .createMock(ScheduledExecutorService.class);
357         final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
358         prepareStartTimer(service, future);
359         EasyMock.replay(service, future);
360         final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT,
361                 LIMIT);
362         semaphore.acquire();
363         semaphore.endOfPeriod();
364         assertEquals(1.0, semaphore.getAverageCallsPerPeriod(), .005, "Wrong average (1)");
365         semaphore.acquire();
366         semaphore.acquire();
367         semaphore.endOfPeriod();
368         assertEquals(1.5, semaphore.getAverageCallsPerPeriod(), .005, "Wrong average (2)");
369         EasyMock.verify(service, future);
370     }
371 
372     /**
373      * Tests creating a new instance.
374      */
375     @Test
376     void testInit() {
377         final ScheduledExecutorService service = EasyMock
378                 .createMock(ScheduledExecutorService.class);
379         EasyMock.replay(service);
380         final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT,
381                 LIMIT);
382         EasyMock.verify(service);
383         assertEquals(service, semaphore.getExecutorService(), "Wrong service");
384         assertEquals(PERIOD_MILLIS, semaphore.getPeriod(), "Wrong period");
385         assertEquals(UNIT, semaphore.getUnit(), "Wrong unit");
386         assertEquals(0, semaphore.getLastAcquiresPerPeriod(), "Statistic available");
387         assertEquals(0.0, semaphore.getAverageCallsPerPeriod(), .05, "Average available");
388         assertFalse(semaphore.isShutdown(), "Already shutdown");
389         assertEquals(LIMIT, semaphore.getLimit(), "Wrong limit");
390     }
391 
392     /**
393      * Tests whether a default executor service is created if no service is
394      * provided.
395      */
396     @Test
397     void testInitDefaultService() {
398         final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
399         final ScheduledThreadPoolExecutor exec = (ScheduledThreadPoolExecutor) semaphore
400                 .getExecutorService();
401         assertFalse(exec.getContinueExistingPeriodicTasksAfterShutdownPolicy(), "Wrong periodic task policy");
402         assertFalse(exec.getExecuteExistingDelayedTasksAfterShutdownPolicy(), "Wrong delayed task policy");
403         assertFalse(exec.isShutdown(), "Already shutdown");
404         semaphore.shutdown();
405     }
406 
407     /**
408      * Tries to create an instance with a negative period. This should cause an
409      * exception.
410      */
411     @Test
412     void testInitInvalidPeriod() {
413         assertIllegalArgumentException(() -> new TimedSemaphore(0L, UNIT, LIMIT));
414     }
415 
416     /**
417      * Tries to call acquire() after shutdown(). This should cause an exception.
418      */
419     @Test
420     void testPassAfterShutdown() {
421         final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
422         semaphore.shutdown();
423         assertThrows(IllegalStateException.class, semaphore::acquire);
424     }
425 
426     /**
427      * Tests multiple invocations of the shutdown() method.
428      *
429      * @throws InterruptedException so we don't have to catch it
430      */
431     @Test
432     void testShutdownMultipleTimes() throws InterruptedException {
433         final ScheduledExecutorService service = EasyMock
434                 .createMock(ScheduledExecutorService.class);
435         final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
436         prepareStartTimer(service, future);
437         EasyMock.expect(Boolean.valueOf(future.cancel(false))).andReturn(Boolean.TRUE);
438         EasyMock.replay(service, future);
439         final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
440                 PERIOD_MILLIS, UNIT, LIMIT);
441         semaphore.acquire();
442         for (int i = 0; i < 10; i++) {
443             semaphore.shutdown();
444         }
445         EasyMock.verify(service, future);
446     }
447 
448     /**
449      * Tests the shutdown() method if the executor belongs to the semaphore. In
450      * this case it has to be shut down.
451      */
452     @Test
453     void testShutdownOwnExecutor() {
454         final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
455         semaphore.shutdown();
456         assertTrue(semaphore.isShutdown(), "Not shutdown");
457         assertTrue(semaphore.getExecutorService().isShutdown(), "Executor not shutdown");
458     }
459 
460     /**
461      * Tests the shutdown() method for a shared executor service before a task
462      * was started. This should do pretty much nothing.
463      */
464     @Test
465     void testShutdownSharedExecutorNoTask() {
466         final ScheduledExecutorService service = EasyMock
467                 .createMock(ScheduledExecutorService.class);
468         EasyMock.replay(service);
469         final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT,
470                 LIMIT);
471         semaphore.shutdown();
472         assertTrue(semaphore.isShutdown(), "Not shutdown");
473         EasyMock.verify(service);
474     }
475 
476     /**
477      * Tests the shutdown() method for a shared executor after the task was
478      * started. In this case the task must be canceled.
479      *
480      * @throws InterruptedException so we don't have to catch it
481      */
482     @Test
483     void testShutdownSharedExecutorTask() throws InterruptedException {
484         final ScheduledExecutorService service = EasyMock
485                 .createMock(ScheduledExecutorService.class);
486         final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
487         prepareStartTimer(service, future);
488         EasyMock.expect(Boolean.valueOf(future.cancel(false))).andReturn(Boolean.TRUE);
489         EasyMock.replay(service, future);
490         final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
491                 PERIOD_MILLIS, UNIT, LIMIT);
492         semaphore.acquire();
493         semaphore.shutdown();
494         assertTrue(semaphore.isShutdown(), "Not shutdown");
495         EasyMock.verify(service, future);
496     }
497 
498     /**
499      * Tests starting the timer.
500      *
501      * @throws InterruptedException so we don't have to catch it
502      */
503     @Test
504     void testStartTimer() throws InterruptedException {
505         final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(PERIOD_MILLIS,
506                 UNIT, LIMIT);
507         final ScheduledFuture<?> future = semaphore.startTimer();
508         assertNotNull(future, "No future returned");
509         ThreadUtils.sleepQuietly(DURATION);
510         final int trials = 10;
511         int count = 0;
512         do {
513             Thread.sleep(PERIOD_MILLIS);
514             assertFalse(count++ > trials, "endOfPeriod() not called!");
515         } while (semaphore.getPeriodEnds() <= 0);
516         semaphore.shutdown();
517     }
518 
519     /**
520      * Tests the tryAcquire() method. It is checked whether the semaphore can be acquired
521      * by a bunch of threads the expected number of times and not more.
522      */
523     @Test
524     void testTryAcquire() throws InterruptedException {
525         final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, TimeUnit.SECONDS,
526                 LIMIT);
527         final TryAcquireThread[] threads = new TryAcquireThread[3 * LIMIT];
528         final CountDownLatch latch = new CountDownLatch(1);
529         for (int i = 0; i < threads.length; i++) {
530             threads[i] = new TryAcquireThread(semaphore, latch);
531             threads[i].start();
532         }
533 
534         latch.countDown();
535         int permits = 0;
536         for (final TryAcquireThread t : threads) {
537             t.join();
538             if (t.acquired) {
539                 permits++;
540             }
541         }
542         assertEquals(LIMIT, permits, "Wrong number of permits granted");
543     }
544 
545     /**
546      * Tries to call tryAcquire() after shutdown(). This should cause an exception.
547      */
548     @Test
549     void testTryAcquireAfterShutdown() {
550         final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
551         semaphore.shutdown();
552         assertThrows(IllegalStateException.class, semaphore::tryAcquire);
553     }
554 }