1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.commons.lang3.concurrent;
18
19 import static org.apache.commons.lang3.LangAssertions.assertIllegalArgumentException;
20 import static org.junit.jupiter.api.Assertions.assertEquals;
21 import static org.junit.jupiter.api.Assertions.assertFalse;
22 import static org.junit.jupiter.api.Assertions.assertNotNull;
23 import static org.junit.jupiter.api.Assertions.assertThrows;
24 import static org.junit.jupiter.api.Assertions.assertTrue;
25
26 import java.time.Duration;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.concurrent.ScheduledFuture;
30 import java.util.concurrent.ScheduledThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32
33 import org.apache.commons.lang3.AbstractLangTest;
34 import org.apache.commons.lang3.ThreadUtils;
35 import org.easymock.EasyMock;
36 import org.junit.jupiter.api.Test;
37
38
39
40
41 class TimedSemaphoreTest extends AbstractLangTest {
42
43
44
45
46
47 private static final class SemaphoreThread extends Thread {
48
49 private final TimedSemaphore semaphore;
50
51
52 private final CountDownLatch latch;
53
54
55 private final int count;
56
57
58 private final int latchCount;
59
60 SemaphoreThread(final TimedSemaphore b, final CountDownLatch l, final int c, final int lc) {
61 semaphore = b;
62 latch = l;
63 count = c;
64 latchCount = lc;
65 }
66
67
68
69
70
71
72 @Override
73 public void run() {
74 try {
75 for (int i = 0; i < count; i++) {
76 semaphore.acquire();
77
78 if (i < latchCount) {
79 latch.countDown();
80 }
81 }
82 } catch (final InterruptedException iex) {
83 Thread.currentThread().interrupt();
84 }
85 }
86 }
87
88
89
90
91
92 private static final class TimedSemaphoreTestImpl extends TimedSemaphore {
93
94 ScheduledFuture<?> schedFuture;
95
96
97 volatile CountDownLatch latch;
98
99
100 private int periodEnds;
101
102 TimedSemaphoreTestImpl(final long timePeriod, final TimeUnit timeUnit,
103 final int limit) {
104 super(timePeriod, timeUnit, limit);
105 }
106
107 TimedSemaphoreTestImpl(final ScheduledExecutorService service,
108 final long timePeriod, final TimeUnit timeUnit, final int limit) {
109 super(service, timePeriod, timeUnit, limit);
110 }
111
112
113
114
115
116
117 @Override
118 public synchronized void acquire() throws InterruptedException {
119 super.acquire();
120 if (latch != null) {
121 latch.countDown();
122 }
123 }
124
125
126
127
128 @Override
129 protected synchronized void endOfPeriod() {
130 super.endOfPeriod();
131 periodEnds++;
132 }
133
134
135
136
137
138
139 int getPeriodEnds() {
140 synchronized (this) {
141 return periodEnds;
142 }
143 }
144
145
146
147
148 @Override
149 protected ScheduledFuture<?> startTimer() {
150 return schedFuture != null ? schedFuture : super.startTimer();
151 }
152 }
153
154
155
156
157
158 private static final class TryAcquireThread extends Thread {
159
160 private final TimedSemaphore semaphore;
161
162
163 private final CountDownLatch latch;
164
165
166 private boolean acquired;
167
168 TryAcquireThread(final TimedSemaphore s, final CountDownLatch l) {
169 semaphore = s;
170 latch = l;
171 }
172
173 @Override
174 public void run() {
175 try {
176 if (latch.await(10, TimeUnit.SECONDS)) {
177 acquired = semaphore.tryAcquire();
178 }
179 } catch (final InterruptedException iex) {
180
181 }
182 }
183 }
184
185
186 private static final long PERIOD_MILLIS = 500;
187
188 private static final Duration DURATION = Duration.ofMillis(PERIOD_MILLIS);
189
190
191 private static final TimeUnit UNIT = TimeUnit.MILLISECONDS;
192
193
194 private static final int LIMIT = 10;
195
196
197
198
199
200
201
202 private void prepareStartTimer(final ScheduledExecutorService service,
203 final ScheduledFuture<?> future) {
204 service.scheduleAtFixedRate((Runnable) EasyMock.anyObject(), EasyMock
205 .eq(PERIOD_MILLIS), EasyMock.eq(PERIOD_MILLIS), EasyMock.eq(UNIT));
206 EasyMock.expectLastCall().andReturn(future);
207 }
208
209
210
211
212
213
214 @Test
215 void testAcquireLimit() throws InterruptedException {
216 final ScheduledExecutorService service = EasyMock
217 .createMock(ScheduledExecutorService.class);
218 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
219 prepareStartTimer(service, future);
220 EasyMock.replay(service, future);
221 final int count = 10;
222 final CountDownLatch latch = new CountDownLatch(count - 1);
223 final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT, 1);
224 final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count,
225 count - 1);
226 semaphore.setLimit(count - 1);
227
228
229 t.start();
230 latch.await();
231
232 assertEquals(count - 1, semaphore.getAcquireCount(), "Wrong semaphore count");
233
234
235 semaphore.endOfPeriod();
236 t.join();
237 assertEquals(1, semaphore.getAcquireCount(), "Wrong semaphore count (2)");
238 assertEquals(count - 1, semaphore.getLastAcquiresPerPeriod(), "Wrong acquire() count");
239 EasyMock.verify(service, future);
240 }
241
242
243
244
245
246
247
248
249
250 @Test
251 void testAcquireMultiplePeriods() throws InterruptedException {
252 final int count = 1000;
253 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(
254 PERIOD_MILLIS / 10, TimeUnit.MILLISECONDS, 1);
255 semaphore.setLimit(count / 4);
256 final CountDownLatch latch = new CountDownLatch(count);
257 final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count);
258 t.start();
259 latch.await();
260 semaphore.shutdown();
261 assertTrue(semaphore.getPeriodEnds() > 0, "End of period not reached");
262 }
263
264
265
266
267
268
269
270
271
272 @Test
273 void testAcquireMultipleThreads() throws InterruptedException {
274 final ScheduledExecutorService service = EasyMock
275 .createMock(ScheduledExecutorService.class);
276 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
277 prepareStartTimer(service, future);
278 EasyMock.replay(service, future);
279 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
280 PERIOD_MILLIS, UNIT, 1);
281 semaphore.latch = new CountDownLatch(1);
282 final int count = 10;
283 final SemaphoreThread[] threads = new SemaphoreThread[count];
284 for (int i = 0; i < count; i++) {
285 threads[i] = new SemaphoreThread(semaphore, null, 1, 0);
286 threads[i].start();
287 }
288 for (int i = 0; i < count; i++) {
289 semaphore.latch.await();
290 assertEquals(1, semaphore.getAcquireCount(), "Wrong count");
291 semaphore.latch = new CountDownLatch(1);
292 semaphore.endOfPeriod();
293 assertEquals(1, semaphore.getLastAcquiresPerPeriod(), "Wrong acquire count");
294 }
295 for (int i = 0; i < count; i++) {
296 threads[i].join();
297 }
298 EasyMock.verify(service, future);
299 }
300
301
302
303
304
305
306
307
308 @Test
309 void testAcquireNoLimit() throws InterruptedException {
310 final ScheduledExecutorService service = EasyMock
311 .createMock(ScheduledExecutorService.class);
312 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
313 prepareStartTimer(service, future);
314 EasyMock.replay(service, future);
315 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
316 PERIOD_MILLIS, UNIT, TimedSemaphore.NO_LIMIT);
317 final int count = 1000;
318 final CountDownLatch latch = new CountDownLatch(count);
319 final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count);
320 t.start();
321 latch.await();
322 EasyMock.verify(service, future);
323 }
324
325
326
327
328
329
330 @Test
331 void testGetAvailablePermits() throws InterruptedException {
332 final ScheduledExecutorService service = EasyMock
333 .createMock(ScheduledExecutorService.class);
334 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
335 prepareStartTimer(service, future);
336 EasyMock.replay(service, future);
337 final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT,
338 LIMIT);
339 for (int i = 0; i < LIMIT; i++) {
340 assertEquals(LIMIT - i, semaphore.getAvailablePermits(), "Wrong available count at " + i);
341 semaphore.acquire();
342 }
343 semaphore.endOfPeriod();
344 assertEquals(LIMIT, semaphore.getAvailablePermits(), "Wrong available count in new period");
345 EasyMock.verify(service, future);
346 }
347
348
349
350
351
352
353 @Test
354 void testGetAverageCallsPerPeriod() throws InterruptedException {
355 final ScheduledExecutorService service = EasyMock
356 .createMock(ScheduledExecutorService.class);
357 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
358 prepareStartTimer(service, future);
359 EasyMock.replay(service, future);
360 final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT,
361 LIMIT);
362 semaphore.acquire();
363 semaphore.endOfPeriod();
364 assertEquals(1.0, semaphore.getAverageCallsPerPeriod(), .005, "Wrong average (1)");
365 semaphore.acquire();
366 semaphore.acquire();
367 semaphore.endOfPeriod();
368 assertEquals(1.5, semaphore.getAverageCallsPerPeriod(), .005, "Wrong average (2)");
369 EasyMock.verify(service, future);
370 }
371
372
373
374
375 @Test
376 void testInit() {
377 final ScheduledExecutorService service = EasyMock
378 .createMock(ScheduledExecutorService.class);
379 EasyMock.replay(service);
380 final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT,
381 LIMIT);
382 EasyMock.verify(service);
383 assertEquals(service, semaphore.getExecutorService(), "Wrong service");
384 assertEquals(PERIOD_MILLIS, semaphore.getPeriod(), "Wrong period");
385 assertEquals(UNIT, semaphore.getUnit(), "Wrong unit");
386 assertEquals(0, semaphore.getLastAcquiresPerPeriod(), "Statistic available");
387 assertEquals(0.0, semaphore.getAverageCallsPerPeriod(), .05, "Average available");
388 assertFalse(semaphore.isShutdown(), "Already shutdown");
389 assertEquals(LIMIT, semaphore.getLimit(), "Wrong limit");
390 }
391
392
393
394
395
396 @Test
397 void testInitDefaultService() {
398 final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
399 final ScheduledThreadPoolExecutor exec = (ScheduledThreadPoolExecutor) semaphore
400 .getExecutorService();
401 assertFalse(exec.getContinueExistingPeriodicTasksAfterShutdownPolicy(), "Wrong periodic task policy");
402 assertFalse(exec.getExecuteExistingDelayedTasksAfterShutdownPolicy(), "Wrong delayed task policy");
403 assertFalse(exec.isShutdown(), "Already shutdown");
404 semaphore.shutdown();
405 }
406
407
408
409
410
411 @Test
412 void testInitInvalidPeriod() {
413 assertIllegalArgumentException(() -> new TimedSemaphore(0L, UNIT, LIMIT));
414 }
415
416
417
418
419 @Test
420 void testPassAfterShutdown() {
421 final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
422 semaphore.shutdown();
423 assertThrows(IllegalStateException.class, semaphore::acquire);
424 }
425
426
427
428
429
430
431 @Test
432 void testShutdownMultipleTimes() throws InterruptedException {
433 final ScheduledExecutorService service = EasyMock
434 .createMock(ScheduledExecutorService.class);
435 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
436 prepareStartTimer(service, future);
437 EasyMock.expect(Boolean.valueOf(future.cancel(false))).andReturn(Boolean.TRUE);
438 EasyMock.replay(service, future);
439 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
440 PERIOD_MILLIS, UNIT, LIMIT);
441 semaphore.acquire();
442 for (int i = 0; i < 10; i++) {
443 semaphore.shutdown();
444 }
445 EasyMock.verify(service, future);
446 }
447
448
449
450
451
452 @Test
453 void testShutdownOwnExecutor() {
454 final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
455 semaphore.shutdown();
456 assertTrue(semaphore.isShutdown(), "Not shutdown");
457 assertTrue(semaphore.getExecutorService().isShutdown(), "Executor not shutdown");
458 }
459
460
461
462
463
464 @Test
465 void testShutdownSharedExecutorNoTask() {
466 final ScheduledExecutorService service = EasyMock
467 .createMock(ScheduledExecutorService.class);
468 EasyMock.replay(service);
469 final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT,
470 LIMIT);
471 semaphore.shutdown();
472 assertTrue(semaphore.isShutdown(), "Not shutdown");
473 EasyMock.verify(service);
474 }
475
476
477
478
479
480
481
482 @Test
483 void testShutdownSharedExecutorTask() throws InterruptedException {
484 final ScheduledExecutorService service = EasyMock
485 .createMock(ScheduledExecutorService.class);
486 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
487 prepareStartTimer(service, future);
488 EasyMock.expect(Boolean.valueOf(future.cancel(false))).andReturn(Boolean.TRUE);
489 EasyMock.replay(service, future);
490 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
491 PERIOD_MILLIS, UNIT, LIMIT);
492 semaphore.acquire();
493 semaphore.shutdown();
494 assertTrue(semaphore.isShutdown(), "Not shutdown");
495 EasyMock.verify(service, future);
496 }
497
498
499
500
501
502
503 @Test
504 void testStartTimer() throws InterruptedException {
505 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(PERIOD_MILLIS,
506 UNIT, LIMIT);
507 final ScheduledFuture<?> future = semaphore.startTimer();
508 assertNotNull(future, "No future returned");
509 ThreadUtils.sleepQuietly(DURATION);
510 final int trials = 10;
511 int count = 0;
512 do {
513 Thread.sleep(PERIOD_MILLIS);
514 assertFalse(count++ > trials, "endOfPeriod() not called!");
515 } while (semaphore.getPeriodEnds() <= 0);
516 semaphore.shutdown();
517 }
518
519
520
521
522
523 @Test
524 void testTryAcquire() throws InterruptedException {
525 final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, TimeUnit.SECONDS,
526 LIMIT);
527 final TryAcquireThread[] threads = new TryAcquireThread[3 * LIMIT];
528 final CountDownLatch latch = new CountDownLatch(1);
529 for (int i = 0; i < threads.length; i++) {
530 threads[i] = new TryAcquireThread(semaphore, latch);
531 threads[i].start();
532 }
533
534 latch.countDown();
535 int permits = 0;
536 for (final TryAcquireThread t : threads) {
537 t.join();
538 if (t.acquired) {
539 permits++;
540 }
541 }
542 assertEquals(LIMIT, permits, "Wrong number of permits granted");
543 }
544
545
546
547
548 @Test
549 void testTryAcquireAfterShutdown() {
550 final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
551 semaphore.shutdown();
552 assertThrows(IllegalStateException.class, semaphore::tryAcquire);
553 }
554 }