View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      https://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.lang3.concurrent;
18  
19  import static org.apache.commons.lang3.LangAssertions.assertIllegalArgumentException;
20  import static org.junit.jupiter.api.Assertions.assertEquals;
21  import static org.junit.jupiter.api.Assertions.assertFalse;
22  import static org.junit.jupiter.api.Assertions.assertNotNull;
23  import static org.junit.jupiter.api.Assertions.assertThrows;
24  import static org.junit.jupiter.api.Assertions.assertTrue;
25  
26  import java.time.Duration;
27  import java.util.concurrent.CountDownLatch;
28  import java.util.concurrent.ScheduledExecutorService;
29  import java.util.concurrent.ScheduledFuture;
30  import java.util.concurrent.ScheduledThreadPoolExecutor;
31  import java.util.concurrent.TimeUnit;
32  
33  import org.apache.commons.lang3.AbstractLangTest;
34  import org.apache.commons.lang3.ThreadUtils;
35  import org.easymock.EasyMock;
36  import org.junit.jupiter.api.Test;
37  
38  /**
39   * Tests {@link TimedSemaphore}.
40   */
41  class TimedSemaphoreTest extends AbstractLangTest {
42  
43      /**
44       * A test thread class that will be used by tests for triggering the
45       * semaphore. The thread calls the semaphore a configurable number of times.
46       * When this is done, it cannotify the main thread.
47       */
48      private static final class SemaphoreThread extends Thread {
49  
50          /** The semaphore. */
51          private final TimedSemaphore semaphore;
52  
53          /** A latch for communication with the main thread. */
54          private final CountDownLatch latch;
55  
56          /** The number of acquire() calls. */
57          private final int count;
58  
59          /** The number of invocations of the latch. */
60          private final int latchCount;
61  
62          SemaphoreThread(final TimedSemaphore b, final CountDownLatch l, final int c, final int lc) {
63              semaphore = b;
64              latch = l;
65              count = c;
66              latchCount = lc;
67          }
68  
69          /**
70           * Calls acquire() on the semaphore for the specified number of times.
71           * Optionally the latch will also be triggered to synchronize with the
72           * main test thread.
73           */
74          @Override
75          public void run() {
76              try {
77                  for (int i = 0; i < count; i++) {
78                      semaphore.acquire();
79  
80                      if (i < latchCount) {
81                          latch.countDown();
82                      }
83                  }
84              } catch (final InterruptedException iex) {
85                  Thread.currentThread().interrupt();
86              }
87          }
88      }
89  
90      /**
91       * A specialized implementation of {@code TimedSemaphore} that is easier to
92       * test.
93       */
94      private static final class TimedSemaphoreTestImpl extends TimedSemaphore {
95  
96          /** A mock scheduled future. */
97          ScheduledFuture<?> schedFuture;
98  
99          /** A latch for synchronizing with the main thread. */
100         volatile CountDownLatch latch;
101 
102         /** Counter for the endOfPeriod() invocations. */
103         private int periodEnds;
104 
105         TimedSemaphoreTestImpl(final long timePeriod, final TimeUnit timeUnit, final int limit) {
106             super(timePeriod, timeUnit, limit);
107         }
108 
109         TimedSemaphoreTestImpl(final ScheduledExecutorService service, final long timePeriod, final TimeUnit timeUnit, final int limit) {
110             super(service, timePeriod, timeUnit, limit);
111         }
112 
113         /**
114          * Invokes the latch if one is set.
115          *
116          * @throws InterruptedException because it is declared that way in TimedSemaphore
117          */
118         @Override
119         public synchronized void acquire() throws InterruptedException {
120             super.acquire();
121             if (latch != null) {
122                 latch.countDown();
123             }
124         }
125 
126         /**
127          * Counts the number of invocations.
128          */
129         @Override
130         protected synchronized void endOfPeriod() {
131             super.endOfPeriod();
132             periodEnds++;
133         }
134 
135         /**
136          * Returns the number of invocations of the endOfPeriod() method.
137          *
138          * @return the endOfPeriod() invocations
139          */
140         int getPeriodEnds() {
141             synchronized (this) {
142                 return periodEnds;
143             }
144         }
145 
146         /**
147          * Either returns the mock future or calls the super method.
148          */
149         @Override
150         protected ScheduledFuture<?> startTimer() {
151             return schedFuture != null ? schedFuture : super.startTimer();
152         }
153     }
154 
155     /**
156      * A test thread class which invokes {@code tryAcquire()} on the test semaphore and
157      * records the return value.
158      */
159     private static final class TryAcquireThread extends Thread {
160 
161         /** The semaphore. */
162         private final TimedSemaphore semaphore;
163 
164         /** A latch for communication with the main thread. */
165         private final CountDownLatch latch;
166 
167         /** Flag whether a permit could be acquired. */
168         private boolean acquired;
169 
170         TryAcquireThread(final TimedSemaphore s, final CountDownLatch l) {
171             semaphore = s;
172             latch = l;
173         }
174 
175         @Override
176         public void run() {
177             try {
178                 if (latch.await(10, TimeUnit.SECONDS)) {
179                     acquired = semaphore.tryAcquire();
180                 }
181             } catch (final InterruptedException iex) {
182                 // ignore
183             }
184         }
185     }
186 
187     /** Constant for the time period. */
188     private static final long PERIOD_MILLIS = 500;
189 
190     private static final Duration DURATION = Duration.ofMillis(PERIOD_MILLIS);
191 
192     /** Constant for the time unit. */
193     private static final TimeUnit UNIT = TimeUnit.MILLISECONDS;
194 
195     /** Constant for the default limit. */
196     private static final int LIMIT = 10;
197 
198     /**
199      * Prepares an executor service mock to expect the start of the timer.
200      *
201      * @param service the mock
202      * @param future the future
203      */
204     private void prepareStartTimer(final ScheduledExecutorService service,
205             final ScheduledFuture<?> future) {
206         service.scheduleAtFixedRate((Runnable) EasyMock.anyObject(), EasyMock.eq(PERIOD_MILLIS), EasyMock.eq(PERIOD_MILLIS), EasyMock.eq(UNIT));
207         EasyMock.expectLastCall().andReturn(future);
208     }
209 
210     /**
211      * Tests the acquire() method if a limit is set.
212      *
213      * @throws InterruptedException so we don't have to catch it
214      */
215     @Test
216     void testAcquireLimit() throws InterruptedException {
217         final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class);
218         final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
219         prepareStartTimer(service, future);
220         EasyMock.replay(service, future);
221         final int count = 10;
222         final CountDownLatch latch = new CountDownLatch(count - 1);
223         final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT, 1);
224         final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count - 1);
225         semaphore.setLimit(count - 1);
226         // start a thread that calls the semaphore count times
227         t.start();
228         latch.await();
229         // now the semaphore's limit should be reached and the thread blocked
230         assertEquals(count - 1, semaphore.getAcquireCount(), "Wrong semaphore count");
231         // this wakes up the thread, it should call the semaphore once more
232         semaphore.endOfPeriod();
233         t.join();
234         assertEquals(1, semaphore.getAcquireCount(), "Wrong semaphore count (2)");
235         assertEquals(count - 1, semaphore.getLastAcquiresPerPeriod(), "Wrong acquire() count");
236         EasyMock.verify(service, future);
237     }
238 
239     /**
240      * Tests a bigger number of invocations that span multiple periods. The
241      * period is set to a very short time. A background thread calls the
242      * semaphore a large number of times. While it runs at last one end of a
243      * period should be reached.
244      *
245      * @throws InterruptedException so we don't have to catch it
246      */
247     @Test
248     void testAcquireMultiplePeriods() throws InterruptedException {
249         final int count = 1000;
250         final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(PERIOD_MILLIS / 10, TimeUnit.MILLISECONDS, 1);
251         semaphore.setLimit(count / 4);
252         final CountDownLatch latch = new CountDownLatch(count);
253         final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count);
254         t.start();
255         latch.await();
256         semaphore.shutdown();
257         assertTrue(semaphore.getPeriodEnds() > 0, "End of period not reached");
258     }
259 
260     /**
261      * Tests the acquire() method if more threads are involved than the limit.
262      * This method starts a number of threads that all invoke the semaphore. The
263      * semaphore's limit is set to 1, so in each period only a single thread can
264      * acquire the semaphore.
265      *
266      * @throws InterruptedException so we don't have to catch it
267      */
268     @Test
269     void testAcquireMultipleThreads() throws InterruptedException {
270         final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class);
271         final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
272         prepareStartTimer(service, future);
273         EasyMock.replay(service, future);
274         final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service, PERIOD_MILLIS, UNIT, 1);
275         semaphore.latch = new CountDownLatch(1);
276         final int count = 10;
277         final SemaphoreThread[] threads = new SemaphoreThread[count];
278         for (int i = 0; i < count; i++) {
279             threads[i] = new SemaphoreThread(semaphore, null, 1, 0);
280             threads[i].start();
281         }
282         for (int i = 0; i < count; i++) {
283             semaphore.latch.await();
284             assertEquals(1, semaphore.getAcquireCount(), "Wrong count");
285             semaphore.latch = new CountDownLatch(1);
286             semaphore.endOfPeriod();
287             assertEquals(1, semaphore.getLastAcquiresPerPeriod(), "Wrong acquire count");
288         }
289         for (int i = 0; i < count; i++) {
290             threads[i].join();
291         }
292         EasyMock.verify(service, future);
293     }
294 
295     /**
296      * Tests the acquire() method if no limit is set. A test thread is started
297      * that calls the semaphore a large number of times. Even if the semaphore's
298      * period does not end, the thread should never block.
299      *
300      * @throws InterruptedException so we don't have to catch it
301      */
302     @Test
303     void testAcquireNoLimit() throws InterruptedException {
304         final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class);
305         final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
306         prepareStartTimer(service, future);
307         EasyMock.replay(service, future);
308         final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service, PERIOD_MILLIS, UNIT, TimedSemaphore.NO_LIMIT);
309         final int count = 1000;
310         final CountDownLatch latch = new CountDownLatch(count);
311         final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count);
312         t.start();
313         latch.await();
314         EasyMock.verify(service, future);
315     }
316 
317     /**
318      * Tests whether the available non-blocking calls can be queried.
319      *
320      * @throws InterruptedException so we don't have to catch it
321      */
322     @Test
323     void testGetAvailablePermits() throws InterruptedException {
324         final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class);
325         final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
326         prepareStartTimer(service, future);
327         EasyMock.replay(service, future);
328         final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT, LIMIT);
329         for (int i = 0; i < LIMIT; i++) {
330             assertEquals(LIMIT - i, semaphore.getAvailablePermits(), "Wrong available count at " + i);
331             semaphore.acquire();
332         }
333         semaphore.endOfPeriod();
334         assertEquals(LIMIT, semaphore.getAvailablePermits(), "Wrong available count in new period");
335         EasyMock.verify(service, future);
336     }
337 
338     /**
339      * Tests the methods for statistics.
340      *
341      * @throws InterruptedException so we don't have to catch it
342      */
343     @Test
344     void testGetAverageCallsPerPeriod() throws InterruptedException {
345         final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class);
346         final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
347         prepareStartTimer(service, future);
348         EasyMock.replay(service, future);
349         final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT, LIMIT);
350         semaphore.acquire();
351         semaphore.endOfPeriod();
352         assertEquals(1.0, semaphore.getAverageCallsPerPeriod(), .005, "Wrong average (1)");
353         semaphore.acquire();
354         semaphore.acquire();
355         semaphore.endOfPeriod();
356         assertEquals(1.5, semaphore.getAverageCallsPerPeriod(), .005, "Wrong average (2)");
357         EasyMock.verify(service, future);
358     }
359 
360     /**
361      * Tests creating a new instance.
362      */
363     @Test
364     void testInit() {
365         final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class);
366         EasyMock.replay(service);
367         final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT, LIMIT);
368         EasyMock.verify(service);
369         assertEquals(service, semaphore.getExecutorService(), "Wrong service");
370         assertEquals(PERIOD_MILLIS, semaphore.getPeriod(), "Wrong period");
371         assertEquals(UNIT, semaphore.getUnit(), "Wrong unit");
372         assertEquals(0, semaphore.getLastAcquiresPerPeriod(), "Statistic available");
373         assertEquals(0.0, semaphore.getAverageCallsPerPeriod(), .05, "Average available");
374         assertFalse(semaphore.isShutdown(), "Already shutdown");
375         assertEquals(LIMIT, semaphore.getLimit(), "Wrong limit");
376     }
377 
378     /**
379      * Tests whether a default executor service is created if no service is
380      * provided.
381      */
382     @Test
383     void testInitDefaultService() {
384         final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
385         final ScheduledThreadPoolExecutor exec = (ScheduledThreadPoolExecutor) semaphore.getExecutorService();
386         assertFalse(exec.getContinueExistingPeriodicTasksAfterShutdownPolicy(), "Wrong periodic task policy");
387         assertFalse(exec.getExecuteExistingDelayedTasksAfterShutdownPolicy(), "Wrong delayed task policy");
388         assertFalse(exec.isShutdown(), "Already shutdown");
389         semaphore.shutdown();
390     }
391 
392     /**
393      * Tries to create an instance with a negative period. This should cause an
394      * exception.
395      */
396     @Test
397     void testInitInvalidPeriod() {
398         assertIllegalArgumentException(() -> new TimedSemaphore(0L, UNIT, LIMIT));
399     }
400 
401     /**
402      * Tries to call acquire() after shutdown(). This should cause an exception.
403      */
404     @Test
405     void testPassAfterShutdown() {
406         final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
407         semaphore.shutdown();
408         assertThrows(IllegalStateException.class, semaphore::acquire);
409     }
410 
411     /**
412      * Tests multiple invocations of the shutdown() method.
413      *
414      * @throws InterruptedException so we don't have to catch it
415      */
416     @Test
417     void testShutdownMultipleTimes() throws InterruptedException {
418         final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class);
419         final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
420         prepareStartTimer(service, future);
421         EasyMock.expect(Boolean.valueOf(future.cancel(false))).andReturn(Boolean.TRUE);
422         EasyMock.replay(service, future);
423         final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service, PERIOD_MILLIS, UNIT, LIMIT);
424         semaphore.acquire();
425         for (int i = 0; i < 10; i++) {
426             semaphore.shutdown();
427         }
428         EasyMock.verify(service, future);
429     }
430 
431     /**
432      * Tests the shutdown() method if the executor belongs to the semaphore. In
433      * this case it has to be shut down.
434      */
435     @Test
436     void testShutdownOwnExecutor() {
437         final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
438         semaphore.shutdown();
439         assertTrue(semaphore.isShutdown(), "Not shutdown");
440         assertTrue(semaphore.getExecutorService().isShutdown(), "Executor not shutdown");
441     }
442 
443     /**
444      * Tests the shutdown() method for a shared executor service before a task
445      * was started. This should do pretty much nothing.
446      */
447     @Test
448     void testShutdownSharedExecutorNoTask() {
449         final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class);
450         EasyMock.replay(service);
451         final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT, LIMIT);
452         semaphore.shutdown();
453         assertTrue(semaphore.isShutdown(), "Not shutdown");
454         EasyMock.verify(service);
455     }
456 
457     /**
458      * Tests the shutdown() method for a shared executor after the task was
459      * started. In this case the task must be canceled.
460      *
461      * @throws InterruptedException so we don't have to catch it
462      */
463     @Test
464     void testShutdownSharedExecutorTask() throws InterruptedException {
465         final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class);
466         final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
467         prepareStartTimer(service, future);
468         EasyMock.expect(Boolean.valueOf(future.cancel(false))).andReturn(Boolean.TRUE);
469         EasyMock.replay(service, future);
470         final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service, PERIOD_MILLIS, UNIT, LIMIT);
471         semaphore.acquire();
472         semaphore.shutdown();
473         assertTrue(semaphore.isShutdown(), "Not shutdown");
474         EasyMock.verify(service, future);
475     }
476 
477     /**
478      * Tests starting the timer.
479      *
480      * @throws InterruptedException so we don't have to catch it
481      */
482     @Test
483     void testStartTimer() throws InterruptedException {
484         final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(PERIOD_MILLIS, UNIT, LIMIT);
485         final ScheduledFuture<?> future = semaphore.startTimer();
486         assertNotNull(future, "No future returned");
487         ThreadUtils.sleepQuietly(DURATION);
488         final int trials = 10;
489         int count = 0;
490         do {
491             Thread.sleep(PERIOD_MILLIS);
492             assertFalse(count++ > trials, "endOfPeriod() not called!");
493         } while (semaphore.getPeriodEnds() <= 0);
494         semaphore.shutdown();
495     }
496 
497     /**
498      * Tests the tryAcquire() method. It is checked whether the semaphore can be acquired
499      * by a bunch of threads the expected number of times and not more.
500      */
501     @Test
502     void testTryAcquire() throws InterruptedException {
503         final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, TimeUnit.SECONDS, LIMIT);
504         final TryAcquireThread[] threads = new TryAcquireThread[3 * LIMIT];
505         final CountDownLatch latch = new CountDownLatch(1);
506         for (int i = 0; i < threads.length; i++) {
507             threads[i] = new TryAcquireThread(semaphore, latch);
508             threads[i].start();
509         }
510         latch.countDown();
511         int permits = 0;
512         for (final TryAcquireThread t : threads) {
513             t.join();
514             if (t.acquired) {
515                 permits++;
516             }
517         }
518         assertEquals(LIMIT, permits, "Wrong number of permits granted");
519     }
520 
521     /**
522      * Tries to call tryAcquire() after shutdown(). This should cause an exception.
523      */
524     @Test
525     void testTryAcquireAfterShutdown() {
526         final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
527         semaphore.shutdown();
528         assertThrows(IllegalStateException.class, semaphore::tryAcquire);
529     }
530 }