View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.dbutils;
18  
19  import java.sql.Connection;
20  import java.sql.PreparedStatement;
21  import java.sql.ResultSet;
22  import java.sql.SQLException;
23  import java.util.concurrent.Callable;
24  import java.util.concurrent.ExecutorService;
25  import java.util.concurrent.Future;
26  
27  import javax.sql.DataSource;
28  
29  /**
30   * Executes SQL queries with pluggable strategies for handling
31   * {@code ResultSet}s.  This class is thread safe.
32   *
33   * @see ResultSetHandler
34   * @since 1.4
35   */
36  public class AsyncQueryRunner extends AbstractQueryRunner {
37  
38      /**
39       * @deprecated No longer used by this class. Will be removed in a future version.
40       * Class that encapsulates the continuation for batch calls.
41       */
42      @Deprecated
43      protected class BatchCallableStatement implements Callable<int[]> {
44          private final String sql;
45          private final Object[][] params;
46          private final Connection conn;
47          private final boolean closeConn;
48          private final PreparedStatement ps;
49  
50          /**
51           * Creates a new BatchCallableStatement instance.
52           *
53           * @param sql The SQL statement to execute.
54           * @param params An array of query replacement parameters.  Each row in
55           *        this array is one set of batch replacement values.
56           * @param conn The connection to use for the batch call.
57           * @param closeConn True if the connection should be closed, false otherwise.
58           * @param ps The {@link PreparedStatement} to be executed.
59           */
60          public BatchCallableStatement(final String sql, final Object[][] params, final Connection conn, final boolean closeConn, final PreparedStatement ps) {
61              this.sql = sql;
62              this.params = params.clone();
63              this.conn = conn;
64              this.closeConn = closeConn;
65              this.ps = ps;
66          }
67  
68          /**
69           * The actual call to executeBatch.
70           *
71           * @return an array of update counts containing one element for each command in the batch.
72           * @throws SQLException if a database access error occurs or one of the commands sent to the database fails.
73           * @see PreparedStatement#executeBatch()
74           */
75          @Override
76          public int[] call() throws SQLException {
77              int[] ret = null;
78  
79              try {
80                  ret = ps.executeBatch();
81              } catch (final SQLException e) {
82                  rethrow(e, sql, (Object[])params);
83              } finally {
84                  close(ps);
85                  if (closeConn) {
86                      close(conn);
87                  }
88              }
89  
90              return ret;
91          }
92      }
93      /**
94       * Class that encapsulates the continuation for query calls.
95       * @param <T> The type of the result from the call to handle.
96       */
97      protected class QueryCallableStatement<T> implements Callable<T> {
98          private final String sql;
99          private final Object[] params;
100         private final Connection conn;
101         private final boolean closeConn;
102         private final PreparedStatement ps;
103         private final ResultSetHandler<T> rsh;
104 
105         /**
106          * Creates a new {@code QueryCallableStatement} instance.
107          *
108          * @param conn The connection to use for the batch call.
109          * @param closeConn True if the connection should be closed, false otherwise.
110          * @param ps The {@link PreparedStatement} to be executed.
111          * @param rsh The handler that converts the results into an object.
112          * @param sql The SQL statement to execute.
113          * @param params An array of query replacement parameters.  Each row in
114          *        this array is one set of batch replacement values.
115          */
116         public QueryCallableStatement(final Connection conn, final boolean closeConn, final PreparedStatement ps,
117                 final ResultSetHandler<T> rsh, final String sql, final Object... params) {
118             this.sql = sql;
119             this.params = params;
120             this.conn = conn;
121             this.closeConn = closeConn;
122             this.ps = ps;
123             this.rsh = rsh;
124         }
125 
126         /**
127          * The actual call to {@code handle()} method.
128          *
129          * @return an array of update counts containing one element for each command in the batch.
130          * @throws SQLException if a database access error occurs.
131          * @see ResultSetHandler#handle(ResultSet)
132          */
133         @Override
134         public T call() throws SQLException {
135             ResultSet resultSet = null;
136             T ret = null;
137 
138             try {
139                 resultSet = wrap(ps.executeQuery());
140                 ret = rsh.handle(resultSet);
141             } catch (final SQLException e) {
142                 rethrow(e, sql, params);
143             } finally {
144                 try {
145                     close(resultSet);
146                 } finally {
147                     close(ps);
148                     if (closeConn) {
149                         close(conn);
150                     }
151                 }
152             }
153 
154             return ret;
155         }
156 
157     }
158 
159     /**
160      * @deprecated No longer used by this class. Will be removed in a future version.
161      * Class that encapsulates the continuation for update calls.
162      */
163     @Deprecated
164     protected class UpdateCallableStatement implements Callable<Integer> {
165         private final String sql;
166         private final Object[] params;
167         private final Connection conn;
168         private final boolean closeConn;
169         private final PreparedStatement ps;
170 
171         /**
172          *
173          *
174          * @param conn The connection to use for the batch call.
175          * @param closeConn True if the connection should be closed, false otherwise.
176          * @param ps The {@link PreparedStatement} to be executed.
177          * @param sql The SQL statement to execute.
178          * @param params An array of query replacement parameters.  Each row in
179          *        this array is one set of batch replacement values.
180          */
181         public UpdateCallableStatement(final Connection conn, final boolean closeConn, final PreparedStatement ps, final String sql, final Object... params) {
182             this.sql = sql;
183             this.params = params;
184             this.conn = conn;
185             this.closeConn = closeConn;
186             this.ps = ps;
187         }
188 
189         /**
190          * The actual call to {@code executeUpdate()} method.
191          *
192          * @return either (1) the row count for SQL Data Manipulation Language (DML) statements or
193          *                (2) 0 for SQL statements that return nothing
194          * @throws SQLException if a database access error occurs.
195          * @see PreparedStatement#executeUpdate()
196          */
197         @Override
198         public Integer call() throws SQLException {
199             int rows = 0;
200 
201             try {
202                 rows = ps.executeUpdate();
203             } catch (final SQLException e) {
204                 rethrow(e, sql, params);
205             } finally {
206                 close(ps);
207                 if (closeConn) {
208                     close(conn);
209                 }
210             }
211 
212             return Integer.valueOf(rows);
213         }
214 
215     }
216 
217     private final ExecutorService executorService;
218 
219     private final QueryRunner queryRunner;
220 
221     /**
222      * @deprecated Use {@link #AsyncQueryRunner(ExecutorService, QueryRunner)} instead.
223      * Constructor for AsyncQueryRunner that controls the use of {@code ParameterMetaData}.
224      *
225      * @param pmdKnownBroken Some drivers don't support {@link java.sql.ParameterMetaData#getParameterType(int) };
226      * if {@code pmdKnownBroken} is set to true, we won't even try it; if false, we'll try it,
227      * and if it breaks, we'll remember not to use it again.
228      * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
229      */
230     @Deprecated
231     public AsyncQueryRunner(final boolean pmdKnownBroken, final ExecutorService executorService) {
232         this(null, pmdKnownBroken, executorService);
233     }
234 
235     /**
236      * @deprecated Use {@link #AsyncQueryRunner(ExecutorService, QueryRunner)} instead.
237      * Constructor for AsyncQueryRunner that take a {@code DataSource} and controls the use of {@code ParameterMetaData}.
238      * Methods that do not take a {@code Connection} parameter will retrieve connections from this
239      * {@code DataSource}.
240      *
241      * @param ds The {@code DataSource} to retrieve connections from.
242      * @param pmdKnownBroken Some drivers don't support {@link java.sql.ParameterMetaData#getParameterType(int) };
243      * if {@code pmdKnownBroken} is set to true, we won't even try it; if false, we'll try it,
244      * and if it breaks, we'll remember not to use it again.
245      * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
246      */
247     @Deprecated
248     public AsyncQueryRunner(final DataSource ds, final boolean pmdKnownBroken, final ExecutorService executorService) {
249         super(ds, pmdKnownBroken);
250         this.executorService = executorService;
251         this.queryRunner = new QueryRunner(ds, pmdKnownBroken);
252     }
253 
254     /**
255      * @deprecated Use {@link #AsyncQueryRunner(ExecutorService, QueryRunner)} instead.
256      * Constructor for AsyncQueryRunner that takes a {@code DataSource}.
257      *
258      * Methods that do not take a {@code Connection} parameter will retrieve connections from this
259      * {@code DataSource}.
260      *
261      * @param ds The {@code DataSource} to retrieve connections from.
262      * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
263      */
264     @Deprecated
265     public AsyncQueryRunner(final DataSource ds, final ExecutorService executorService) {
266         this(ds, false, executorService);
267     }
268 
269     /**
270      * Constructor for AsyncQueryRunner.
271      *
272      * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
273      */
274     public AsyncQueryRunner(final ExecutorService executorService) {
275         this(null, false, executorService);
276     }
277 
278     /**
279      * Constructor for AsyncQueryRunner which uses a provided ExecutorService and underlying QueryRunner.
280      *
281      * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
282      * @param queryRunner the {@code QueryRunner} instance to use for the queries.
283      * @since 1.5
284      */
285     public AsyncQueryRunner(final ExecutorService executorService, final QueryRunner queryRunner) {
286         this.executorService = executorService;
287         this.queryRunner = queryRunner;
288     }
289 
290     /**
291      * Execute a batch of SQL INSERT, UPDATE, or DELETE queries.
292      *
293      * @param conn The {@code Connection} to use to run the query.  The caller is
294      * responsible for closing this Connection.
295      * @param sql The SQL to execute.
296      * @param params An array of query replacement parameters.  Each row in
297      * this array is one set of batch replacement values.
298      * @return A {@code Future} which returns the number of rows updated per statement.
299      * @throws SQLException if a database access error occurs
300      */
301     public Future<int[]> batch(final Connection conn, final String sql, final Object[][] params) throws SQLException {
302         return executorService.submit(() -> queryRunner.batch(conn, sql, params));
303     }
304 
305     /**
306      * Execute a batch of SQL INSERT, UPDATE, or DELETE queries.  The
307      * {@code Connection} is retrieved from the {@code DataSource}
308      * set in the constructor.  This {@code Connection} must be in
309      * auto-commit mode or the update will not be saved.
310      *
311      * @param sql The SQL to execute.
312      * @param params An array of query replacement parameters.  Each row in
313      * this array is one set of batch replacement values.
314      * @return A {@code Future} which returns the number of rows updated per statement.
315      * @throws SQLException if a database access error occurs
316      */
317     public Future<int[]> batch(final String sql, final Object[][] params) throws SQLException {
318         return executorService.submit(() -> queryRunner.batch(sql, params));
319     }
320 
321     /**
322      * Executes {@link QueryRunner#insert(Connection, String, ResultSetHandler)} asynchronously.
323      *
324      * @param <T> Return type expected
325      * @param conn {@link Connection} to use to execute the SQL statement
326      * @param sql SQL insert statement to execute
327      * @param rsh {@link ResultSetHandler} for handling the results
328      * @return {@link Future} that executes a query runner insert
329      * @see QueryRunner#insert(Connection, String, ResultSetHandler)
330      * @throws SQLException if a database access error occurs
331      * @since 1.6
332      */
333     public <T> Future<T> insert(final Connection conn, final String sql, final ResultSetHandler<T> rsh) throws SQLException {
334         return executorService.submit(() -> queryRunner.insert(conn, sql, rsh));
335     }
336 
337     /**
338      * Executes {@link QueryRunner#insert(Connection, String, ResultSetHandler, Object...)} asynchronously.
339      *
340      * @param <T> Return type expected
341      * @param conn {@link Connection} to use to execute the SQL statement
342      * @param sql SQL insert statement to execute
343      * @param rsh {@link ResultSetHandler} for handling the results
344      * @param params Parameter values for substitution in the SQL statement
345      * @return {@link Future} that executes a query runner insert
346      * @see QueryRunner#insert(Connection, String, ResultSetHandler, Object...)
347      * @throws SQLException if a database access error occurs
348      * @since 1.6
349      */
350     public <T> Future<T> insert(final Connection conn, final String sql, final ResultSetHandler<T> rsh, final Object... params) throws SQLException {
351         return executorService.submit(() -> queryRunner.insert(conn, sql, rsh, params));
352     }
353 
354     /**
355      * Executes {@link QueryRunner#insert(String, ResultSetHandler)} asynchronously.
356      *
357      * @param <T> Return type expected
358      * @param sql SQL insert statement to execute
359      * @param rsh {@link ResultSetHandler} for handling the results
360      * @return {@link Future} that executes a query runner insert
361      * @see QueryRunner#insert(String, ResultSetHandler)
362      * @throws SQLException if a database access error occurs
363      * @since 1.6
364      */
365     public <T> Future<T> insert(final String sql, final ResultSetHandler<T> rsh) throws SQLException {
366         return executorService.submit(() -> queryRunner.insert(sql, rsh));
367     }
368 
369     /**
370      * Executes {@link QueryRunner#insert(String, ResultSetHandler, Object...)} asynchronously.
371      *
372      * @param <T> Return type expected
373      * @param sql SQL insert statement to execute
374      * @param rsh {@link ResultSetHandler} for handling the results
375      * @param params Parameter values for substitution in the SQL statement
376      * @return {@link Future} that executes a query runner insert
377      * @see QueryRunner#insert(String, ResultSetHandler, Object...)
378      * @throws SQLException if a database access error occurs
379      * @since 1.6
380      */
381     public <T> Future<T> insert(final String sql, final ResultSetHandler<T> rsh, final Object... params) throws SQLException {
382         return executorService.submit(() -> queryRunner.insert(sql, rsh, params));
383     }
384 
385     /**
386      * {@link QueryRunner#insertBatch(Connection, String, ResultSetHandler, Object[][])} asynchronously.
387      *
388      * @param <T> Return type expected
389      * @param conn {@link Connection} to use to execute the SQL statement
390      * @param sql SQL insert statement to execute
391      * @param rsh {@link ResultSetHandler} for handling the results
392      * @param params An array of query replacement parameters.  Each row in
393      *        this array is one set of batch replacement values.
394      * @return {@link Future} that executes a query runner batch insert
395      * @see QueryRunner#insertBatch(Connection, String, ResultSetHandler, Object[][])
396      * @throws SQLException if a database access error occurs
397      * @since 1.6
398      */
399     public <T> Future<T> insertBatch(final Connection conn, final String sql, final ResultSetHandler<T> rsh, final Object[][] params) throws SQLException {
400         return executorService.submit(() -> queryRunner.insertBatch(conn, sql, rsh, params));
401     }
402 
403     /**
404      * {@link QueryRunner#insertBatch(String, ResultSetHandler, Object[][])} asynchronously.
405      *
406      * @param <T> Return type expected
407      * @param sql SQL insert statement to execute
408      * @param rsh {@link ResultSetHandler} for handling the results
409      * @param params An array of query replacement parameters.  Each row in
410      *        this array is one set of batch replacement values.
411      * @return {@link Future} that executes a query runner batch insert
412      * @see QueryRunner#insertBatch(String, ResultSetHandler, Object[][])
413      * @throws SQLException if a database access error occurs
414      * @since 1.6
415      */
416     public <T> Future<T> insertBatch(final String sql, final ResultSetHandler<T> rsh, final Object[][] params) throws SQLException {
417         return executorService.submit(() -> queryRunner.insertBatch(sql, rsh, params));
418     }
419 
420     /**
421      * Execute an SQL SELECT query without any replacement parameters.  The
422      * caller is responsible for closing the connection.
423      * @param <T> The type of object that the handler returns
424      * @param conn The connection to execute the query in.
425      * @param sql The query to execute.
426      * @param rsh The handler that converts the results into an object.
427      * @return A {@code Future} which returns the result of the query call.
428      * @throws SQLException if a database access error occurs
429      */
430     public <T> Future<T> query(final Connection conn, final String sql, final ResultSetHandler<T> rsh) throws SQLException {
431         return executorService.submit(() -> queryRunner.query(conn, sql, rsh));
432     }
433 
434     /**
435      * Execute an SQL SELECT query with replacement parameters.  The
436      * caller is responsible for closing the connection.
437      * @param <T> The type of object that the handler returns
438      * @param conn The connection to execute the query in.
439      * @param sql The query to execute.
440      * @param rsh The handler that converts the results into an object.
441      * @param params The replacement parameters.
442      * @return A {@code Future} which returns the result of the query call.
443      * @throws SQLException if a database access error occurs
444      */
445     public <T> Future<T> query(final Connection conn, final String sql, final ResultSetHandler<T> rsh, final Object... params)
446             throws SQLException {
447         return executorService.submit(() -> queryRunner.query(conn, sql, rsh, params));
448     }
449 
450     /**
451      * Executes the given SELECT SQL without any replacement parameters.
452      * The {@code Connection} is retrieved from the
453      * {@code DataSource} set in the constructor.
454      * @param <T> The type of object that the handler returns
455      * @param sql The SQL statement to execute.
456      * @param rsh The handler used to create the result object from
457      * the {@code ResultSet}.
458      *
459      * @return A {@code Future} which returns the result of the query call.
460      * @throws SQLException if a database access error occurs
461      */
462     public <T> Future<T> query(final String sql, final ResultSetHandler<T> rsh) throws SQLException {
463         return executorService.submit(() -> queryRunner.query(sql, rsh));
464     }
465 
466     /**
467      * Executes the given SELECT SQL query and returns a result object.
468      * The {@code Connection} is retrieved from the
469      * {@code DataSource} set in the constructor.
470      * @param <T> The type of object that the handler returns
471      * @param sql The SQL statement to execute.
472      * @param rsh The handler used to create the result object from
473      * the {@code ResultSet}.
474      * @param params Initialize the PreparedStatement's IN parameters with
475      * this array.
476      * @return A {@code Future} which returns the result of the query call.
477      * @throws SQLException if a database access error occurs
478      */
479     public <T> Future<T> query(final String sql, final ResultSetHandler<T> rsh, final Object... params) throws SQLException {
480         return executorService.submit(() -> queryRunner.query(sql, rsh, params));
481     }
482 
483     /**
484      * Execute an SQL INSERT, UPDATE, or DELETE query without replacement
485      * parameters.
486      *
487      * @param conn The connection to use to run the query.
488      * @param sql The SQL to execute.
489      * @return A {@code Future} which returns the number of rows updated.
490      * @throws SQLException if a database access error occurs
491      */
492     public Future<Integer> update(final Connection conn, final String sql) throws SQLException {
493         return executorService.submit(() -> Integer.valueOf(queryRunner.update(conn, sql)));
494     }
495 
496     /**
497      * Execute an SQL INSERT, UPDATE, or DELETE query with a single replacement
498      * parameter.
499      *
500      * @param conn The connection to use to run the query.
501      * @param sql The SQL to execute.
502      * @param param The replacement parameter.
503      * @return A {@code Future} which returns the number of rows updated.
504      * @throws SQLException if a database access error occurs
505      */
506     public Future<Integer> update(final Connection conn, final String sql, final Object param) throws SQLException {
507         return executorService.submit(() -> Integer.valueOf(queryRunner.update(conn, sql, param)));
508     }
509 
510     /**
511      * Execute an SQL INSERT, UPDATE, or DELETE query.
512      *
513      * @param conn The connection to use to run the query.
514      * @param sql The SQL to execute.
515      * @param params The query replacement parameters.
516      * @return A {@code Future} which returns the number of rows updated.
517      * @throws SQLException if a database access error occurs
518      */
519     public Future<Integer> update(final Connection conn, final String sql, final Object... params) throws SQLException {
520         return executorService.submit(() -> Integer.valueOf(queryRunner.update(conn, sql, params)));
521     }
522 
523     /**
524      * Executes the given INSERT, UPDATE, or DELETE SQL statement without
525      * any replacement parameters. The {@code Connection} is retrieved
526      * from the {@code DataSource} set in the constructor.  This
527      * {@code Connection} must be in auto-commit mode or the update will
528      * not be saved.
529      *
530      * @param sql The SQL statement to execute.
531      * @throws SQLException if a database access error occurs
532      * @return A {@code Future} which returns the number of rows updated.
533      */
534     public Future<Integer> update(final String sql) throws SQLException {
535         return executorService.submit(() -> Integer.valueOf(queryRunner.update(sql)));
536     }
537 
538     /**
539      * Executes the given INSERT, UPDATE, or DELETE SQL statement with
540      * a single replacement parameter.  The {@code Connection} is
541      * retrieved from the {@code DataSource} set in the constructor.
542      * This {@code Connection} must be in auto-commit mode or the
543      * update will not be saved.
544      *
545      * @param sql The SQL statement to execute.
546      * @param param The replacement parameter.
547      * @throws SQLException if a database access error occurs
548      * @return A {@code Future} which returns the number of rows updated.
549      */
550     public Future<Integer> update(final String sql, final Object param) throws SQLException {
551         return executorService.submit(() -> Integer.valueOf(queryRunner.update(sql, param)));
552     }
553 
554     /**
555      * Executes the given INSERT, UPDATE, or DELETE SQL statement.  The
556      * {@code Connection} is retrieved from the {@code DataSource}
557      * set in the constructor.  This {@code Connection} must be in
558      * auto-commit mode or the update will not be saved.
559      *
560      * @param sql The SQL statement to execute.
561      * @param params Initializes the PreparedStatement's IN (i.e. '?')
562      * parameters.
563      * @throws SQLException if a database access error occurs
564      * @return A {@code Future} which returns the number of rows updated.
565      */
566     public Future<Integer> update(final String sql, final Object... params) throws SQLException {
567         return executorService.submit(() -> Integer.valueOf(queryRunner.update(sql, params)));
568     }
569 
570 }