1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.commons.pool2.impl;
18
19 import java.time.Duration;
20 import java.time.Instant;
21 import java.util.concurrent.TimeoutException;
22 import java.util.concurrent.atomic.AtomicBoolean;
23 import java.util.function.Consumer;
24 import java.util.function.Predicate;
25 import java.util.function.Supplier;
26
27 import org.apache.commons.pool2.PooledObject;
28 import org.apache.commons.pool2.Waiter;
29 import org.apache.commons.pool2.WaiterFactory;
30
31 public class DisconnectingWaiterFactory<K> extends WaiterFactory<K> {
32 private static final Duration DEFAULT_TIME_BETWEEN_CONNECTION_CHECKS = Duration.ofMillis(100);
33
34 private static final Duration DEFAULT_MAX_WAIT = Duration.ofSeconds(10);
35
36
37
38
39
40 protected static final Consumer<PooledObject<Waiter>> DEFAULT_DISCONNECTED_LIFECYCLE_ACTION = w -> {
41 };
42
43
44
45
46
47
48
49 protected static final Supplier<PooledObject<Waiter>> DEFAULT_DISCONNECTED_CREATE_ACTION = () -> {
50 waitForConnection(null, DEFAULT_TIME_BETWEEN_CONNECTION_CHECKS, DEFAULT_MAX_WAIT);
51 return new DefaultPooledObject<>(new Waiter(true, true, 0));
52 };
53
54
55
56
57
58 protected static final Predicate<PooledObject<Waiter>> DEFAULT_DISCONNECTED_VALIDATION_ACTION = w -> false;
59
60
61
62
63
64
65 private static void waitForConnection(final AtomicBoolean connected,
66 final Duration timeBetweenConnectionChecks, final Duration maxWait) {
67 final Instant start = Instant.now();
68 while (!connected.get()) {
69 try {
70 Thread.sleep(timeBetweenConnectionChecks.toMillis());
71 } catch (final InterruptedException e) {
72 e.printStackTrace();
73 }
74 if (Duration.between(start, Instant.now()).compareTo(maxWait) > 0) {
75 throw new IllegalStateException(new TimeoutException("Timed out waiting for connection"));
76 }
77 }
78 }
79
80
81
82
83
84
85
86
87
88
89 private final AtomicBoolean connected = new AtomicBoolean(true);
90
91
92 final Duration timeBetweenConnectionChecks;
93
94
95
96
97
98 final Duration maxWait;
99
100
101 final Supplier<PooledObject<Waiter>> disconnectedCreateAction;
102
103
104
105
106
107 final Consumer<PooledObject<Waiter>> disconnectedLifcycleAction;
108
109
110 final Predicate<PooledObject<Waiter>> disconnectedValidationAction;
111
112 public DisconnectingWaiterFactory() {
113 this(DEFAULT_DISCONNECTED_CREATE_ACTION, DEFAULT_DISCONNECTED_LIFECYCLE_ACTION,
114 DEFAULT_DISCONNECTED_VALIDATION_ACTION);
115 }
116
117 public DisconnectingWaiterFactory(final long activateLatency, final long destroyLatency,
118 final long makeLatency, final long passivateLatency, final long validateLatency,
119 final long waiterLatency) {
120 this(activateLatency, destroyLatency, makeLatency, passivateLatency,
121 validateLatency, waiterLatency, Long.MAX_VALUE, Long.MAX_VALUE, 0);
122 }
123
124 public DisconnectingWaiterFactory(final long activateLatency, final long destroyLatency,
125 final long makeLatency, final long passivateLatency, final long validateLatency,
126 final long waiterLatency, final long maxActive) {
127 this(activateLatency, destroyLatency, makeLatency, passivateLatency,
128 validateLatency, waiterLatency, maxActive,
129 Long.MAX_VALUE, 0);
130 }
131
132 public DisconnectingWaiterFactory(final long activateLatency, final long destroyLatency,
133 final long makeLatency, final long passivateLatency, final long validateLatency,
134 final long waiterLatency, final long maxActive, final long maxActivePerKey,
135 final double passivateInvalidationProbability) {
136 super(activateLatency, destroyLatency, makeLatency, passivateLatency,
137 validateLatency, waiterLatency, maxActive, maxActivePerKey,
138 passivateInvalidationProbability);
139 this.timeBetweenConnectionChecks = DEFAULT_TIME_BETWEEN_CONNECTION_CHECKS;
140 this.maxWait = DEFAULT_MAX_WAIT;
141 this.disconnectedCreateAction = DEFAULT_DISCONNECTED_CREATE_ACTION;
142 this.disconnectedLifcycleAction = DEFAULT_DISCONNECTED_LIFECYCLE_ACTION;
143 this.disconnectedValidationAction = DEFAULT_DISCONNECTED_VALIDATION_ACTION;
144 }
145
146 public DisconnectingWaiterFactory(final Supplier<PooledObject<Waiter>> disconnectedCreateAction,
147 final Consumer<PooledObject<Waiter>> disconnectedLifcycleAction,
148 final Predicate<PooledObject<Waiter>> disconnectedValidationAction) {
149 super(0, 0, 0,
150 0, 0, 0,
151 Integer.MAX_VALUE,
152 Integer.MAX_VALUE,
153 0);
154 this.timeBetweenConnectionChecks = DEFAULT_TIME_BETWEEN_CONNECTION_CHECKS;
155 this.maxWait = DEFAULT_MAX_WAIT;
156 this.disconnectedCreateAction = disconnectedCreateAction;
157 this.disconnectedLifcycleAction = disconnectedLifcycleAction;
158 this.disconnectedValidationAction = disconnectedValidationAction;
159 }
160
161 private void activate(final PooledObject<Waiter> obj) {
162 if (connected.get()) {
163 super.activateObject(obj);
164 } else {
165 disconnectedLifcycleAction.accept(obj);
166 }
167 }
168
169 @Override
170 public void activateObject(final K key, final PooledObject<Waiter> obj) {
171 activate(obj);
172 }
173
174 @Override
175 public void activateObject(final PooledObject<Waiter> obj) {
176 activate(obj);
177 }
178
179
180
181
182 public void connect() {
183 connected.set(true);
184 }
185
186
187
188
189
190
191
192
193 public void disconnect() {
194 connected.set(false);
195 }
196
197 private PooledObject<Waiter> make() {
198 if (connected.get()) {
199 return super.makeObject();
200 }
201 return disconnectedCreateAction.get();
202 }
203
204 @Override
205 public PooledObject<Waiter> makeObject() {
206 return make();
207 }
208
209 @Override
210 public PooledObject<Waiter> makeObject(final K key) {
211 return make();
212 }
213
214 private void passivate(final PooledObject<Waiter> obj) {
215 if (connected.get()) {
216 super.passivateObject(obj);
217 } else {
218 disconnectedLifcycleAction.accept(obj);
219 }
220 }
221
222 @Override
223 public void passivateObject(final K key, final PooledObject<Waiter> obj) {
224 passivate(obj);
225 }
226
227 @Override
228 public void passivateObject(final PooledObject<Waiter> obj) {
229 passivate(obj);
230 }
231
232 private boolean validate(final PooledObject<Waiter> obj) {
233 if (connected.get()) {
234 return super.validateObject(obj);
235 }
236 return disconnectedValidationAction.test(obj);
237 }
238
239 @Override
240 public boolean validateObject(final K key, final PooledObject<Waiter> obj) {
241 return validate(obj);
242 }
243
244 @Override
245 public boolean validateObject(final PooledObject<Waiter> obj) {
246 return validate(obj);
247 }
248
249 }