1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.commons.lang3.concurrent;
18
19 import static org.junit.jupiter.api.Assertions.assertEquals;
20 import static org.junit.jupiter.api.Assertions.assertFalse;
21 import static org.junit.jupiter.api.Assertions.assertNotNull;
22 import static org.junit.jupiter.api.Assertions.assertThrows;
23 import static org.junit.jupiter.api.Assertions.assertTrue;
24
25 import java.time.Duration;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ScheduledExecutorService;
28 import java.util.concurrent.ScheduledFuture;
29 import java.util.concurrent.ScheduledThreadPoolExecutor;
30 import java.util.concurrent.TimeUnit;
31
32 import org.apache.commons.lang3.AbstractLangTest;
33 import org.apache.commons.lang3.ThreadUtils;
34 import org.easymock.EasyMock;
35 import org.junit.jupiter.api.Test;
36
37
38
39
40 public class TimedSemaphoreTest extends AbstractLangTest {
41
42
43
44
45
46 private static final class SemaphoreThread extends Thread {
47
48 private final TimedSemaphore semaphore;
49
50
51 private final CountDownLatch latch;
52
53
54 private final int count;
55
56
57 private final int latchCount;
58
59 SemaphoreThread(final TimedSemaphore b, final CountDownLatch l, final int c, final int lc) {
60 semaphore = b;
61 latch = l;
62 count = c;
63 latchCount = lc;
64 }
65
66
67
68
69
70
71 @Override
72 public void run() {
73 try {
74 for (int i = 0; i < count; i++) {
75 semaphore.acquire();
76
77 if (i < latchCount) {
78 latch.countDown();
79 }
80 }
81 } catch (final InterruptedException iex) {
82 Thread.currentThread().interrupt();
83 }
84 }
85 }
86
87
88
89
90
91 private static final class TimedSemaphoreTestImpl extends TimedSemaphore {
92
93 ScheduledFuture<?> schedFuture;
94
95
96 volatile CountDownLatch latch;
97
98
99 private int periodEnds;
100
101 TimedSemaphoreTestImpl(final long timePeriod, final TimeUnit timeUnit,
102 final int limit) {
103 super(timePeriod, timeUnit, limit);
104 }
105
106 TimedSemaphoreTestImpl(final ScheduledExecutorService service,
107 final long timePeriod, final TimeUnit timeUnit, final int limit) {
108 super(service, timePeriod, timeUnit, limit);
109 }
110
111
112
113
114
115
116 @Override
117 public synchronized void acquire() throws InterruptedException {
118 super.acquire();
119 if (latch != null) {
120 latch.countDown();
121 }
122 }
123
124
125
126
127 @Override
128 protected synchronized void endOfPeriod() {
129 super.endOfPeriod();
130 periodEnds++;
131 }
132
133
134
135
136
137
138 int getPeriodEnds() {
139 synchronized (this) {
140 return periodEnds;
141 }
142 }
143
144
145
146
147 @Override
148 protected ScheduledFuture<?> startTimer() {
149 return schedFuture != null ? schedFuture : super.startTimer();
150 }
151 }
152
153
154
155
156
157 private static final class TryAcquireThread extends Thread {
158
159 private final TimedSemaphore semaphore;
160
161
162 private final CountDownLatch latch;
163
164
165 private boolean acquired;
166
167 TryAcquireThread(final TimedSemaphore s, final CountDownLatch l) {
168 semaphore = s;
169 latch = l;
170 }
171
172 @Override
173 public void run() {
174 try {
175 if (latch.await(10, TimeUnit.SECONDS)) {
176 acquired = semaphore.tryAcquire();
177 }
178 } catch (final InterruptedException iex) {
179
180 }
181 }
182 }
183
184
185 private static final long PERIOD_MILLIS = 500;
186
187 private static final Duration DURATION = Duration.ofMillis(PERIOD_MILLIS);
188
189
190 private static final TimeUnit UNIT = TimeUnit.MILLISECONDS;
191
192
193 private static final int LIMIT = 10;
194
195
196
197
198
199
200
201 private void prepareStartTimer(final ScheduledExecutorService service,
202 final ScheduledFuture<?> future) {
203 service.scheduleAtFixedRate((Runnable) EasyMock.anyObject(), EasyMock
204 .eq(PERIOD_MILLIS), EasyMock.eq(PERIOD_MILLIS), EasyMock.eq(UNIT));
205 EasyMock.expectLastCall().andReturn(future);
206 }
207
208
209
210
211
212
213 @Test
214 public void testAcquireLimit() throws InterruptedException {
215 final ScheduledExecutorService service = EasyMock
216 .createMock(ScheduledExecutorService.class);
217 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
218 prepareStartTimer(service, future);
219 EasyMock.replay(service, future);
220 final int count = 10;
221 final CountDownLatch latch = new CountDownLatch(count - 1);
222 final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT, 1);
223 final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count,
224 count - 1);
225 semaphore.setLimit(count - 1);
226
227
228 t.start();
229 latch.await();
230
231 assertEquals(count - 1, semaphore.getAcquireCount(), "Wrong semaphore count");
232
233
234 semaphore.endOfPeriod();
235 t.join();
236 assertEquals(1, semaphore.getAcquireCount(), "Wrong semaphore count (2)");
237 assertEquals(count - 1, semaphore.getLastAcquiresPerPeriod(), "Wrong acquire() count");
238 EasyMock.verify(service, future);
239 }
240
241
242
243
244
245
246
247
248
249 @Test
250 public void testAcquireMultiplePeriods() throws InterruptedException {
251 final int count = 1000;
252 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(
253 PERIOD_MILLIS / 10, TimeUnit.MILLISECONDS, 1);
254 semaphore.setLimit(count / 4);
255 final CountDownLatch latch = new CountDownLatch(count);
256 final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count);
257 t.start();
258 latch.await();
259 semaphore.shutdown();
260 assertTrue(semaphore.getPeriodEnds() > 0, "End of period not reached");
261 }
262
263
264
265
266
267
268
269
270
271 @Test
272 public void testAcquireMultipleThreads() throws InterruptedException {
273 final ScheduledExecutorService service = EasyMock
274 .createMock(ScheduledExecutorService.class);
275 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
276 prepareStartTimer(service, future);
277 EasyMock.replay(service, future);
278 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
279 PERIOD_MILLIS, UNIT, 1);
280 semaphore.latch = new CountDownLatch(1);
281 final int count = 10;
282 final SemaphoreThread[] threads = new SemaphoreThread[count];
283 for (int i = 0; i < count; i++) {
284 threads[i] = new SemaphoreThread(semaphore, null, 1, 0);
285 threads[i].start();
286 }
287 for (int i = 0; i < count; i++) {
288 semaphore.latch.await();
289 assertEquals(1, semaphore.getAcquireCount(), "Wrong count");
290 semaphore.latch = new CountDownLatch(1);
291 semaphore.endOfPeriod();
292 assertEquals(1, semaphore.getLastAcquiresPerPeriod(), "Wrong acquire count");
293 }
294 for (int i = 0; i < count; i++) {
295 threads[i].join();
296 }
297 EasyMock.verify(service, future);
298 }
299
300
301
302
303
304
305
306
307 @Test
308 public void testAcquireNoLimit() throws InterruptedException {
309 final ScheduledExecutorService service = EasyMock
310 .createMock(ScheduledExecutorService.class);
311 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
312 prepareStartTimer(service, future);
313 EasyMock.replay(service, future);
314 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
315 PERIOD_MILLIS, UNIT, TimedSemaphore.NO_LIMIT);
316 final int count = 1000;
317 final CountDownLatch latch = new CountDownLatch(count);
318 final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count);
319 t.start();
320 latch.await();
321 EasyMock.verify(service, future);
322 }
323
324
325
326
327
328
329 @Test
330 public void testGetAvailablePermits() throws InterruptedException {
331 final ScheduledExecutorService service = EasyMock
332 .createMock(ScheduledExecutorService.class);
333 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
334 prepareStartTimer(service, future);
335 EasyMock.replay(service, future);
336 final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT,
337 LIMIT);
338 for (int i = 0; i < LIMIT; i++) {
339 assertEquals(LIMIT - i, semaphore.getAvailablePermits(), "Wrong available count at " + i);
340 semaphore.acquire();
341 }
342 semaphore.endOfPeriod();
343 assertEquals(LIMIT, semaphore.getAvailablePermits(), "Wrong available count in new period");
344 EasyMock.verify(service, future);
345 }
346
347
348
349
350
351
352 @Test
353 public void testGetAverageCallsPerPeriod() throws InterruptedException {
354 final ScheduledExecutorService service = EasyMock
355 .createMock(ScheduledExecutorService.class);
356 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
357 prepareStartTimer(service, future);
358 EasyMock.replay(service, future);
359 final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT,
360 LIMIT);
361 semaphore.acquire();
362 semaphore.endOfPeriod();
363 assertEquals(1.0, semaphore.getAverageCallsPerPeriod(), .005, "Wrong average (1)");
364 semaphore.acquire();
365 semaphore.acquire();
366 semaphore.endOfPeriod();
367 assertEquals(1.5, semaphore.getAverageCallsPerPeriod(), .005, "Wrong average (2)");
368 EasyMock.verify(service, future);
369 }
370
371
372
373
374 @Test
375 public void testInit() {
376 final ScheduledExecutorService service = EasyMock
377 .createMock(ScheduledExecutorService.class);
378 EasyMock.replay(service);
379 final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT,
380 LIMIT);
381 EasyMock.verify(service);
382 assertEquals(service, semaphore.getExecutorService(), "Wrong service");
383 assertEquals(PERIOD_MILLIS, semaphore.getPeriod(), "Wrong period");
384 assertEquals(UNIT, semaphore.getUnit(), "Wrong unit");
385 assertEquals(0, semaphore.getLastAcquiresPerPeriod(), "Statistic available");
386 assertEquals(0.0, semaphore.getAverageCallsPerPeriod(), .05, "Average available");
387 assertFalse(semaphore.isShutdown(), "Already shutdown");
388 assertEquals(LIMIT, semaphore.getLimit(), "Wrong limit");
389 }
390
391
392
393
394
395 @Test
396 public void testInitDefaultService() {
397 final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
398 final ScheduledThreadPoolExecutor exec = (ScheduledThreadPoolExecutor) semaphore
399 .getExecutorService();
400 assertFalse(exec.getContinueExistingPeriodicTasksAfterShutdownPolicy(), "Wrong periodic task policy");
401 assertFalse(exec.getExecuteExistingDelayedTasksAfterShutdownPolicy(), "Wrong delayed task policy");
402 assertFalse(exec.isShutdown(), "Already shutdown");
403 semaphore.shutdown();
404 }
405
406
407
408
409
410 @Test
411 public void testInitInvalidPeriod() {
412 assertThrows(IllegalArgumentException.class, () -> new TimedSemaphore(0L, UNIT, LIMIT));
413 }
414
415
416
417
418 @Test
419 public void testPassAfterShutdown() {
420 final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
421 semaphore.shutdown();
422 assertThrows(IllegalStateException.class, semaphore::acquire);
423 }
424
425
426
427
428
429
430 @Test
431 public void testShutdownMultipleTimes() throws InterruptedException {
432 final ScheduledExecutorService service = EasyMock
433 .createMock(ScheduledExecutorService.class);
434 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
435 prepareStartTimer(service, future);
436 EasyMock.expect(Boolean.valueOf(future.cancel(false))).andReturn(Boolean.TRUE);
437 EasyMock.replay(service, future);
438 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
439 PERIOD_MILLIS, UNIT, LIMIT);
440 semaphore.acquire();
441 for (int i = 0; i < 10; i++) {
442 semaphore.shutdown();
443 }
444 EasyMock.verify(service, future);
445 }
446
447
448
449
450
451 @Test
452 public void testShutdownOwnExecutor() {
453 final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
454 semaphore.shutdown();
455 assertTrue(semaphore.isShutdown(), "Not shutdown");
456 assertTrue(semaphore.getExecutorService().isShutdown(), "Executor not shutdown");
457 }
458
459
460
461
462
463 @Test
464 public void testShutdownSharedExecutorNoTask() {
465 final ScheduledExecutorService service = EasyMock
466 .createMock(ScheduledExecutorService.class);
467 EasyMock.replay(service);
468 final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT,
469 LIMIT);
470 semaphore.shutdown();
471 assertTrue(semaphore.isShutdown(), "Not shutdown");
472 EasyMock.verify(service);
473 }
474
475
476
477
478
479
480
481 @Test
482 public void testShutdownSharedExecutorTask() throws InterruptedException {
483 final ScheduledExecutorService service = EasyMock
484 .createMock(ScheduledExecutorService.class);
485 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
486 prepareStartTimer(service, future);
487 EasyMock.expect(Boolean.valueOf(future.cancel(false))).andReturn(Boolean.TRUE);
488 EasyMock.replay(service, future);
489 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
490 PERIOD_MILLIS, UNIT, LIMIT);
491 semaphore.acquire();
492 semaphore.shutdown();
493 assertTrue(semaphore.isShutdown(), "Not shutdown");
494 EasyMock.verify(service, future);
495 }
496
497
498
499
500
501
502 @Test
503 public void testStartTimer() throws InterruptedException {
504 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(PERIOD_MILLIS,
505 UNIT, LIMIT);
506 final ScheduledFuture<?> future = semaphore.startTimer();
507 assertNotNull(future, "No future returned");
508 ThreadUtils.sleepQuietly(DURATION);
509 final int trials = 10;
510 int count = 0;
511 do {
512 Thread.sleep(PERIOD_MILLIS);
513 assertFalse(count++ > trials, "endOfPeriod() not called!");
514 } while (semaphore.getPeriodEnds() <= 0);
515 semaphore.shutdown();
516 }
517
518
519
520
521
522 @Test
523 public void testTryAcquire() throws InterruptedException {
524 final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, TimeUnit.SECONDS,
525 LIMIT);
526 final TryAcquireThread[] threads = new TryAcquireThread[3 * LIMIT];
527 final CountDownLatch latch = new CountDownLatch(1);
528 for (int i = 0; i < threads.length; i++) {
529 threads[i] = new TryAcquireThread(semaphore, latch);
530 threads[i].start();
531 }
532
533 latch.countDown();
534 int permits = 0;
535 for (final TryAcquireThread t : threads) {
536 t.join();
537 if (t.acquired) {
538 permits++;
539 }
540 }
541 assertEquals(LIMIT, permits, "Wrong number of permits granted");
542 }
543
544
545
546
547 @Test
548 public void testTryAcquireAfterShutdown() {
549 final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
550 semaphore.shutdown();
551 assertThrows(IllegalStateException.class, semaphore::tryAcquire);
552 }
553 }