View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      https://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.commons.lang3.concurrent;
19  
20  import static org.junit.jupiter.api.Assertions.assertEquals;
21  import static org.junit.jupiter.api.Assertions.assertInstanceOf;
22  import static org.junit.jupiter.api.Assertions.assertThrows;
23  import static org.junit.jupiter.api.Assertions.assertTrue;
24  
25  import java.util.Arrays;
26  import java.util.List;
27  import java.util.concurrent.CountDownLatch;
28  import java.util.concurrent.ExecutionException;
29  import java.util.concurrent.Future;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.TimeoutException;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  import java.util.concurrent.atomic.AtomicReference;
34  import java.util.function.Consumer;
35  import java.util.stream.Collectors;
36  
37  import org.apache.commons.lang3.AbstractLangTest;
38  import org.apache.commons.lang3.exception.UncheckedInterruptedException;
39  import org.junit.jupiter.api.Test;
40  
41  /**
42   * Tests {@link UncheckedFuture}.
43   */
44  class UncheckedFutureTest extends AbstractLangTest {
45  
46      private static final class TestFuture<V> extends AbstractFutureProxy<V> {
47  
48          private final Exception exception;
49  
50          TestFuture(final Exception throwable) {
51              super(ConcurrentUtils.constantFuture(null));
52              this.exception = throwable;
53          }
54  
55          TestFuture(final V value) {
56              super(ConcurrentUtils.constantFuture(value));
57              this.exception = null;
58          }
59  
60          @SuppressWarnings("unchecked") // Programming error if call site blows up at runtime.
61          private <T extends Exception> void checkException() throws T {
62              if (exception != null) {
63                  throw (T) exception;
64              }
65          }
66  
67          @Override
68          public V get() throws InterruptedException, ExecutionException {
69              checkException();
70              return super.get();
71          }
72  
73          @Override
74          public V get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
75              checkException();
76              return super.get(timeout, unit);
77          }
78  
79      }
80  
81      private static void assertInterruptPreserved(final Consumer<UncheckedFuture<Integer>> call) throws Exception {
82          final CountDownLatch enteredGet = new CountDownLatch(1);
83          final Future<Integer> blockingFuture = new AbstractFutureProxy<Integer>(ConcurrentUtils.constantFuture(42)) {
84  
85              private final CountDownLatch neverRelease = new CountDownLatch(1);
86  
87              @Override
88              public Integer get() throws InterruptedException {
89                  enteredGet.countDown();
90                  neverRelease.await();
91                  throw new AssertionError("We should not get here");
92              }
93  
94              @Override
95              public Integer get(final long timeout, final TimeUnit unit) throws InterruptedException {
96                  enteredGet.countDown();
97                  neverRelease.await();
98                  throw new AssertionError("We should not get here");
99              }
100 
101             @Override
102             public boolean isDone() {
103                 return false;
104             }
105         };
106         final UncheckedFuture<Integer> uf = UncheckedFuture.on(blockingFuture);
107         final AtomicReference<Throwable> thrown = new AtomicReference<>();
108         final AtomicBoolean interruptObserved = new AtomicBoolean(false);
109         final Thread worker = new Thread(() -> {
110             try {
111                 call.accept(uf);
112                 thrown.set(new AssertionError("We should not get here"));
113             } catch (final Throwable e) {
114                 interruptObserved.set(Thread.currentThread().isInterrupted());
115                 thrown.set(e);
116             }
117         }, "unchecked-future-test-worker");
118         worker.start();
119         assertTrue(enteredGet.await(2, TimeUnit.SECONDS), "Worker did not enter Future.get() in time");
120         worker.interrupt();
121         worker.join();
122         final Throwable t = thrown.get();
123         assertInstanceOf(UncheckedInterruptedException.class, t, "Unexpected exception: " + t);
124         assertInstanceOf(InterruptedException.class, t.getCause(), "Cause should be InterruptedException");
125         assertTrue(interruptObserved.get(), "Interrupt flag was not restored by the wrapper");
126     }
127 
128     @Test
129     void interruptFlagIsPreservedOnGet() throws Exception {
130         assertInterruptPreserved(UncheckedFuture::get);
131     }
132 
133     @Test
134     void interruptFlagIsPreservedOnGetWithTimeout() throws Exception {
135         assertInterruptPreserved(uf -> uf.get(1, TimeUnit.DAYS));
136     }
137 
138     @Test
139     void testGetExecutionException() {
140         final ExecutionException e = new ExecutionException(new Exception());
141         assertThrows(UncheckedExecutionException.class, () -> UncheckedFuture.on(new TestFuture<>(e)).get());
142     }
143 
144     @Test
145     void testGetInterruptedException() {
146         final InterruptedException e = new InterruptedException();
147         assertThrows(UncheckedInterruptedException.class, () -> UncheckedFuture.on(new TestFuture<>(e)).get());
148     }
149 
150     @Test
151     void testGetLongExecutionException() {
152         final ExecutionException e = new ExecutionException(new Exception());
153         assertThrows(UncheckedExecutionException.class, () -> UncheckedFuture.on(new TestFuture<>(e)).get(1, TimeUnit.MICROSECONDS));
154     }
155 
156     @Test
157     void testGetLongInterruptedException() {
158         final InterruptedException e = new InterruptedException();
159         assertThrows(UncheckedInterruptedException.class, () -> UncheckedFuture.on(new TestFuture<>(e)).get(1, TimeUnit.MICROSECONDS));
160     }
161 
162     @Test
163     void testGetLongTimeoutException() {
164         final TimeoutException e = new TimeoutException();
165         assertThrows(UncheckedTimeoutException.class, () -> UncheckedFuture.on(new TestFuture<>(e)).get(1, TimeUnit.MICROSECONDS));
166     }
167 
168     @Test
169     void testMap() {
170         final List<String> expected = Arrays.asList("Y", "Z");
171         final List<Future<String>> input = Arrays.asList(new TestFuture<>("Y"), new TestFuture<>("Z"));
172         assertEquals(expected, UncheckedFuture.map(input).map(UncheckedFuture::get).collect(Collectors.toList()));
173     }
174 
175     @Test
176     void testOnCollection() {
177         final List<String> expected = Arrays.asList("Y", "Z");
178         final List<Future<String>> input = Arrays.asList(new TestFuture<>("Y"), new TestFuture<>("Z"));
179         assertEquals(expected, UncheckedFuture.on(input).stream().map(UncheckedFuture::get).collect(Collectors.toList()));
180     }
181 
182     @Test
183     void testOnFuture() {
184         assertEquals("Z", UncheckedFuture.on(new TestFuture<>("Z")).get());
185     }
186 }