001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.lang3.concurrent; 018 019import static org.junit.Assert.assertEquals; 020import static org.junit.Assert.assertFalse; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.ScheduledExecutorService; 027import java.util.concurrent.ScheduledFuture; 028import java.util.concurrent.ScheduledThreadPoolExecutor; 029import java.util.concurrent.TimeUnit; 030 031import org.easymock.EasyMock; 032import org.junit.Test; 033 034/** 035 * Test class for TimedSemaphore. 036 * 037 * @version $Id: TimedSemaphoreTest.java 1436770 2013-01-22 07:09:45Z ggregory $ 038 */ 039public class TimedSemaphoreTest { 040 /** Constant for the time period. */ 041 private static final long PERIOD = 500; 042 043 /** Constant for the time unit. */ 044 private static final TimeUnit UNIT = TimeUnit.MILLISECONDS; 045 046 /** Constant for the default limit. */ 047 private static final int LIMIT = 10; 048 049 /** 050 * Tests creating a new instance. 051 */ 052 @Test 053 public void testInit() { 054 final ScheduledExecutorService service = EasyMock 055 .createMock(ScheduledExecutorService.class); 056 EasyMock.replay(service); 057 final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT, 058 LIMIT); 059 EasyMock.verify(service); 060 assertEquals("Wrong service", service, semaphore.getExecutorService()); 061 assertEquals("Wrong period", PERIOD, semaphore.getPeriod()); 062 assertEquals("Wrong unit", UNIT, semaphore.getUnit()); 063 assertEquals("Statistic available", 0, semaphore 064 .getLastAcquiresPerPeriod()); 065 assertEquals("Average available", 0.0, semaphore 066 .getAverageCallsPerPeriod(), .05); 067 assertFalse("Already shutdown", semaphore.isShutdown()); 068 assertEquals("Wrong limit", LIMIT, semaphore.getLimit()); 069 } 070 071 /** 072 * Tries to create an instance with a negative period. This should cause an 073 * exception. 074 */ 075 @Test(expected = IllegalArgumentException.class) 076 public void testInitInvalidPeriod() { 077 new TimedSemaphore(0L, UNIT, LIMIT); 078 } 079 080 /** 081 * Tests whether a default executor service is created if no service is 082 * provided. 083 */ 084 @Test 085 public void testInitDefaultService() { 086 final TimedSemaphore semaphore = new TimedSemaphore(PERIOD, UNIT, LIMIT); 087 final ScheduledThreadPoolExecutor exec = (ScheduledThreadPoolExecutor) semaphore 088 .getExecutorService(); 089 assertFalse("Wrong periodic task policy", exec 090 .getContinueExistingPeriodicTasksAfterShutdownPolicy()); 091 assertFalse("Wrong delayed task policy", exec 092 .getExecuteExistingDelayedTasksAfterShutdownPolicy()); 093 assertFalse("Already shutdown", exec.isShutdown()); 094 semaphore.shutdown(); 095 } 096 097 /** 098 * Tests starting the timer. 099 */ 100 @Test 101 public void testStartTimer() throws InterruptedException { 102 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(PERIOD, 103 UNIT, LIMIT); 104 final ScheduledFuture<?> future = semaphore.startTimer(); 105 assertNotNull("No future returned", future); 106 Thread.sleep(PERIOD); 107 final int trials = 10; 108 int count = 0; 109 do { 110 Thread.sleep(PERIOD); 111 if (count++ > trials) { 112 fail("endOfPeriod() not called!"); 113 } 114 } while (semaphore.getPeriodEnds() <= 0); 115 semaphore.shutdown(); 116 } 117 118 /** 119 * Tests the shutdown() method if the executor belongs to the semaphore. In 120 * this case it has to be shut down. 121 */ 122 @Test 123 public void testShutdownOwnExecutor() { 124 final TimedSemaphore semaphore = new TimedSemaphore(PERIOD, UNIT, LIMIT); 125 semaphore.shutdown(); 126 assertTrue("Not shutdown", semaphore.isShutdown()); 127 assertTrue("Executor not shutdown", semaphore.getExecutorService() 128 .isShutdown()); 129 } 130 131 /** 132 * Tests the shutdown() method for a shared executor service before a task 133 * was started. This should do pretty much nothing. 134 */ 135 @Test 136 public void testShutdownSharedExecutorNoTask() { 137 final ScheduledExecutorService service = EasyMock 138 .createMock(ScheduledExecutorService.class); 139 EasyMock.replay(service); 140 final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT, 141 LIMIT); 142 semaphore.shutdown(); 143 assertTrue("Not shutdown", semaphore.isShutdown()); 144 EasyMock.verify(service); 145 } 146 147 /** 148 * Prepares an executor service mock to expect the start of the timer. 149 * 150 * @param service the mock 151 * @param future the future 152 */ 153 private void prepareStartTimer(final ScheduledExecutorService service, 154 final ScheduledFuture<?> future) { 155 service.scheduleAtFixedRate((Runnable) EasyMock.anyObject(), EasyMock 156 .eq(PERIOD), EasyMock.eq(PERIOD), EasyMock.eq(UNIT)); 157 EasyMock.expectLastCall().andReturn(future); 158 } 159 160 /** 161 * Tests the shutdown() method for a shared executor after the task was 162 * started. In this case the task must be canceled. 163 */ 164 @Test 165 public void testShutdownSharedExecutorTask() throws InterruptedException { 166 final ScheduledExecutorService service = EasyMock 167 .createMock(ScheduledExecutorService.class); 168 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class); 169 prepareStartTimer(service, future); 170 EasyMock.expect(Boolean.valueOf(future.cancel(false))).andReturn(Boolean.TRUE); 171 EasyMock.replay(service, future); 172 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service, 173 PERIOD, UNIT, LIMIT); 174 semaphore.acquire(); 175 semaphore.shutdown(); 176 assertTrue("Not shutdown", semaphore.isShutdown()); 177 EasyMock.verify(service, future); 178 } 179 180 /** 181 * Tests multiple invocations of the shutdown() method. 182 */ 183 @Test 184 public void testShutdownMultipleTimes() throws InterruptedException { 185 final ScheduledExecutorService service = EasyMock 186 .createMock(ScheduledExecutorService.class); 187 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class); 188 prepareStartTimer(service, future); 189 EasyMock.expect(Boolean.valueOf(future.cancel(false))).andReturn(Boolean.TRUE); 190 EasyMock.replay(service, future); 191 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service, 192 PERIOD, UNIT, LIMIT); 193 semaphore.acquire(); 194 for (int i = 0; i < 10; i++) { 195 semaphore.shutdown(); 196 } 197 EasyMock.verify(service, future); 198 } 199 200 /** 201 * Tests the acquire() method if a limit is set. 202 */ 203 @Test 204 public void testAcquireLimit() throws InterruptedException { 205 final ScheduledExecutorService service = EasyMock 206 .createMock(ScheduledExecutorService.class); 207 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class); 208 prepareStartTimer(service, future); 209 EasyMock.replay(service, future); 210 final int count = 10; 211 final CountDownLatch latch = new CountDownLatch(count - 1); 212 final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT, 1); 213 final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, 214 count - 1); 215 semaphore.setLimit(count - 1); 216 217 // start a thread that calls the semaphore count times 218 t.start(); 219 latch.await(); 220 // now the semaphore's limit should be reached and the thread blocked 221 assertEquals("Wrong semaphore count", count - 1, semaphore 222 .getAcquireCount()); 223 224 // this wakes up the thread, it should call the semaphore once more 225 semaphore.endOfPeriod(); 226 t.join(); 227 assertEquals("Wrong semaphore count (2)", 1, semaphore 228 .getAcquireCount()); 229 assertEquals("Wrong acquire() count", count - 1, semaphore 230 .getLastAcquiresPerPeriod()); 231 EasyMock.verify(service, future); 232 } 233 234 /** 235 * Tests the acquire() method if more threads are involved than the limit. 236 * This method starts a number of threads that all invoke the semaphore. The 237 * semaphore's limit is set to 1, so in each period only a single thread can 238 * acquire the semaphore. 239 */ 240 @Test 241 public void testAcquireMultipleThreads() throws InterruptedException { 242 final ScheduledExecutorService service = EasyMock 243 .createMock(ScheduledExecutorService.class); 244 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class); 245 prepareStartTimer(service, future); 246 EasyMock.replay(service, future); 247 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service, 248 PERIOD, UNIT, 1); 249 semaphore.latch = new CountDownLatch(1); 250 final int count = 10; 251 final SemaphoreThread[] threads = new SemaphoreThread[count]; 252 for (int i = 0; i < count; i++) { 253 threads[i] = new SemaphoreThread(semaphore, null, 1, 0); 254 threads[i].start(); 255 } 256 for (int i = 0; i < count; i++) { 257 semaphore.latch.await(); 258 assertEquals("Wrong count", 1, semaphore.getAcquireCount()); 259 semaphore.latch = new CountDownLatch(1); 260 semaphore.endOfPeriod(); 261 assertEquals("Wrong acquire count", 1, semaphore 262 .getLastAcquiresPerPeriod()); 263 } 264 for (int i = 0; i < count; i++) { 265 threads[i].join(); 266 } 267 EasyMock.verify(service, future); 268 } 269 270 /** 271 * Tests the acquire() method if no limit is set. A test thread is started 272 * that calls the semaphore a large number of times. Even if the semaphore's 273 * period does not end, the thread should never block. 274 */ 275 @Test 276 public void testAcquireNoLimit() throws InterruptedException { 277 final ScheduledExecutorService service = EasyMock 278 .createMock(ScheduledExecutorService.class); 279 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class); 280 prepareStartTimer(service, future); 281 EasyMock.replay(service, future); 282 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service, 283 PERIOD, UNIT, TimedSemaphore.NO_LIMIT); 284 final int count = 1000; 285 final CountDownLatch latch = new CountDownLatch(count); 286 final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count); 287 t.start(); 288 latch.await(); 289 EasyMock.verify(service, future); 290 } 291 292 /** 293 * Tries to call acquire() after shutdown(). This should cause an exception. 294 */ 295 @Test(expected = IllegalStateException.class) 296 public void testPassAfterShutdown() throws InterruptedException { 297 final TimedSemaphore semaphore = new TimedSemaphore(PERIOD, UNIT, LIMIT); 298 semaphore.shutdown(); 299 semaphore.acquire(); 300 } 301 302 /** 303 * Tests a bigger number of invocations that span multiple periods. The 304 * period is set to a very short time. A background thread calls the 305 * semaphore a large number of times. While it runs at last one end of a 306 * period should be reached. 307 */ 308 @Test 309 public void testAcquireMultiplePeriods() throws InterruptedException { 310 final int count = 1000; 311 final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl( 312 PERIOD / 10, TimeUnit.MILLISECONDS, 1); 313 semaphore.setLimit(count / 4); 314 final CountDownLatch latch = new CountDownLatch(count); 315 final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count); 316 t.start(); 317 latch.await(); 318 semaphore.shutdown(); 319 assertTrue("End of period not reached", semaphore.getPeriodEnds() > 0); 320 } 321 322 /** 323 * Tests the methods for statistics. 324 */ 325 @Test 326 public void testGetAverageCallsPerPeriod() throws InterruptedException { 327 final ScheduledExecutorService service = EasyMock 328 .createMock(ScheduledExecutorService.class); 329 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class); 330 prepareStartTimer(service, future); 331 EasyMock.replay(service, future); 332 final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT, 333 LIMIT); 334 semaphore.acquire(); 335 semaphore.endOfPeriod(); 336 assertEquals("Wrong average (1)", 1.0, semaphore 337 .getAverageCallsPerPeriod(), .005); 338 semaphore.acquire(); 339 semaphore.acquire(); 340 semaphore.endOfPeriod(); 341 assertEquals("Wrong average (2)", 1.5, semaphore 342 .getAverageCallsPerPeriod(), .005); 343 EasyMock.verify(service, future); 344 } 345 346 /** 347 * Tests whether the available non-blocking calls can be queried. 348 */ 349 @Test 350 public void testGetAvailablePermits() throws InterruptedException { 351 final ScheduledExecutorService service = EasyMock 352 .createMock(ScheduledExecutorService.class); 353 final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class); 354 prepareStartTimer(service, future); 355 EasyMock.replay(service, future); 356 final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT, 357 LIMIT); 358 for (int i = 0; i < LIMIT; i++) { 359 assertEquals("Wrong available count at " + i, LIMIT - i, semaphore 360 .getAvailablePermits()); 361 semaphore.acquire(); 362 } 363 semaphore.endOfPeriod(); 364 assertEquals("Wrong available count in new period", LIMIT, semaphore 365 .getAvailablePermits()); 366 EasyMock.verify(service, future); 367 } 368 369 /** 370 * A specialized implementation of {@code TimedSemaphore} that is easier to 371 * test. 372 */ 373 private static class TimedSemaphoreTestImpl extends TimedSemaphore { 374 /** A mock scheduled future. */ 375 ScheduledFuture<?> schedFuture; 376 377 /** A latch for synchronizing with the main thread. */ 378 volatile CountDownLatch latch; 379 380 /** Counter for the endOfPeriod() invocations. */ 381 private int periodEnds; 382 383 public TimedSemaphoreTestImpl(final long timePeriod, final TimeUnit timeUnit, 384 final int limit) { 385 super(timePeriod, timeUnit, limit); 386 } 387 388 public TimedSemaphoreTestImpl(final ScheduledExecutorService service, 389 final long timePeriod, final TimeUnit timeUnit, final int limit) { 390 super(service, timePeriod, timeUnit, limit); 391 } 392 393 /** 394 * Returns the number of invocations of the endOfPeriod() method. 395 * 396 * @return the endOfPeriod() invocations 397 */ 398 public int getPeriodEnds() { 399 synchronized (this) { 400 return periodEnds; 401 } 402 } 403 404 /** 405 * Invokes the latch if one is set. 406 */ 407 @Override 408 public synchronized void acquire() throws InterruptedException { 409 super.acquire(); 410 if (latch != null) { 411 latch.countDown(); 412 } 413 } 414 415 /** 416 * Counts the number of invocations. 417 */ 418 @Override 419 protected synchronized void endOfPeriod() { 420 super.endOfPeriod(); 421 periodEnds++; 422 } 423 424 /** 425 * Either returns the mock future or calls the super method. 426 */ 427 @Override 428 protected ScheduledFuture<?> startTimer() { 429 return schedFuture != null ? schedFuture : super.startTimer(); 430 } 431 } 432 433 /** 434 * A test thread class that will be used by tests for triggering the 435 * semaphore. The thread calls the semaphore a configurable number of times. 436 * When this is done, it can notify the main thread. 437 */ 438 private static class SemaphoreThread extends Thread { 439 /** The semaphore. */ 440 private final TimedSemaphore semaphore; 441 442 /** A latch for communication with the main thread. */ 443 private final CountDownLatch latch; 444 445 /** The number of acquire() calls. */ 446 private final int count; 447 448 /** The number of invocations of the latch. */ 449 private final int latchCount; 450 451 public SemaphoreThread(final TimedSemaphore b, final CountDownLatch l, final int c, final int lc) { 452 semaphore = b; 453 latch = l; 454 count = c; 455 latchCount = lc; 456 } 457 458 /** 459 * Calls acquire() on the semaphore for the specified number of times. 460 * Optionally the latch will also be triggered to synchronize with the 461 * main test thread. 462 */ 463 @Override 464 public void run() { 465 try { 466 for (int i = 0; i < count; i++) { 467 semaphore.acquire(); 468 469 if (i < latchCount) { 470 latch.countDown(); 471 } 472 } 473 } catch (final InterruptedException iex) { 474 Thread.currentThread().interrupt(); 475 } 476 } 477 } 478}