1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.commons.lang3.concurrent;
18
19 import static org.apache.commons.lang3.LangAssertions.assertIllegalArgumentException;
20 import static org.junit.jupiter.api.Assertions.assertEquals;
21 import static org.junit.jupiter.api.Assertions.assertFalse;
22 import static org.junit.jupiter.api.Assertions.assertNotNull;
23 import static org.junit.jupiter.api.Assertions.assertThrows;
24 import static org.junit.jupiter.api.Assertions.assertTrue;
25
26 import java.time.Duration;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.concurrent.ScheduledFuture;
30 import java.util.concurrent.ScheduledThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32
33 import org.apache.commons.lang3.AbstractLangTest;
34 import org.apache.commons.lang3.ThreadUtils;
35 import org.easymock.EasyMock;
36 import org.junit.jupiter.api.Test;
37
38
39
40
41 class TimedSemaphoreTest extends AbstractLangTest {
42
43
44
45
46
47
48 private static final class SemaphoreThread extends Thread {
49
50
51 private final TimedSemaphore semaphore;
52
53
54 private final CountDownLatch latch;
55
56
57 private final int count;
58
59
60 private final int latchCount;
61
62 SemaphoreThread(final TimedSemaphore b, final CountDownLatch l, final int c, final int lc) {
63 semaphore = b;
64 latch = l;
65 count = c;
66 latchCount = lc;
67 }
68
69
70
71
72
73
74 @Override
75 public void run() {
76 try {
77 for (int i = 0; i < count; i++) {
78 semaphore.acquire();
79
80 if (i < latchCount) {
81 latch.countDown();
82 }
83 }
84 } catch (final InterruptedException iex) {
85 Thread.currentThread().interrupt();
86 }
87 }
88 }
89
90
91
92
93
94 private static final class TimedSemaphoreTestImpl extends TimedSemaphore {
95
96
97 ScheduledFuture<?> schedFuture;
98
99
100 volatile CountDownLatch latch;
101
102
103 private int periodEnds;
104
105 TimedSemaphoreTestImpl(final long timePeriod, final TimeUnit timeUnit, final int limit) {
106 super(timePeriod, timeUnit, limit);
107 }
108
109 TimedSemaphoreTestImpl(final ScheduledExecutorService service, final long timePeriod, final TimeUnit timeUnit, final int limit) {
110 super(service, timePeriod, timeUnit, limit);
111 }
112
113
114
115
116
117
118 @Override
119 public synchronized void acquire() throws InterruptedException {
120 super.acquire();
121 if (latch != null) {
122 latch.countDown();
123 }
124 }
125
126
127
128
129 @Override
130 protected synchronized void endOfPeriod() {
131 super.endOfPeriod();
132 periodEnds++;
133 }
134
135
136
137
138
139
140 int getPeriodEnds() {
141 synchronized (this) {
142 return periodEnds;
143 }
144 }
145
146
147
148
149 @Override
150 protected ScheduledFuture<?> startTimer() {
151 return schedFuture != null ? schedFuture : super.startTimer();
152 }
153 }
154
155
156
157
158
159 private static final class TryAcquireThread extends Thread {
160
161
162 private final TimedSemaphore semaphore;
163
164
165 private final CountDownLatch latch;
166
167
168 private boolean acquired;
169
170 TryAcquireThread(final TimedSemaphore s, final CountDownLatch l) {
171 semaphore = s;
172 latch = l;
173 }
174
175 @Override
176 public void run() {
177 try {
178 if (latch.await(10, TimeUnit.SECONDS)) {
179 acquired = semaphore.tryAcquire();
180 }
181 } catch (final InterruptedException iex) {
182
183 }
184 }
185 }
186
187
188 private static final long PERIOD_MILLIS = 500;
189
190 private static final Duration DURATION = Duration.ofMillis(PERIOD_MILLIS);
191
192
193 private static final TimeUnit UNIT = TimeUnit.MILLISECONDS;
194
195
196 private static final int LIMIT = 10;
197
198
199
200
201
202
203
204 private void prepareStartTimer(final ScheduledExecutorService service,
205 final ScheduledFuture<?> future) {
206 service.scheduleAtFixedRate((Runnable) EasyMock.anyObject(), EasyMock.eq(PERIOD_MILLIS), EasyMock.eq(PERIOD_MILLIS), EasyMock.eq(UNIT));
207 EasyMock.expectLastCall().andReturn(future);
208 }
209
210
211
212
213
214
215 @Test
216 void testAcquireLimit() throws InterruptedException {
217 final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class);
218 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
219 prepareStartTimer(service, future);
220 EasyMock.replay(service, future);
221 final int count = 10;
222 final CountDownLatch latch = new CountDownLatch(count - 1);
223 final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT, 1);
224 final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count - 1);
225 semaphore.setLimit(count - 1);
226
227 t.start();
228 latch.await();
229
230 assertEquals(count - 1, semaphore.getAcquireCount(), "Wrong semaphore count");
231
232 semaphore.endOfPeriod();
233 t.join();
234 assertEquals(1, semaphore.getAcquireCount(), "Wrong semaphore count (2)");
235 assertEquals(count - 1, semaphore.getLastAcquiresPerPeriod(), "Wrong acquire() count");
236 EasyMock.verify(service, future);
237 }
238
239
240
241
242
243
244
245
246
247 @Test
248 void testAcquireMultiplePeriods() throws InterruptedException {
249 final int count = 1000;
250 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(PERIOD_MILLIS / 10, TimeUnit.MILLISECONDS, 1);
251 semaphore.setLimit(count / 4);
252 final CountDownLatch latch = new CountDownLatch(count);
253 final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count);
254 t.start();
255 latch.await();
256 semaphore.shutdown();
257 assertTrue(semaphore.getPeriodEnds() > 0, "End of period not reached");
258 }
259
260
261
262
263
264
265
266
267
268 @Test
269 void testAcquireMultipleThreads() throws InterruptedException {
270 final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class);
271 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
272 prepareStartTimer(service, future);
273 EasyMock.replay(service, future);
274 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service, PERIOD_MILLIS, UNIT, 1);
275 semaphore.latch = new CountDownLatch(1);
276 final int count = 10;
277 final SemaphoreThread[] threads = new SemaphoreThread[count];
278 for (int i = 0; i < count; i++) {
279 threads[i] = new SemaphoreThread(semaphore, null, 1, 0);
280 threads[i].start();
281 }
282 for (int i = 0; i < count; i++) {
283 semaphore.latch.await();
284 assertEquals(1, semaphore.getAcquireCount(), "Wrong count");
285 semaphore.latch = new CountDownLatch(1);
286 semaphore.endOfPeriod();
287 assertEquals(1, semaphore.getLastAcquiresPerPeriod(), "Wrong acquire count");
288 }
289 for (int i = 0; i < count; i++) {
290 threads[i].join();
291 }
292 EasyMock.verify(service, future);
293 }
294
295
296
297
298
299
300
301
302 @Test
303 void testAcquireNoLimit() throws InterruptedException {
304 final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class);
305 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
306 prepareStartTimer(service, future);
307 EasyMock.replay(service, future);
308 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service, PERIOD_MILLIS, UNIT, TimedSemaphore.NO_LIMIT);
309 final int count = 1000;
310 final CountDownLatch latch = new CountDownLatch(count);
311 final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count);
312 t.start();
313 latch.await();
314 EasyMock.verify(service, future);
315 }
316
317
318
319
320
321
322 @Test
323 void testGetAvailablePermits() throws InterruptedException {
324 final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class);
325 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
326 prepareStartTimer(service, future);
327 EasyMock.replay(service, future);
328 final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT, LIMIT);
329 for (int i = 0; i < LIMIT; i++) {
330 assertEquals(LIMIT - i, semaphore.getAvailablePermits(), "Wrong available count at " + i);
331 semaphore.acquire();
332 }
333 semaphore.endOfPeriod();
334 assertEquals(LIMIT, semaphore.getAvailablePermits(), "Wrong available count in new period");
335 EasyMock.verify(service, future);
336 }
337
338
339
340
341
342
343 @Test
344 void testGetAverageCallsPerPeriod() throws InterruptedException {
345 final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class);
346 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
347 prepareStartTimer(service, future);
348 EasyMock.replay(service, future);
349 final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT, LIMIT);
350 semaphore.acquire();
351 semaphore.endOfPeriod();
352 assertEquals(1.0, semaphore.getAverageCallsPerPeriod(), .005, "Wrong average (1)");
353 semaphore.acquire();
354 semaphore.acquire();
355 semaphore.endOfPeriod();
356 assertEquals(1.5, semaphore.getAverageCallsPerPeriod(), .005, "Wrong average (2)");
357 EasyMock.verify(service, future);
358 }
359
360
361
362
363 @Test
364 void testInit() {
365 final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class);
366 EasyMock.replay(service);
367 final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT, LIMIT);
368 EasyMock.verify(service);
369 assertEquals(service, semaphore.getExecutorService(), "Wrong service");
370 assertEquals(PERIOD_MILLIS, semaphore.getPeriod(), "Wrong period");
371 assertEquals(UNIT, semaphore.getUnit(), "Wrong unit");
372 assertEquals(0, semaphore.getLastAcquiresPerPeriod(), "Statistic available");
373 assertEquals(0.0, semaphore.getAverageCallsPerPeriod(), .05, "Average available");
374 assertFalse(semaphore.isShutdown(), "Already shutdown");
375 assertEquals(LIMIT, semaphore.getLimit(), "Wrong limit");
376 }
377
378
379
380
381
382 @Test
383 void testInitDefaultService() {
384 final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
385 final ScheduledThreadPoolExecutor exec = (ScheduledThreadPoolExecutor) semaphore.getExecutorService();
386 assertFalse(exec.getContinueExistingPeriodicTasksAfterShutdownPolicy(), "Wrong periodic task policy");
387 assertFalse(exec.getExecuteExistingDelayedTasksAfterShutdownPolicy(), "Wrong delayed task policy");
388 assertFalse(exec.isShutdown(), "Already shutdown");
389 semaphore.shutdown();
390 }
391
392
393
394
395
396 @Test
397 void testInitInvalidPeriod() {
398 assertIllegalArgumentException(() -> new TimedSemaphore(0L, UNIT, LIMIT));
399 }
400
401
402
403
404 @Test
405 void testPassAfterShutdown() {
406 final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
407 semaphore.shutdown();
408 assertThrows(IllegalStateException.class, semaphore::acquire);
409 }
410
411
412
413
414
415
416 @Test
417 void testShutdownMultipleTimes() throws InterruptedException {
418 final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class);
419 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
420 prepareStartTimer(service, future);
421 EasyMock.expect(Boolean.valueOf(future.cancel(false))).andReturn(Boolean.TRUE);
422 EasyMock.replay(service, future);
423 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service, PERIOD_MILLIS, UNIT, LIMIT);
424 semaphore.acquire();
425 for (int i = 0; i < 10; i++) {
426 semaphore.shutdown();
427 }
428 EasyMock.verify(service, future);
429 }
430
431
432
433
434
435 @Test
436 void testShutdownOwnExecutor() {
437 final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
438 semaphore.shutdown();
439 assertTrue(semaphore.isShutdown(), "Not shutdown");
440 assertTrue(semaphore.getExecutorService().isShutdown(), "Executor not shutdown");
441 }
442
443
444
445
446
447 @Test
448 void testShutdownSharedExecutorNoTask() {
449 final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class);
450 EasyMock.replay(service);
451 final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD_MILLIS, UNIT, LIMIT);
452 semaphore.shutdown();
453 assertTrue(semaphore.isShutdown(), "Not shutdown");
454 EasyMock.verify(service);
455 }
456
457
458
459
460
461
462
463 @Test
464 void testShutdownSharedExecutorTask() throws InterruptedException {
465 final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class);
466 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
467 prepareStartTimer(service, future);
468 EasyMock.expect(Boolean.valueOf(future.cancel(false))).andReturn(Boolean.TRUE);
469 EasyMock.replay(service, future);
470 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service, PERIOD_MILLIS, UNIT, LIMIT);
471 semaphore.acquire();
472 semaphore.shutdown();
473 assertTrue(semaphore.isShutdown(), "Not shutdown");
474 EasyMock.verify(service, future);
475 }
476
477
478
479
480
481
482 @Test
483 void testStartTimer() throws InterruptedException {
484 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(PERIOD_MILLIS, UNIT, LIMIT);
485 final ScheduledFuture<?> future = semaphore.startTimer();
486 assertNotNull(future, "No future returned");
487 ThreadUtils.sleepQuietly(DURATION);
488 final int trials = 10;
489 int count = 0;
490 do {
491 Thread.sleep(PERIOD_MILLIS);
492 assertFalse(count++ > trials, "endOfPeriod() not called!");
493 } while (semaphore.getPeriodEnds() <= 0);
494 semaphore.shutdown();
495 }
496
497
498
499
500
501 @Test
502 void testTryAcquire() throws InterruptedException {
503 final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, TimeUnit.SECONDS, LIMIT);
504 final TryAcquireThread[] threads = new TryAcquireThread[3 * LIMIT];
505 final CountDownLatch latch = new CountDownLatch(1);
506 for (int i = 0; i < threads.length; i++) {
507 threads[i] = new TryAcquireThread(semaphore, latch);
508 threads[i].start();
509 }
510 latch.countDown();
511 int permits = 0;
512 for (final TryAcquireThread t : threads) {
513 t.join();
514 if (t.acquired) {
515 permits++;
516 }
517 }
518 assertEquals(LIMIT, permits, "Wrong number of permits granted");
519 }
520
521
522
523
524 @Test
525 void testTryAcquireAfterShutdown() {
526 final TimedSemaphore semaphore = new TimedSemaphore(PERIOD_MILLIS, UNIT, LIMIT);
527 semaphore.shutdown();
528 assertThrows(IllegalStateException.class, semaphore::tryAcquire);
529 }
530 }