1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.commons.io.input;
19
20 import static org.junit.jupiter.api.Assertions.assertEquals;
21 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
22 import static org.junit.jupiter.api.Assertions.assertThrows;
23 import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
24 import static org.junit.jupiter.api.Assertions.assertTrue;
25 import static org.mockito.Mockito.spy;
26 import static org.mockito.Mockito.when;
27
28 import java.io.IOException;
29 import java.io.InputStream;
30 import java.io.InterruptedIOException;
31 import java.time.Duration;
32 import java.time.temporal.ChronoUnit;
33 import java.util.concurrent.atomic.AtomicBoolean;
34
35 import org.apache.commons.io.IOUtils;
36 import org.apache.commons.io.input.ThrottledInputStream.Builder;
37 import org.apache.commons.io.test.CustomIOException;
38 import org.junit.jupiter.api.Test;
39
40
41
42
43 class ThrottledInputStreamTest extends ProxyInputStreamTest<ThrottledInputStream> {
44
45 @Override
46 @SuppressWarnings({ "resource" })
47 protected ThrottledInputStream createFixture() throws IOException {
48 return ThrottledInputStream.builder().setInputStream(createOriginInputStream()).get();
49 }
50
51 @Test
52 void testAfterReadConsumer() throws Exception {
53 final AtomicBoolean boolRef = new AtomicBoolean();
54
55 try (InputStream bounded = ThrottledInputStream.builder()
56 .setCharSequence("Hi")
57 .setAfterRead(i -> boolRef.set(true))
58 .get()) {
59 IOUtils.consume(bounded);
60 }
61
62 assertTrue(boolRef.get());
63
64 final String message = "test exception message";
65
66 try (InputStream inputStream = ThrottledInputStream.builder()
67 .setCharSequence("Hi")
68 .setAfterRead(i -> {
69 throw new CustomIOException(message);
70 })
71 .get()) {
72 assertEquals(message, assertThrowsExactly(CustomIOException.class, () -> IOUtils.consume(inputStream)).getMessage());
73 }
74
75 }
76
77 @Test
78 void testBuilder() throws IOException {
79 final Builder builder = ThrottledInputStream.builder();
80 assertThrows(IllegalArgumentException.class, () -> builder.setMaxBytesPerSecond(-1));
81 assertThrows(IllegalArgumentException.class, () -> builder.setMaxBytesPerSecond(0));
82 assertThrows(IllegalArgumentException.class, () -> builder.setMaxBytes(1, Duration.ZERO.minusMillis(1)));
83 assertThrows(IllegalArgumentException.class, () -> builder.setMaxBytes(1, Duration.ZERO));
84 assertThrows(NullPointerException.class, () -> builder.setMaxBytes(1, (Duration) null));
85 assertThrows(NullPointerException.class, () -> builder.setMaxBytes(1, (ChronoUnit) null));
86
87
88 builder.setMaxBytesPerSecond(2);
89 assertEquals(2.0, builder.getMaxBytesPerSecond());
90
91 try (ThrottledInputStream inputStream = builder
92 .setInputStream(createOriginInputStream())
93 .get()) {
94 assertEquals(2.0, builder.getMaxBytesPerSecond());
95 assertEquals(2.0, inputStream.getMaxBytesPerSecond());
96 }
97 try (ThrottledInputStream inputStream = builder
98 .setInputStream(createOriginInputStream())
99 .setMaxBytes(2, ChronoUnit.SECONDS)
100 .get()) {
101 assertEquals(2.0, builder.getMaxBytesPerSecond());
102 assertEquals(2.0, inputStream.getMaxBytesPerSecond());
103 }
104
105 Duration maxBytesPer = Duration.ofSeconds(1);
106
107 try (ThrottledInputStream inputStream = builder
108 .setInputStream(createOriginInputStream())
109 .setMaxBytes(2, maxBytesPer)
110 .get()) {
111 assertEquals(2.0, builder.getMaxBytesPerSecond());
112 assertEquals(2.0, inputStream.getMaxBytesPerSecond());
113 }
114
115
116
117 maxBytesPer = maxBytesPer.dividedBy(2);
118
119 try (ThrottledInputStream inputStream = builder
120 .setInputStream(createOriginInputStream())
121 .setMaxBytes(1, maxBytesPer)
122 .get()) {
123 assertEquals(0.5, inputStream.getMaxBytesPerSecond());
124 }
125
126 try (ThrottledInputStream inputStream = builder
127 .setInputStream(createOriginInputStream())
128 .setMaxBytes(1, ChronoUnit.MILLIS)
129 .get()) {
130 assertEquals(0.001, inputStream.getMaxBytesPerSecond());
131 }
132
133
134 maxBytesPer = Duration.ofSeconds(20).plusMillis(11);
135
136 try (ThrottledInputStream inputStream = builder
137 .setInputStream(createOriginInputStream())
138 .setMaxBytes(1, maxBytesPer)
139 .get()) {
140 assertEquals(20.011, inputStream.getMaxBytesPerSecond());
141 }
142
143
144
145 try (ThrottledInputStream inputStream = builder
146 .setInputStream(createOriginInputStream())
147 .setMaxBytes(100_000, ChronoUnit.SECONDS)
148 .get()) {
149 assertEquals(100_000.0, inputStream.getMaxBytesPerSecond());
150 }
151
152 }
153
154 @Test
155 void testCalSleepTimeMs() {
156
157 assertEquals(0, ThrottledInputStream.toSleepMillis(0, 1_000, 10_000));
158
159 assertEquals(0, ThrottledInputStream.toSleepMillis(Long.MAX_VALUE, 1_000, 0));
160 assertEquals(0, ThrottledInputStream.toSleepMillis(Long.MAX_VALUE, 1_000, -1));
161
162 assertEquals(1500, ThrottledInputStream.toSleepMillis(5, 1_000, 2));
163 assertEquals(500, ThrottledInputStream.toSleepMillis(5, 2_000, 2));
164 assertEquals(6500, ThrottledInputStream.toSleepMillis(15, 1_000, 2));
165 assertEquals(4000, ThrottledInputStream.toSleepMillis(5, 1_000, 1));
166 assertEquals(9000, ThrottledInputStream.toSleepMillis(5, 1_000, 0.5));
167 assertEquals(99000, ThrottledInputStream.toSleepMillis(5, 1_000, 0.05));
168
169 assertEquals(0, ThrottledInputStream.toSleepMillis(1, 1_000, 2));
170 assertEquals(0, ThrottledInputStream.toSleepMillis(2, 2_000, 2));
171 assertEquals(0, ThrottledInputStream.toSleepMillis(1, 1_000, 2));
172 assertEquals(0, ThrottledInputStream.toSleepMillis(1, 1_000, 2.0));
173 assertEquals(0, ThrottledInputStream.toSleepMillis(1, 1_000, 1));
174 assertEquals(0, ThrottledInputStream.toSleepMillis(1, 1_000, 1.0));
175 }
176
177 @Test
178 void testCloseHandleIOException() throws IOException {
179 ProxyInputStreamTest.testCloseHandleIOException(ThrottledInputStream.builder());
180 }
181
182 @Override
183 protected void testEos(final ThrottledInputStream inputStream) {
184 assertEquals(3, inputStream.getByteCount());
185 }
186
187 @Test
188 void testGet() throws IOException {
189 try (ThrottledInputStream inputStream = createFixture()) {
190 inputStream.read();
191 assertEquals(Duration.ZERO, inputStream.getTotalSleepDuration());
192 }
193 }
194
195 @Test
196 synchronized void testReadInterrupt() throws IOException {
197 try (ThrottledInputStream inputStream = ThrottledInputStream.builder()
198
199 .setInputStream(createOriginInputStream())
200 .setMaxBytes(1, ChronoUnit.HOURS)
201 .get()) {
202
203 final ThrottledInputStream spy = spy(inputStream);
204 when(spy.getSleepMillis()).thenReturn(1L);
205 Thread.currentThread().interrupt();
206 assertInstanceOf(InterruptedException.class, assertThrows(InterruptedIOException.class, spy::read).getCause());
207 assertTrue(Thread.interrupted());
208 }
209 }
210
211 }