View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.pool2.impl;
18  
19  import java.time.Duration;
20  import java.time.Instant;
21  import java.util.concurrent.TimeoutException;
22  import java.util.concurrent.atomic.AtomicBoolean;
23  import java.util.function.Consumer;
24  import java.util.function.Predicate;
25  import java.util.function.Supplier;
26  
27  import org.apache.commons.pool2.PooledObject;
28  import org.apache.commons.pool2.Waiter;
29  import org.apache.commons.pool2.WaiterFactory;
30  
31  public class DisconnectingWaiterFactory<K> extends WaiterFactory<K> {
32      private static final Duration DEFAULT_TIME_BETWEEN_CONNECTION_CHECKS = Duration.ofMillis(100);
33  
34      private static final Duration DEFAULT_MAX_WAIT = Duration.ofSeconds(10);
35  
36      /**
37       * Default function to perform for activate, passivate, destroy in disconnected
38       * mode - no-op
39       */
40      protected static final Consumer<PooledObject<Waiter>> DEFAULT_DISCONNECTED_LIFECYCLE_ACTION = w -> {
41      };
42  
43      /**
44       * Default supplier determining makeObject action when invoked in disconnected
45       * mode. Default behavior is to block until reconnected for up to maxWait
46       * duration. If maxWait is exceeded, throw ISE; if reconnect happens in time,
47       * return a new DefaultPooledObject<Waiter>.
48       */
49      protected static final Supplier<PooledObject<Waiter>> DEFAULT_DISCONNECTED_CREATE_ACTION = () -> {
50          waitForConnection(null, DEFAULT_TIME_BETWEEN_CONNECTION_CHECKS, DEFAULT_MAX_WAIT);
51          return new DefaultPooledObject<>(new Waiter(true, true, 0));
52      };
53  
54      /**
55       * Default predicate determining what validate does in disconnected state -
56       * default is to always return false
57       */
58      protected static final Predicate<PooledObject<Waiter>> DEFAULT_DISCONNECTED_VALIDATION_ACTION = w -> false;
59  
60      /**
61       * Blocks until connected or maxWait is exceeded.
62       * 
63       * @throws TimeoutException if maxWait is exceeded.
64       */
65      private static void waitForConnection(final AtomicBoolean connected,
66              final Duration timeBetweenConnectionChecks, final Duration maxWait) {
67          final Instant start = Instant.now();
68          while (!connected.get()) {
69              try {
70                  Thread.sleep(timeBetweenConnectionChecks.toMillis());
71              } catch (final InterruptedException e) {
72                  e.printStackTrace();
73              }
74              if (Duration.between(start, Instant.now()).compareTo(maxWait) > 0) {
75                  throw new IllegalStateException(new TimeoutException("Timed out waiting for connection"));
76              }
77          }
78      }
79  
80      /**
81       * 
82       * A WaiterFactory that simulates a resource required by factory methods going
83       * down (and coming back).
84       * <p>
85       * When connected, this factory behaves like a normal WaiterFactory.
86       * When disconnected, factory methods are determined by functional parameters.
87       * </p>
88       */
89      private final AtomicBoolean connected = new AtomicBoolean(true);
90  
91      /** Time between reconnection checks */
92      final Duration timeBetweenConnectionChecks;
93  
94      /**
95       * Maximum amount of time a factory method will wait for reconnect before
96       * throwing TimeOutException
97       */
98      final Duration maxWait;
99  
100     /** Function to perform when makeObject is executed in disconnected mode */
101     final Supplier<PooledObject<Waiter>> disconnectedCreateAction;
102 
103     /**
104      * Function to perform for activate, passsivate and destroy when invoked in
105      * disconnected mode
106      */
107     final Consumer<PooledObject<Waiter>> disconnectedLifcycleAction;
108 
109     /** Function to perform for validate when invoked in disconnected mode */
110     final Predicate<PooledObject<Waiter>> disconnectedValidationAction;
111 
112     public DisconnectingWaiterFactory() {
113         this(DEFAULT_DISCONNECTED_CREATE_ACTION, DEFAULT_DISCONNECTED_LIFECYCLE_ACTION,
114                 DEFAULT_DISCONNECTED_VALIDATION_ACTION);
115     }
116 
117     public DisconnectingWaiterFactory(final long activateLatency, final long destroyLatency,
118             final long makeLatency, final long passivateLatency, final long validateLatency,
119             final long waiterLatency) {
120         this(activateLatency, destroyLatency, makeLatency, passivateLatency,
121                 validateLatency, waiterLatency, Long.MAX_VALUE, Long.MAX_VALUE, 0);
122     }
123 
124     public DisconnectingWaiterFactory(final long activateLatency, final long destroyLatency,
125             final long makeLatency, final long passivateLatency, final long validateLatency,
126             final long waiterLatency, final long maxActive) {
127         this(activateLatency, destroyLatency, makeLatency, passivateLatency,
128                 validateLatency, waiterLatency, maxActive,
129                 Long.MAX_VALUE, 0);
130     }
131 
132     public DisconnectingWaiterFactory(final long activateLatency, final long destroyLatency,
133             final long makeLatency, final long passivateLatency, final long validateLatency,
134             final long waiterLatency, final long maxActive, final long maxActivePerKey,
135             final double passivateInvalidationProbability) {
136         super(activateLatency, destroyLatency, makeLatency, passivateLatency,
137                 validateLatency, waiterLatency, maxActive, maxActivePerKey,
138                 passivateInvalidationProbability);
139         this.timeBetweenConnectionChecks = DEFAULT_TIME_BETWEEN_CONNECTION_CHECKS;
140         this.maxWait = DEFAULT_MAX_WAIT;
141         this.disconnectedCreateAction = DEFAULT_DISCONNECTED_CREATE_ACTION;
142         this.disconnectedLifcycleAction = DEFAULT_DISCONNECTED_LIFECYCLE_ACTION;
143         this.disconnectedValidationAction = DEFAULT_DISCONNECTED_VALIDATION_ACTION;
144     }
145 
146     public DisconnectingWaiterFactory(final Supplier<PooledObject<Waiter>> disconnectedCreateAction,
147             final Consumer<PooledObject<Waiter>> disconnectedLifcycleAction,
148             final Predicate<PooledObject<Waiter>> disconnectedValidationAction) {
149         super(0, 0, 0,
150                 0, 0, 0,
151                 Integer.MAX_VALUE,
152                 Integer.MAX_VALUE,
153                 0);
154         this.timeBetweenConnectionChecks = DEFAULT_TIME_BETWEEN_CONNECTION_CHECKS;
155         this.maxWait = DEFAULT_MAX_WAIT;
156         this.disconnectedCreateAction = disconnectedCreateAction;
157         this.disconnectedLifcycleAction = disconnectedLifcycleAction;
158         this.disconnectedValidationAction = disconnectedValidationAction;
159     }
160 
161     private void activate(final PooledObject<Waiter> obj) {
162         if (connected.get()) {
163             super.activateObject(obj);
164         } else {
165             disconnectedLifcycleAction.accept(obj);
166         }
167     }
168 
169     @Override
170     public void activateObject(final K key, final PooledObject<Waiter> obj) {
171         activate(obj);
172     }
173 
174     @Override
175     public void activateObject(final PooledObject<Waiter> obj) {
176         activate(obj);
177     }
178 
179     /**
180      * Reconnect the factory.
181      */
182     public void connect() {
183         connected.set(true);
184     }
185     /*
186      * TODO: add builder to clean up constructors and make maxWait,
187      * timeBetweenConnectionChecks configurable.
188      */
189 
190     /**
191      * Disconnect the factory.
192      */
193     public void disconnect() {
194         connected.set(false);
195     }
196 
197     private PooledObject<Waiter> make() {
198         if (connected.get()) {
199             return super.makeObject();
200         }
201         return disconnectedCreateAction.get();
202     }
203 
204     @Override
205     public PooledObject<Waiter> makeObject() {
206         return make();
207     }
208 
209     @Override
210     public PooledObject<Waiter> makeObject(final K key) {
211         return make();
212     }
213 
214     private void passivate(final PooledObject<Waiter> obj) {
215         if (connected.get()) {
216             super.passivateObject(obj);
217         } else {
218             disconnectedLifcycleAction.accept(obj);
219         }
220     }
221 
222     @Override
223     public void passivateObject(final K key, final PooledObject<Waiter> obj) {
224         passivate(obj);
225     }
226 
227     @Override
228     public void passivateObject(final PooledObject<Waiter> obj) {
229         passivate(obj);
230     }
231 
232     private boolean validate(final PooledObject<Waiter> obj) {
233         if (connected.get()) {
234             return super.validateObject(obj);
235         }
236         return disconnectedValidationAction.test(obj);
237     }
238 
239     @Override
240     public boolean validateObject(final K key, final PooledObject<Waiter> obj) {
241         return validate(obj);
242     }
243 
244     @Override
245     public boolean validateObject(final PooledObject<Waiter> obj) {
246         return validate(obj);
247     }
248 
249 }