View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      https://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.pool2.impl;
18  
19  import java.time.Duration;
20  import java.time.Instant;
21  import java.util.concurrent.TimeoutException;
22  import java.util.concurrent.atomic.AtomicBoolean;
23  import java.util.function.Consumer;
24  import java.util.function.Predicate;
25  import java.util.function.Supplier;
26  
27  import org.apache.commons.pool2.PooledObject;
28  import org.apache.commons.pool2.Waiter;
29  import org.apache.commons.pool2.WaiterFactory;
30  
31  public class DisconnectingWaiterFactory<K> extends WaiterFactory<K> {
32      private static final Duration DEFAULT_TIME_BETWEEN_CONNECTION_CHECKS = Duration.ofMillis(100);
33  
34      private static final Duration DEFAULT_MAX_WAIT = Duration.ofSeconds(10);
35  
36      /**
37       * Default function to perform for activate, passivate, destroy in disconnected
38       * mode - no-op
39       */
40      protected static final Consumer<PooledObject<Waiter>> DEFAULT_DISCONNECTED_LIFECYCLE_ACTION = w -> {
41      };
42  
43      /**
44       * Default supplier determining makeObject action when invoked in disconnected
45       * mode. Default behavior is to block until reconnected for up to maxWait
46       * duration. If maxWait is exceeded, throw ISE; if reconnect happens in time,
47       * return a new DefaultPooledObject<Waiter>.
48       */
49      protected static final Supplier<PooledObject<Waiter>> DEFAULT_DISCONNECTED_CREATE_ACTION = () -> {
50          waitForConnection(null, DEFAULT_TIME_BETWEEN_CONNECTION_CHECKS, DEFAULT_MAX_WAIT);
51          return new DefaultPooledObject<>(new Waiter(true, true, 0));
52      };
53  
54      /**
55       * Default predicate determining what validate does in disconnected state -
56       * default is to always return false
57       */
58      protected static final Predicate<PooledObject<Waiter>> DEFAULT_DISCONNECTED_VALIDATION_ACTION = w -> false;
59  
60      /*
61       * Blocks until connected or maxWait is exceeded.
62       *
63       * @throws TimeoutException if maxWait is exceeded.
64       */
65      private static void waitForConnection(final AtomicBoolean connected, final Duration timeBetweenConnectionChecks, final Duration maxWait) {
66          final Instant start = Instant.now();
67          while (!connected.get()) {
68              try {
69                  Thread.sleep(timeBetweenConnectionChecks.toMillis());
70              } catch (final InterruptedException e) {
71                  e.printStackTrace();
72              }
73              if (Duration.between(start, Instant.now()).compareTo(maxWait) > 0) {
74                  throw new IllegalStateException(new TimeoutException("Timed out waiting for connection"));
75              }
76          }
77      }
78  
79      /**
80       *
81       * A WaiterFactory that simulates a resource required by factory methods going
82       * down (and coming back).
83       * <p>
84       * When connected, this factory behaves like a normal WaiterFactory.
85       * When disconnected, factory methods are determined by functional parameters.
86       * </p>
87       */
88      private final AtomicBoolean connected = new AtomicBoolean(true);
89  
90      /** Time between reconnection checks */
91      final Duration timeBetweenConnectionChecks;
92  
93      /**
94       * Maximum amount of time a factory method will wait for reconnect before
95       * throwing TimeOutException
96       */
97      final Duration maxWait;
98  
99      /** Function to perform when makeObject is executed in disconnected mode */
100     final Supplier<PooledObject<Waiter>> disconnectedCreateAction;
101 
102     /**
103      * Function to perform for activate, passsivate and destroy when invoked in
104      * disconnected mode
105      */
106     final Consumer<PooledObject<Waiter>> disconnectedLifcycleAction;
107 
108     /** Function to perform for validate when invoked in disconnected mode */
109     final Predicate<PooledObject<Waiter>> disconnectedValidationAction;
110 
111     public DisconnectingWaiterFactory() {
112         this(DEFAULT_DISCONNECTED_CREATE_ACTION, DEFAULT_DISCONNECTED_LIFECYCLE_ACTION,
113                 DEFAULT_DISCONNECTED_VALIDATION_ACTION);
114     }
115 
116     public DisconnectingWaiterFactory(final long activateLatency, final long destroyLatency,
117             final long makeLatency, final long passivateLatency, final long validateLatency,
118             final long waiterLatency) {
119         this(activateLatency, destroyLatency, makeLatency, passivateLatency,
120                 validateLatency, waiterLatency, Long.MAX_VALUE, Long.MAX_VALUE, 0);
121     }
122 
123     public DisconnectingWaiterFactory(final long activateLatency, final long destroyLatency,
124             final long makeLatency, final long passivateLatency, final long validateLatency,
125             final long waiterLatency, final long maxActive) {
126         this(activateLatency, destroyLatency, makeLatency, passivateLatency,
127                 validateLatency, waiterLatency, maxActive,
128                 Long.MAX_VALUE, 0);
129     }
130 
131     public DisconnectingWaiterFactory(final long activateLatency, final long destroyLatency,
132             final long makeLatency, final long passivateLatency, final long validateLatency,
133             final long waiterLatency, final long maxActive, final long maxActivePerKey,
134             final double passivateInvalidationProbability) {
135         super(activateLatency, destroyLatency, makeLatency, passivateLatency,
136                 validateLatency, waiterLatency, maxActive, maxActivePerKey,
137                 passivateInvalidationProbability);
138         this.timeBetweenConnectionChecks = DEFAULT_TIME_BETWEEN_CONNECTION_CHECKS;
139         this.maxWait = DEFAULT_MAX_WAIT;
140         this.disconnectedCreateAction = DEFAULT_DISCONNECTED_CREATE_ACTION;
141         this.disconnectedLifcycleAction = DEFAULT_DISCONNECTED_LIFECYCLE_ACTION;
142         this.disconnectedValidationAction = DEFAULT_DISCONNECTED_VALIDATION_ACTION;
143     }
144 
145     public DisconnectingWaiterFactory(final Supplier<PooledObject<Waiter>> disconnectedCreateAction,
146             final Consumer<PooledObject<Waiter>> disconnectedLifcycleAction,
147             final Predicate<PooledObject<Waiter>> disconnectedValidationAction) {
148         super(0, 0, 0,
149                 0, 0, 0,
150                 Integer.MAX_VALUE,
151                 Integer.MAX_VALUE,
152                 0);
153         this.timeBetweenConnectionChecks = DEFAULT_TIME_BETWEEN_CONNECTION_CHECKS;
154         this.maxWait = DEFAULT_MAX_WAIT;
155         this.disconnectedCreateAction = disconnectedCreateAction;
156         this.disconnectedLifcycleAction = disconnectedLifcycleAction;
157         this.disconnectedValidationAction = disconnectedValidationAction;
158     }
159 
160     private void activate(final PooledObject<Waiter> obj) {
161         if (connected.get()) {
162             super.activateObject(obj);
163         } else {
164             disconnectedLifcycleAction.accept(obj);
165         }
166     }
167 
168     @Override
169     public void activateObject(final K key, final PooledObject<Waiter> obj) {
170         activate(obj);
171     }
172 
173     @Override
174     public void activateObject(final PooledObject<Waiter> obj) {
175         activate(obj);
176     }
177 
178     /**
179      * Reconnect the factory.
180      */
181     public void connect() {
182         connected.set(true);
183     }
184     /*
185      * TODO: add builder to clean up constructors and make maxWait,
186      * timeBetweenConnectionChecks configurable.
187      */
188 
189     /**
190      * Disconnect the factory.
191      */
192     public void disconnect() {
193         connected.set(false);
194     }
195 
196     private PooledObject<Waiter> make() {
197         if (connected.get()) {
198             return super.makeObject();
199         }
200         return disconnectedCreateAction.get();
201     }
202 
203     @Override
204     public PooledObject<Waiter> makeObject() {
205         return make();
206     }
207 
208     @Override
209     public PooledObject<Waiter> makeObject(final K key) {
210         return make();
211     }
212 
213     private void passivate(final PooledObject<Waiter> obj) {
214         if (connected.get()) {
215             super.passivateObject(obj);
216         } else {
217             disconnectedLifcycleAction.accept(obj);
218         }
219     }
220 
221     @Override
222     public void passivateObject(final K key, final PooledObject<Waiter> obj) {
223         passivate(obj);
224     }
225 
226     @Override
227     public void passivateObject(final PooledObject<Waiter> obj) {
228         passivate(obj);
229     }
230 
231     private boolean validate(final PooledObject<Waiter> obj) {
232         if (connected.get()) {
233             return super.validateObject(obj);
234         }
235         return disconnectedValidationAction.test(obj);
236     }
237 
238     @Override
239     public boolean validateObject(final K key, final PooledObject<Waiter> obj) {
240         return validate(obj);
241     }
242 
243     @Override
244     public boolean validateObject(final PooledObject<Waiter> obj) {
245         return validate(obj);
246     }
247 
248 }