View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.lang3.concurrent;
18  
19  import static org.junit.jupiter.api.Assertions.assertEquals;
20  import static org.junit.jupiter.api.Assertions.assertFalse;
21  import static org.junit.jupiter.api.Assertions.assertNotNull;
22  import static org.junit.jupiter.api.Assertions.assertThrows;
23  import static org.junit.jupiter.api.Assertions.assertTrue;
24  
25  import java.time.Duration;
26  import java.util.concurrent.CountDownLatch;
27  import java.util.concurrent.ScheduledExecutorService;
28  import java.util.concurrent.ScheduledFuture;
29  import java.util.concurrent.ScheduledThreadPoolExecutor;
30  import java.util.concurrent.TimeUnit;
31  
32  import org.apache.commons.lang3.AbstractLangTest;
33  import org.apache.commons.lang3.ThreadUtils;
34  import org.easymock.EasyMock;
35  import org.junit.jupiter.api.Test;
36  
37  /**
38   * Test class for TimedSemaphore.
39   */
40  public class TimedSemaphoreTest extends AbstractLangTest {
41      /**
42       * A test thread class that will be used by tests for triggering the
43       * semaphore. The thread calls the semaphore a configurable number of times.
44       * When this is done, it can notify the main thread.
45       */
46      private static final class SemaphoreThread extends Thread {
47          /** The semaphore. */
48          private final TimedSemaphore semaphore;
49  
50          /** A latch for communication with the main thread. */
51          private final CountDownLatch latch;
52  
53          /** The number of acquire() calls. */
54          private final int count;
55  
56          /** The number of invocations of the latch. */
57          private final int latchCount;
58  
59          SemaphoreThread(final TimedSemaphore b, final CountDownLatch l, final int c, final int lc) {
60              semaphore = b;
61              latch = l;
62              count = c;
63              latchCount = lc;
64          }
65  
66          /**
67           * Calls acquire() on the semaphore for the specified number of times.
68           * Optionally the latch will also be triggered to synchronize with the
69           * main test thread.
70           */
71          @Override
72          public void run() {
73              try {
74                  for (int i = 0; i < count; i++) {
75                      semaphore.acquire();
76  
77                      if (i < latchCount) {
78                          latch.countDown();
79                      }
80                  }
81              } catch (final InterruptedException iex) {
82                  Thread.currentThread().interrupt();
83              }
84          }
85      }
86  
87      /**
88       * A specialized implementation of {@code TimedSemaphore} that is easier to
89       * test.
90       */
91      private static final class TimedSemaphoreTestImpl extends TimedSemaphore {
92          /** A mock scheduled future. */
93          ScheduledFuture<?> schedFuture;
94  
95          /** A latch for synchronizing with the main thread. */
96          volatile CountDownLatch latch;
97  
98          /** Counter for the endOfPeriod() invocations. */
99          private int periodEnds;
100 
101         TimedSemaphoreTestImpl(final long timePeriod, final TimeUnit timeUnit,
102                 final int limit) {
103             super(timePeriod, timeUnit, limit);
104         }
105 
106         TimedSemaphoreTestImpl(final ScheduledExecutorService service,
107                 final long timePeriod, final TimeUnit timeUnit, final int limit) {
108             super(service, timePeriod, timeUnit, limit);
109         }
110 
111         /**
112          * Invokes the latch if one is set.
113          *
114          * @throws InterruptedException because it is declared that way in TimedSemaphore
115          */
116         @Override
117         public synchronized void acquire() throws InterruptedException {
118             super.acquire();
119             if (latch != null) {
120                 latch.countDown();
121             }
122         }
123 
124         /**
125          * Counts the number of invocations.
126          */
127         @Override
128         protected synchronized void endOfPeriod() {
129             super.endOfPeriod();
130             periodEnds++;
131         }
132 
133         /**
134          * Returns the number of invocations of the endOfPeriod() method.
135          *
136          * @return the endOfPeriod() invocations
137          */
138         int getPeriodEnds() {
139             synchronized (this) {
140                 return periodEnds;
141             }
142         }
143 
144         /**
145          * Either returns the mock future or calls the super method.
146          */
147         @Override
148         protected ScheduledFuture<?> startTimer() {
149             return schedFuture != null ? schedFuture : super.startTimer();
150         }
151     }
152 
153     /**
154      * A test thread class which invokes {@code tryAcquire()} on the test semaphore and
155      * records the return value.
156      */
157     private static final class TryAcquireThread extends Thread {
158         /** The semaphore. */
159         private final TimedSemaphore semaphore;
160 
161         /** A latch for communication with the main thread. */
162         private final CountDownLatch latch;
163 
164         /** Flag whether a permit could be acquired. */
165         private boolean acquired;
166 
167         TryAcquireThread(final TimedSemaphore s, final CountDownLatch l) {
168             semaphore = s;
169             latch = l;
170         }
171 
172         @Override
173         public void run() {
174             try {
175                 if (latch.await(10, TimeUnit.SECONDS)) {
176                     acquired = semaphore.tryAcquire();
177                 }
178             } catch (final InterruptedException iex) {
179                 // ignore
180             }
181         }
182     }
183 
184     /** Constant for the time period. */
185     private static final long PERIOD_MILLIS = 500;
186 
187     private static final Duration DURATION = Duration.ofMillis(PERIOD_MILLIS);
188 
189     /** Constant for the time unit. */
190     private static final TimeUnit UNIT = TimeUnit.MILLISECONDS;
191 
192     /** Constant for the default limit. */
193     private static final int LIMIT = 10;
194 
195     /**
196      * Prepares an executor service mock to expect the start of the timer.
197      *
198      * @param service the mock
199      * @param future the future
200      */
201     private void prepareStartTimer(final ScheduledExecutorService service,
202             final ScheduledFuture<?> future) {
203         service.scheduleAtFixedRate((Runnable) EasyMock.anyObject(), EasyMock
204                 .eq(PERIOD_MILLIS), EasyMock.eq(PERIOD_MILLIS), EasyMock.eq(UNIT));
205         EasyMock.expectLastCall().andReturn(future);
206     }
207 
208     /**
209      * Tests the acquire() method if a limit is set.
210      *
211      * @throws InterruptedException so we don't have to catch it
212      */
213     @Test
214     public void testAcquireLimit() throws InterruptedException {
215         final ScheduledExecutorService service = EasyMock
216                 .createMock(ScheduledExecutorService.class);
217         final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
218         prepareStartTimer(service, future);
219         EasyMock.replay(service, future);
220         final int count = 10;
221         final CountDownLatch latch = new CountDownLatch(count - 1);
222         final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT, 1);
223         final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count,
224                 count - 1);
225         semaphore.setLimit(count - 1);
226 
227         // start a thread that calls the semaphore count times
228         t.start();
229         latch.await();
230         // now the semaphore's limit should be reached and the thread blocked
231         assertEquals(count - 1, semaphore.getAcquireCount(), "Wrong semaphore count");
232 
233         // this wakes up the thread, it should call the semaphore once more
234         semaphore.endOfPeriod();
235         t.join();
236         assertEquals(1, semaphore.getAcquireCount(), "Wrong semaphore count (2)");
237         assertEquals(count - 1, semaphore.getLastAcquiresPerPeriod(), "Wrong acquire() count");
238         EasyMock.verify(service, future);
239     }
240 
241     /**
242      * Tests a bigger number of invocations that span multiple periods. The
243      * period is set to a very short time. A background thread calls the
244      * semaphore a large number of times. While it runs at last one end of a
245      * period should be reached.
246      *
247      * @throws InterruptedException so we don't have to catch it
248      */
249     @Test
250     public void testAcquireMultiplePeriods() throws InterruptedException {
251         final int count = 1000;
252         final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(
253                 PERIOD_MILLIS / 10, TimeUnit.MILLISECONDS, 1);
254         semaphore.setLimit(count / 4);
255         final CountDownLatch latch = new CountDownLatch(count);
256         final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count);
257         t.start();
258         latch.await();
259         semaphore.shutdown();
260         assertTrue(semaphore.getPeriodEnds() > 0, "End of period not reached");
261     }
262 
263     /**
264      * Tests the acquire() method if more threads are involved than the limit.
265      * This method starts a number of threads that all invoke the semaphore. The
266      * semaphore's limit is set to 1, so in each period only a single thread can
267      * acquire the semaphore.
268      *
269      * @throws InterruptedException so we don't have to catch it
270      */
271     @Test
272     public void testAcquireMultipleThreads() throws InterruptedException {
273         final ScheduledExecutorService service = EasyMock
274                 .createMock(ScheduledExecutorService.class);
275         final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
276         prepareStartTimer(service, future);
277         EasyMock.replay(service, future);
278         final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
279                 PERIOD_MILLIS, UNIT, 1);
280         semaphore.latch = new CountDownLatch(1);
281         final int count = 10;
282         final SemaphoreThread[] threads = new SemaphoreThread[count];
283         for (int i = 0; i < count; i++) {
284             threads[i] = new SemaphoreThread(semaphore, null, 1, 0);
285             threads[i].start();
286         }
287         for (int i = 0; i < count; i++) {
288             semaphore.latch.await();
289             assertEquals(1, semaphore.getAcquireCount(), "Wrong count");
290             semaphore.latch = new CountDownLatch(1);
291             semaphore.endOfPeriod();
292             assertEquals(1, semaphore.getLastAcquiresPerPeriod(), "Wrong acquire count");
293         }
294         for (int i = 0; i < count; i++) {
295             threads[i].join();
296         }
297         EasyMock.verify(service, future);
298     }
299 
300     /**
301      * Tests the acquire() method if no limit is set. A test thread is started
302      * that calls the semaphore a large number of times. Even if the semaphore's
303      * period does not end, the thread should never block.
304      *
305      * @throws InterruptedException so we don't have to catch it
306      */
307     @Test
308     public void testAcquireNoLimit() throws InterruptedException {
309         final ScheduledExecutorService service = EasyMock
310                 .createMock(ScheduledExecutorService.class);
311         final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
312         prepareStartTimer(service, future);
313         EasyMock.replay(service, future);
314         final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
315                 PERIOD_MILLIS, UNIT, TimedSemaphore.NO_LIMIT);
316         final int count = 1000;
317         final CountDownLatch latch = new CountDownLatch(count);
318         final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count);
319         t.start();
320         latch.await();
321         EasyMock.verify(service, future);
322     }
323 
324     /**
325      * Tests whether the available non-blocking calls can be queried.
326      *
327      * @throws InterruptedException so we don't have to catch it
328      */
329     @Test
330     public void testGetAvailablePermits() throws InterruptedException {
331         final ScheduledExecutorService service = EasyMock
332                 .createMock(ScheduledExecutorService.class);
333         final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
334         prepareStartTimer(service, future);
335         EasyMock.replay(service, future);
336         final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT,
337                 LIMIT);
338         for (int i = 0; i < LIMIT; i++) {
339             assertEquals(LIMIT - i, semaphore.getAvailablePermits(), "Wrong available count at " + i);
340             semaphore.acquire();
341         }
342         semaphore.endOfPeriod();
343         assertEquals(LIMIT, semaphore.getAvailablePermits(), "Wrong available count in new period");
344         EasyMock.verify(service, future);
345     }
346 
347     /**
348      * Tests the methods for statistics.
349      *
350      * @throws InterruptedException so we don't have to catch it
351      */
352     @Test
353     public void testGetAverageCallsPerPeriod() throws InterruptedException {
354         final ScheduledExecutorService service = EasyMock
355                 .createMock(ScheduledExecutorService.class);
356         final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
357         prepareStartTimer(service, future);
358         EasyMock.replay(service, future);
359         final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT,
360                 LIMIT);
361         semaphore.acquire();
362         semaphore.endOfPeriod();
363         assertEquals(1.0, semaphore.getAverageCallsPerPeriod(), .005, "Wrong average (1)");
364         semaphore.acquire();
365         semaphore.acquire();
366         semaphore.endOfPeriod();
367         assertEquals(1.5, semaphore.getAverageCallsPerPeriod(), .005, "Wrong average (2)");
368         EasyMock.verify(service, future);
369     }
370 
371     /**
372      * Tests creating a new instance.
373      */
374     @Test
375     public void testInit() {
376         final ScheduledExecutorService service = EasyMock
377                 .createMock(ScheduledExecutorService.class);
378         EasyMock.replay(service);
379         final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT,
380                 LIMIT);
381         EasyMock.verify(service);
382         assertEquals(service, semaphore.getExecutorService(), "Wrong service");
383         assertEquals(PERIOD_MILLIS, semaphore.getPeriod(), "Wrong period");
384         assertEquals(UNIT, semaphore.getUnit(), "Wrong unit");
385         assertEquals(0, semaphore.getLastAcquiresPerPeriod(), "Statistic available");
386         assertEquals(0.0, semaphore.getAverageCallsPerPeriod(), .05, "Average available");
387         assertFalse(semaphore.isShutdown(), "Already shutdown");
388         assertEquals(LIMIT, semaphore.getLimit(), "Wrong limit");
389     }
390 
391     /**
392      * Tests whether a default executor service is created if no service is
393      * provided.
394      */
395     @Test
396     public void testInitDefaultService() {
397         final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
398         final ScheduledThreadPoolExecutor exec = (ScheduledThreadPoolExecutor) semaphore
399                 .getExecutorService();
400         assertFalse(exec.getContinueExistingPeriodicTasksAfterShutdownPolicy(), "Wrong periodic task policy");
401         assertFalse(exec.getExecuteExistingDelayedTasksAfterShutdownPolicy(), "Wrong delayed task policy");
402         assertFalse(exec.isShutdown(), "Already shutdown");
403         semaphore.shutdown();
404     }
405 
406     /**
407      * Tries to create an instance with a negative period. This should cause an
408      * exception.
409      */
410     @Test
411     public void testInitInvalidPeriod() {
412         assertThrows(IllegalArgumentException.class, () -> new TimedSemaphore(0L, UNIT, LIMIT));
413     }
414 
415     /**
416      * Tries to call acquire() after shutdown(). This should cause an exception.
417      */
418     @Test
419     public void testPassAfterShutdown() {
420         final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
421         semaphore.shutdown();
422         assertThrows(IllegalStateException.class, semaphore::acquire);
423     }
424 
425     /**
426      * Tests multiple invocations of the shutdown() method.
427      *
428      * @throws InterruptedException so we don't have to catch it
429      */
430     @Test
431     public void testShutdownMultipleTimes() throws InterruptedException {
432         final ScheduledExecutorService service = EasyMock
433                 .createMock(ScheduledExecutorService.class);
434         final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
435         prepareStartTimer(service, future);
436         EasyMock.expect(Boolean.valueOf(future.cancel(false))).andReturn(Boolean.TRUE);
437         EasyMock.replay(service, future);
438         final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
439                 PERIOD_MILLIS, UNIT, LIMIT);
440         semaphore.acquire();
441         for (int i = 0; i < 10; i++) {
442             semaphore.shutdown();
443         }
444         EasyMock.verify(service, future);
445     }
446 
447     /**
448      * Tests the shutdown() method if the executor belongs to the semaphore. In
449      * this case it has to be shut down.
450      */
451     @Test
452     public void testShutdownOwnExecutor() {
453         final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
454         semaphore.shutdown();
455         assertTrue(semaphore.isShutdown(), "Not shutdown");
456         assertTrue(semaphore.getExecutorService().isShutdown(), "Executor not shutdown");
457     }
458 
459     /**
460      * Tests the shutdown() method for a shared executor service before a task
461      * was started. This should do pretty much nothing.
462      */
463     @Test
464     public void testShutdownSharedExecutorNoTask() {
465         final ScheduledExecutorService service = EasyMock
466                 .createMock(ScheduledExecutorService.class);
467         EasyMock.replay(service);
468         final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT,
469                 LIMIT);
470         semaphore.shutdown();
471         assertTrue(semaphore.isShutdown(), "Not shutdown");
472         EasyMock.verify(service);
473     }
474 
475     /**
476      * Tests the shutdown() method for a shared executor after the task was
477      * started. In this case the task must be canceled.
478      *
479      * @throws InterruptedException so we don't have to catch it
480      */
481     @Test
482     public void testShutdownSharedExecutorTask() throws InterruptedException {
483         final ScheduledExecutorService service = EasyMock
484                 .createMock(ScheduledExecutorService.class);
485         final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
486         prepareStartTimer(service, future);
487         EasyMock.expect(Boolean.valueOf(future.cancel(false))).andReturn(Boolean.TRUE);
488         EasyMock.replay(service, future);
489         final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
490                 PERIOD_MILLIS, UNIT, LIMIT);
491         semaphore.acquire();
492         semaphore.shutdown();
493         assertTrue(semaphore.isShutdown(), "Not shutdown");
494         EasyMock.verify(service, future);
495     }
496 
497     /**
498      * Tests starting the timer.
499      *
500      * @throws InterruptedException so we don't have to catch it
501      */
502     @Test
503     public void testStartTimer() throws InterruptedException {
504         final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(PERIOD_MILLIS,
505                 UNIT, LIMIT);
506         final ScheduledFuture<?> future = semaphore.startTimer();
507         assertNotNull(future, "No future returned");
508         ThreadUtils.sleepQuietly(DURATION);
509         final int trials = 10;
510         int count = 0;
511         do {
512             Thread.sleep(PERIOD_MILLIS);
513             assertFalse(count++ > trials, "endOfPeriod() not called!");
514         } while (semaphore.getPeriodEnds() <= 0);
515         semaphore.shutdown();
516     }
517 
518     /**
519      * Tests the tryAcquire() method. It is checked whether the semaphore can be acquired
520      * by a bunch of threads the expected number of times and not more.
521      */
522     @Test
523     public void testTryAcquire() throws InterruptedException {
524         final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, TimeUnit.SECONDS,
525                 LIMIT);
526         final TryAcquireThread[] threads = new TryAcquireThread[3 * LIMIT];
527         final CountDownLatch latch = new CountDownLatch(1);
528         for (int i = 0; i < threads.length; i++) {
529             threads[i] = new TryAcquireThread(semaphore, latch);
530             threads[i].start();
531         }
532 
533         latch.countDown();
534         int permits = 0;
535         for (final TryAcquireThread t : threads) {
536             t.join();
537             if (t.acquired) {
538                 permits++;
539             }
540         }
541         assertEquals(LIMIT, permits, "Wrong number of permits granted");
542     }
543 
544     /**
545      * Tries to call tryAcquire() after shutdown(). This should cause an exception.
546      */
547     @Test
548     public void testTryAcquireAfterShutdown() {
549         final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
550         semaphore.shutdown();
551         assertThrows(IllegalStateException.class, semaphore::tryAcquire);
552     }
553 }