001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.lang3.concurrent;
018
019import static org.junit.Assert.assertEquals;
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.ScheduledExecutorService;
027import java.util.concurrent.ScheduledFuture;
028import java.util.concurrent.ScheduledThreadPoolExecutor;
029import java.util.concurrent.TimeUnit;
030
031import org.easymock.EasyMock;
032import org.junit.Test;
033
034/**
035 * Test class for TimedSemaphore.
036 *
037 * @version $Id: TimedSemaphoreTest.java 1436770 2013-01-22 07:09:45Z ggregory $
038 */
039public class TimedSemaphoreTest {
040    /** Constant for the time period. */
041    private static final long PERIOD = 500;
042
043    /** Constant for the time unit. */
044    private static final TimeUnit UNIT = TimeUnit.MILLISECONDS;
045
046    /** Constant for the default limit. */
047    private static final int LIMIT = 10;
048
049    /**
050     * Tests creating a new instance.
051     */
052    @Test
053    public void testInit() {
054        final ScheduledExecutorService service = EasyMock
055                .createMock(ScheduledExecutorService.class);
056        EasyMock.replay(service);
057        final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT,
058                LIMIT);
059        EasyMock.verify(service);
060        assertEquals("Wrong service", service, semaphore.getExecutorService());
061        assertEquals("Wrong period", PERIOD, semaphore.getPeriod());
062        assertEquals("Wrong unit", UNIT, semaphore.getUnit());
063        assertEquals("Statistic available", 0, semaphore
064                .getLastAcquiresPerPeriod());
065        assertEquals("Average available", 0.0, semaphore
066                .getAverageCallsPerPeriod(), .05);
067        assertFalse("Already shutdown", semaphore.isShutdown());
068        assertEquals("Wrong limit", LIMIT, semaphore.getLimit());
069    }
070
071    /**
072     * Tries to create an instance with a negative period. This should cause an
073     * exception.
074     */
075    @Test(expected = IllegalArgumentException.class)
076    public void testInitInvalidPeriod() {
077        new TimedSemaphore(0L, UNIT, LIMIT);
078    }
079
080    /**
081     * Tests whether a default executor service is created if no service is
082     * provided.
083     */
084    @Test
085    public void testInitDefaultService() {
086        final TimedSemaphore semaphore = new TimedSemaphore(PERIOD, UNIT, LIMIT);
087        final ScheduledThreadPoolExecutor exec = (ScheduledThreadPoolExecutor) semaphore
088                .getExecutorService();
089        assertFalse("Wrong periodic task policy", exec
090                .getContinueExistingPeriodicTasksAfterShutdownPolicy());
091        assertFalse("Wrong delayed task policy", exec
092                .getExecuteExistingDelayedTasksAfterShutdownPolicy());
093        assertFalse("Already shutdown", exec.isShutdown());
094        semaphore.shutdown();
095    }
096
097    /**
098     * Tests starting the timer.
099     */
100    @Test
101    public void testStartTimer() throws InterruptedException {
102        final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(PERIOD,
103                UNIT, LIMIT);
104        final ScheduledFuture<?> future = semaphore.startTimer();
105        assertNotNull("No future returned", future);
106        Thread.sleep(PERIOD);
107        final int trials = 10;
108        int count = 0;
109        do {
110            Thread.sleep(PERIOD);
111            if (count++ > trials) {
112                fail("endOfPeriod() not called!");
113            }
114        } while (semaphore.getPeriodEnds() <= 0);
115        semaphore.shutdown();
116    }
117
118    /**
119     * Tests the shutdown() method if the executor belongs to the semaphore. In
120     * this case it has to be shut down.
121     */
122    @Test
123    public void testShutdownOwnExecutor() {
124        final TimedSemaphore semaphore = new TimedSemaphore(PERIOD, UNIT, LIMIT);
125        semaphore.shutdown();
126        assertTrue("Not shutdown", semaphore.isShutdown());
127        assertTrue("Executor not shutdown", semaphore.getExecutorService()
128                .isShutdown());
129    }
130
131    /**
132     * Tests the shutdown() method for a shared executor service before a task
133     * was started. This should do pretty much nothing.
134     */
135    @Test
136    public void testShutdownSharedExecutorNoTask() {
137        final ScheduledExecutorService service = EasyMock
138                .createMock(ScheduledExecutorService.class);
139        EasyMock.replay(service);
140        final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT,
141                LIMIT);
142        semaphore.shutdown();
143        assertTrue("Not shutdown", semaphore.isShutdown());
144        EasyMock.verify(service);
145    }
146
147    /**
148     * Prepares an executor service mock to expect the start of the timer.
149     *
150     * @param service the mock
151     * @param future the future
152     */
153    private void prepareStartTimer(final ScheduledExecutorService service,
154            final ScheduledFuture<?> future) {
155        service.scheduleAtFixedRate((Runnable) EasyMock.anyObject(), EasyMock
156                .eq(PERIOD), EasyMock.eq(PERIOD), EasyMock.eq(UNIT));
157        EasyMock.expectLastCall().andReturn(future);
158    }
159
160    /**
161     * Tests the shutdown() method for a shared executor after the task was
162     * started. In this case the task must be canceled.
163     */
164    @Test
165    public void testShutdownSharedExecutorTask() throws InterruptedException {
166        final ScheduledExecutorService service = EasyMock
167                .createMock(ScheduledExecutorService.class);
168        final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
169        prepareStartTimer(service, future);
170        EasyMock.expect(Boolean.valueOf(future.cancel(false))).andReturn(Boolean.TRUE);
171        EasyMock.replay(service, future);
172        final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
173                PERIOD, UNIT, LIMIT);
174        semaphore.acquire();
175        semaphore.shutdown();
176        assertTrue("Not shutdown", semaphore.isShutdown());
177        EasyMock.verify(service, future);
178    }
179
180    /**
181     * Tests multiple invocations of the shutdown() method.
182     */
183    @Test
184    public void testShutdownMultipleTimes() throws InterruptedException {
185        final ScheduledExecutorService service = EasyMock
186                .createMock(ScheduledExecutorService.class);
187        final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
188        prepareStartTimer(service, future);
189        EasyMock.expect(Boolean.valueOf(future.cancel(false))).andReturn(Boolean.TRUE);
190        EasyMock.replay(service, future);
191        final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
192                PERIOD, UNIT, LIMIT);
193        semaphore.acquire();
194        for (int i = 0; i < 10; i++) {
195            semaphore.shutdown();
196        }
197        EasyMock.verify(service, future);
198    }
199
200    /**
201     * Tests the acquire() method if a limit is set.
202     */
203    @Test
204    public void testAcquireLimit() throws InterruptedException {
205        final ScheduledExecutorService service = EasyMock
206                .createMock(ScheduledExecutorService.class);
207        final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
208        prepareStartTimer(service, future);
209        EasyMock.replay(service, future);
210        final int count = 10;
211        final CountDownLatch latch = new CountDownLatch(count - 1);
212        final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT, 1);
213        final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count,
214                count - 1);
215        semaphore.setLimit(count - 1);
216
217        // start a thread that calls the semaphore count times
218        t.start();
219        latch.await();
220        // now the semaphore's limit should be reached and the thread blocked
221        assertEquals("Wrong semaphore count", count - 1, semaphore
222                .getAcquireCount());
223
224        // this wakes up the thread, it should call the semaphore once more
225        semaphore.endOfPeriod();
226        t.join();
227        assertEquals("Wrong semaphore count (2)", 1, semaphore
228                .getAcquireCount());
229        assertEquals("Wrong acquire() count", count - 1, semaphore
230                .getLastAcquiresPerPeriod());
231        EasyMock.verify(service, future);
232    }
233
234    /**
235     * Tests the acquire() method if more threads are involved than the limit.
236     * This method starts a number of threads that all invoke the semaphore. The
237     * semaphore's limit is set to 1, so in each period only a single thread can
238     * acquire the semaphore.
239     */
240    @Test
241    public void testAcquireMultipleThreads() throws InterruptedException {
242        final ScheduledExecutorService service = EasyMock
243                .createMock(ScheduledExecutorService.class);
244        final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
245        prepareStartTimer(service, future);
246        EasyMock.replay(service, future);
247        final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
248                PERIOD, UNIT, 1);
249        semaphore.latch = new CountDownLatch(1);
250        final int count = 10;
251        final SemaphoreThread[] threads = new SemaphoreThread[count];
252        for (int i = 0; i < count; i++) {
253            threads[i] = new SemaphoreThread(semaphore, null, 1, 0);
254            threads[i].start();
255        }
256        for (int i = 0; i < count; i++) {
257            semaphore.latch.await();
258            assertEquals("Wrong count", 1, semaphore.getAcquireCount());
259            semaphore.latch = new CountDownLatch(1);
260            semaphore.endOfPeriod();
261            assertEquals("Wrong acquire count", 1, semaphore
262                    .getLastAcquiresPerPeriod());
263        }
264        for (int i = 0; i < count; i++) {
265            threads[i].join();
266        }
267        EasyMock.verify(service, future);
268    }
269
270    /**
271     * Tests the acquire() method if no limit is set. A test thread is started
272     * that calls the semaphore a large number of times. Even if the semaphore's
273     * period does not end, the thread should never block.
274     */
275    @Test
276    public void testAcquireNoLimit() throws InterruptedException {
277        final ScheduledExecutorService service = EasyMock
278                .createMock(ScheduledExecutorService.class);
279        final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
280        prepareStartTimer(service, future);
281        EasyMock.replay(service, future);
282        final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
283                PERIOD, UNIT, TimedSemaphore.NO_LIMIT);
284        final int count = 1000;
285        final CountDownLatch latch = new CountDownLatch(count);
286        final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count);
287        t.start();
288        latch.await();
289        EasyMock.verify(service, future);
290    }
291
292    /**
293     * Tries to call acquire() after shutdown(). This should cause an exception.
294     */
295    @Test(expected = IllegalStateException.class)
296    public void testPassAfterShutdown() throws InterruptedException {
297        final TimedSemaphore semaphore = new TimedSemaphore(PERIOD, UNIT, LIMIT);
298        semaphore.shutdown();
299        semaphore.acquire();
300    }
301
302    /**
303     * Tests a bigger number of invocations that span multiple periods. The
304     * period is set to a very short time. A background thread calls the
305     * semaphore a large number of times. While it runs at last one end of a
306     * period should be reached.
307     */
308    @Test
309    public void testAcquireMultiplePeriods() throws InterruptedException {
310        final int count = 1000;
311        final TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(
312                PERIOD / 10, TimeUnit.MILLISECONDS, 1);
313        semaphore.setLimit(count / 4);
314        final CountDownLatch latch = new CountDownLatch(count);
315        final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count);
316        t.start();
317        latch.await();
318        semaphore.shutdown();
319        assertTrue("End of period not reached", semaphore.getPeriodEnds() > 0);
320    }
321
322    /**
323     * Tests the methods for statistics.
324     */
325    @Test
326    public void testGetAverageCallsPerPeriod() throws InterruptedException {
327        final ScheduledExecutorService service = EasyMock
328                .createMock(ScheduledExecutorService.class);
329        final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
330        prepareStartTimer(service, future);
331        EasyMock.replay(service, future);
332        final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT,
333                LIMIT);
334        semaphore.acquire();
335        semaphore.endOfPeriod();
336        assertEquals("Wrong average (1)", 1.0, semaphore
337                .getAverageCallsPerPeriod(), .005);
338        semaphore.acquire();
339        semaphore.acquire();
340        semaphore.endOfPeriod();
341        assertEquals("Wrong average (2)", 1.5, semaphore
342                .getAverageCallsPerPeriod(), .005);
343        EasyMock.verify(service, future);
344    }
345
346    /**
347     * Tests whether the available non-blocking calls can be queried.
348     */
349    @Test
350    public void testGetAvailablePermits() throws InterruptedException {
351        final ScheduledExecutorService service = EasyMock
352                .createMock(ScheduledExecutorService.class);
353        final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
354        prepareStartTimer(service, future);
355        EasyMock.replay(service, future);
356        final TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT,
357                LIMIT);
358        for (int i = 0; i < LIMIT; i++) {
359            assertEquals("Wrong available count at " + i, LIMIT - i, semaphore
360                    .getAvailablePermits());
361            semaphore.acquire();
362        }
363        semaphore.endOfPeriod();
364        assertEquals("Wrong available count in new period", LIMIT, semaphore
365                .getAvailablePermits());
366        EasyMock.verify(service, future);
367    }
368
369    /**
370     * A specialized implementation of {@code TimedSemaphore} that is easier to
371     * test.
372     */
373    private static class TimedSemaphoreTestImpl extends TimedSemaphore {
374        /** A mock scheduled future. */
375        ScheduledFuture<?> schedFuture;
376
377        /** A latch for synchronizing with the main thread. */
378        volatile CountDownLatch latch;
379
380        /** Counter for the endOfPeriod() invocations. */
381        private int periodEnds;
382
383        public TimedSemaphoreTestImpl(final long timePeriod, final TimeUnit timeUnit,
384                final int limit) {
385            super(timePeriod, timeUnit, limit);
386        }
387
388        public TimedSemaphoreTestImpl(final ScheduledExecutorService service,
389                final long timePeriod, final TimeUnit timeUnit, final int limit) {
390            super(service, timePeriod, timeUnit, limit);
391        }
392
393        /**
394         * Returns the number of invocations of the endOfPeriod() method.
395         *
396         * @return the endOfPeriod() invocations
397         */
398        public int getPeriodEnds() {
399            synchronized (this) {
400                return periodEnds;
401            }
402        }
403
404        /**
405         * Invokes the latch if one is set.
406         */
407        @Override
408        public synchronized void acquire() throws InterruptedException {
409            super.acquire();
410            if (latch != null) {
411                latch.countDown();
412            }
413        }
414
415        /**
416         * Counts the number of invocations.
417         */
418        @Override
419        protected synchronized void endOfPeriod() {
420            super.endOfPeriod();
421            periodEnds++;
422        }
423
424        /**
425         * Either returns the mock future or calls the super method.
426         */
427        @Override
428        protected ScheduledFuture<?> startTimer() {
429            return schedFuture != null ? schedFuture : super.startTimer();
430        }
431    }
432
433    /**
434     * A test thread class that will be used by tests for triggering the
435     * semaphore. The thread calls the semaphore a configurable number of times.
436     * When this is done, it can notify the main thread.
437     */
438    private static class SemaphoreThread extends Thread {
439        /** The semaphore. */
440        private final TimedSemaphore semaphore;
441
442        /** A latch for communication with the main thread. */
443        private final CountDownLatch latch;
444
445        /** The number of acquire() calls. */
446        private final int count;
447
448        /** The number of invocations of the latch. */
449        private final int latchCount;
450
451        public SemaphoreThread(final TimedSemaphore b, final CountDownLatch l, final int c, final int lc) {
452            semaphore = b;
453            latch = l;
454            count = c;
455            latchCount = lc;
456        }
457
458        /**
459         * Calls acquire() on the semaphore for the specified number of times.
460         * Optionally the latch will also be triggered to synchronize with the
461         * main test thread.
462         */
463        @Override
464        public void run() {
465            try {
466                for (int i = 0; i < count; i++) {
467                    semaphore.acquire();
468
469                    if (i < latchCount) {
470                        latch.countDown();
471                    }
472                }
473            } catch (final InterruptedException iex) {
474                Thread.currentThread().interrupt();
475            }
476        }
477    }
478}