001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.dbutils;
018
019import java.sql.Connection;
020import java.sql.PreparedStatement;
021import java.sql.ResultSet;
022import java.sql.SQLException;
023import java.util.concurrent.Callable;
024import java.util.concurrent.ExecutorService;
025import java.util.concurrent.Future;
026
027import javax.sql.DataSource;
028
/**
 * Executes SQL queries asynchronously with pluggable strategies for handling
 * {@code ResultSet}s: each call is submitted to an {@code ExecutorService} and
 * its outcome is exposed through a {@code Future}.  This class is thread safe.
 *
 * @see ResultSetHandler
 * @since 1.4
 */
036public class AsyncQueryRunner extends AbstractQueryRunner {
037
038    /**
039     * @deprecated No longer used by this class. Will be removed in a future version.
040     * Class that encapsulates the continuation for batch calls.
041     */
042    @Deprecated
043    protected class BatchCallableStatement implements Callable<int[]> {
044        private final String sql;
045        private final Object[][] params;
046        private final Connection conn;
047        private final boolean closeConn;
048        private final PreparedStatement ps;
049
050        /**
051         * Creates a new BatchCallableStatement instance.
052         *
053         * @param sql The SQL statement to execute.
054         * @param params An array of query replacement parameters.  Each row in
055         *        this array is one set of batch replacement values.
056         * @param conn The connection to use for the batch call.
057         * @param closeConn True if the connection should be closed, false otherwise.
058         * @param ps The {@link PreparedStatement} to be executed.
059         */
060        public BatchCallableStatement(final String sql, final Object[][] params, final Connection conn, final boolean closeConn, final PreparedStatement ps) {
061            this.sql = sql;
062            this.params = params.clone();
063            this.conn = conn;
064            this.closeConn = closeConn;
065            this.ps = ps;
066        }
067
068        /**
069         * The actual call to executeBatch.
070         *
071         * @return an array of update counts containing one element for each command in the batch.
072         * @throws SQLException if a database access error occurs or one of the commands sent to the database fails.
073         * @see PreparedStatement#executeBatch()
074         */
075        @Override
076        public int[] call() throws SQLException {
077            int[] ret = null;
078
079            try {
080                ret = ps.executeBatch();
081            } catch (final SQLException e) {
082                rethrow(e, sql, (Object[])params);
083            } finally {
084                close(ps);
085                if (closeConn) {
086                    close(conn);
087                }
088            }
089
090            return ret;
091        }
092    }
093    /**
094     * Class that encapsulates the continuation for query calls.
095     * @param <T> The type of the result from the call to handle.
096     */
097    protected class QueryCallableStatement<T> implements Callable<T> {
098        private final String sql;
099        private final Object[] params;
100        private final Connection conn;
101        private final boolean closeConn;
102        private final PreparedStatement ps;
103        private final ResultSetHandler<T> rsh;
104
105        /**
106         * Creates a new {@code QueryCallableStatement} instance.
107         *
108         * @param conn The connection to use for the batch call.
109         * @param closeConn True if the connection should be closed, false otherwise.
110         * @param ps The {@link PreparedStatement} to be executed.
111         * @param rsh The handler that converts the results into an object.
112         * @param sql The SQL statement to execute.
113         * @param params An array of query replacement parameters.  Each row in
114         *        this array is one set of batch replacement values.
115         */
116        public QueryCallableStatement(final Connection conn, final boolean closeConn, final PreparedStatement ps,
117                final ResultSetHandler<T> rsh, final String sql, final Object... params) {
118            this.sql = sql;
119            this.params = params;
120            this.conn = conn;
121            this.closeConn = closeConn;
122            this.ps = ps;
123            this.rsh = rsh;
124        }
125
126        /**
127         * The actual call to {@code handle()} method.
128         *
129         * @return an array of update counts containing one element for each command in the batch.
130         * @throws SQLException if a database access error occurs.
131         * @see ResultSetHandler#handle(ResultSet)
132         */
133        @Override
134        public T call() throws SQLException {
135            ResultSet resultSet = null;
136            T ret = null;
137
138            try {
139                resultSet = wrap(ps.executeQuery());
140                ret = rsh.handle(resultSet);
141            } catch (final SQLException e) {
142                rethrow(e, sql, params);
143            } finally {
144                try {
145                    close(resultSet);
146                } finally {
147                    close(ps);
148                    if (closeConn) {
149                        close(conn);
150                    }
151                }
152            }
153
154            return ret;
155        }
156
157    }
158
159    /**
160     * @deprecated No longer used by this class. Will be removed in a future version.
161     * Class that encapsulates the continuation for update calls.
162     */
163    @Deprecated
164    protected class UpdateCallableStatement implements Callable<Integer> {
165        private final String sql;
166        private final Object[] params;
167        private final Connection conn;
168        private final boolean closeConn;
169        private final PreparedStatement ps;
170
171        /**
172         *
173         *
174         * @param conn The connection to use for the batch call.
175         * @param closeConn True if the connection should be closed, false otherwise.
176         * @param ps The {@link PreparedStatement} to be executed.
177         * @param sql The SQL statement to execute.
178         * @param params An array of query replacement parameters.  Each row in
179         *        this array is one set of batch replacement values.
180         */
181        public UpdateCallableStatement(final Connection conn, final boolean closeConn, final PreparedStatement ps, final String sql, final Object... params) {
182            this.sql = sql;
183            this.params = params;
184            this.conn = conn;
185            this.closeConn = closeConn;
186            this.ps = ps;
187        }
188
189        /**
190         * The actual call to {@code executeUpdate()} method.
191         *
192         * @return either (1) the row count for SQL Data Manipulation Language (DML) statements or
193         *                (2) 0 for SQL statements that return nothing
194         * @throws SQLException if a database access error occurs.
195         * @see PreparedStatement#executeUpdate()
196         */
197        @Override
198        public Integer call() throws SQLException {
199            int rows = 0;
200
201            try {
202                rows = ps.executeUpdate();
203            } catch (final SQLException e) {
204                rethrow(e, sql, params);
205            } finally {
206                close(ps);
207                if (closeConn) {
208                    close(conn);
209                }
210            }
211
212            return Integer.valueOf(rows);
213        }
214
215    }
216
    /** Executor to which every JDBC invocation made through this runner is submitted. */
    private final ExecutorService executorService;

    /** Synchronous runner that performs the actual JDBC work inside the submitted tasks. */
    private final QueryRunner queryRunner;
220
    /**
     * Constructor for AsyncQueryRunner that controls the use of {@code ParameterMetaData}.
     * Delegates to the three-argument constructor with a {@code null} {@code DataSource}.
     *
     * @param pmdKnownBroken Some drivers don't support {@link java.sql.ParameterMetaData#getParameterType(int) };
     * if {@code pmdKnownBroken} is set to true, we won't even try it; if false, we'll try it,
     * and if it breaks, we'll remember not to use it again.
     * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
     * @deprecated Use {@link #AsyncQueryRunner(ExecutorService, QueryRunner)} instead.
     */
    @Deprecated
    public AsyncQueryRunner(final boolean pmdKnownBroken, final ExecutorService executorService) {
        this(null, pmdKnownBroken, executorService);
    }
234
235    /**
236     * @deprecated Use {@link #AsyncQueryRunner(ExecutorService, QueryRunner)} instead.
237     * Constructor for AsyncQueryRunner that take a {@code DataSource} and controls the use of {@code ParameterMetaData}.
238     * Methods that do not take a {@code Connection} parameter will retrieve connections from this
239     * {@code DataSource}.
240     *
241     * @param ds The {@code DataSource} to retrieve connections from.
242     * @param pmdKnownBroken Some drivers don't support {@link java.sql.ParameterMetaData#getParameterType(int) };
243     * if {@code pmdKnownBroken} is set to true, we won't even try it; if false, we'll try it,
244     * and if it breaks, we'll remember not to use it again.
245     * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
246     */
247    @Deprecated
248    public AsyncQueryRunner(final DataSource ds, final boolean pmdKnownBroken, final ExecutorService executorService) {
249        super(ds, pmdKnownBroken);
250        this.executorService = executorService;
251        this.queryRunner = new QueryRunner(ds, pmdKnownBroken);
252    }
253
    /**
     * Constructor for AsyncQueryRunner that takes a {@code DataSource}.
     *
     * Methods that do not take a {@code Connection} parameter will retrieve connections from this
     * {@code DataSource}.  Delegates to the three-argument constructor with
     * {@code pmdKnownBroken} set to {@code false}.
     *
     * @param ds The {@code DataSource} to retrieve connections from.
     * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
     * @deprecated Use {@link #AsyncQueryRunner(ExecutorService, QueryRunner)} instead.
     */
    @Deprecated
    public AsyncQueryRunner(final DataSource ds, final ExecutorService executorService) {
        this(ds, false, executorService);
    }
268
    /**
     * Constructor for AsyncQueryRunner.  Delegates to the three-argument constructor with a
     * {@code null} {@code DataSource} and {@code pmdKnownBroken} set to {@code false}.
     *
     * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
     */
    public AsyncQueryRunner(final ExecutorService executorService) {
        this(null, false, executorService);
    }
277
278    /**
279     * Constructor for AsyncQueryRunner which uses a provided ExecutorService and underlying QueryRunner.
280     *
281     * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
282     * @param queryRunner the {@code QueryRunner} instance to use for the queries.
283     * @since 1.5
284     */
285    public AsyncQueryRunner(final ExecutorService executorService, final QueryRunner queryRunner) {
286        this.executorService = executorService;
287        this.queryRunner = queryRunner;
288    }
289
290    /**
291     * Execute a batch of SQL INSERT, UPDATE, or DELETE queries.
292     *
293     * @param conn The {@code Connection} to use to run the query.  The caller is
294     * responsible for closing this Connection.
295     * @param sql The SQL to execute.
296     * @param params An array of query replacement parameters.  Each row in
297     * this array is one set of batch replacement values.
298     * @return A {@code Future} which returns the number of rows updated per statement.
299     * @throws SQLException if a database access error occurs
300     */
301    public Future<int[]> batch(final Connection conn, final String sql, final Object[][] params) throws SQLException {
302        return executorService.submit(() -> queryRunner.batch(conn, sql, params));
303    }
304
305    /**
306     * Execute a batch of SQL INSERT, UPDATE, or DELETE queries.  The
307     * {@code Connection} is retrieved from the {@code DataSource}
308     * set in the constructor.  This {@code Connection} must be in
309     * auto-commit mode or the update will not be saved.
310     *
311     * @param sql The SQL to execute.
312     * @param params An array of query replacement parameters.  Each row in
313     * this array is one set of batch replacement values.
314     * @return A {@code Future} which returns the number of rows updated per statement.
315     * @throws SQLException if a database access error occurs
316     */
317    public Future<int[]> batch(final String sql, final Object[][] params) throws SQLException {
318        return executorService.submit(() -> queryRunner.batch(sql, params));
319    }
320
321    /**
322     * Executes {@link QueryRunner#insert(Connection, String, ResultSetHandler)} asynchronously.
323     *
324     * @param <T> Return type expected
325     * @param conn {@link Connection} to use to execute the SQL statement
326     * @param sql SQL insert statement to execute
327     * @param rsh {@link ResultSetHandler} for handling the results
328     * @return {@link Future} that executes a query runner insert
329     * @see QueryRunner#insert(Connection, String, ResultSetHandler)
330     * @throws SQLException if a database access error occurs
331     * @since 1.6
332     */
333    public <T> Future<T> insert(final Connection conn, final String sql, final ResultSetHandler<T> rsh) throws SQLException {
334        return executorService.submit(() -> queryRunner.insert(conn, sql, rsh));
335    }
336
337    /**
338     * Executes {@link QueryRunner#insert(Connection, String, ResultSetHandler, Object...)} asynchronously.
339     *
340     * @param <T> Return type expected
341     * @param conn {@link Connection} to use to execute the SQL statement
342     * @param sql SQL insert statement to execute
343     * @param rsh {@link ResultSetHandler} for handling the results
344     * @param params Parameter values for substitution in the SQL statement
345     * @return {@link Future} that executes a query runner insert
346     * @see QueryRunner#insert(Connection, String, ResultSetHandler, Object...)
347     * @throws SQLException if a database access error occurs
348     * @since 1.6
349     */
350    public <T> Future<T> insert(final Connection conn, final String sql, final ResultSetHandler<T> rsh, final Object... params) throws SQLException {
351        return executorService.submit(() -> queryRunner.insert(conn, sql, rsh, params));
352    }
353
354    /**
355     * Executes {@link QueryRunner#insert(String, ResultSetHandler)} asynchronously.
356     *
357     * @param <T> Return type expected
358     * @param sql SQL insert statement to execute
359     * @param rsh {@link ResultSetHandler} for handling the results
360     * @return {@link Future} that executes a query runner insert
361     * @see QueryRunner#insert(String, ResultSetHandler)
362     * @throws SQLException if a database access error occurs
363     * @since 1.6
364     */
365    public <T> Future<T> insert(final String sql, final ResultSetHandler<T> rsh) throws SQLException {
366        return executorService.submit(() -> queryRunner.insert(sql, rsh));
367    }
368
369    /**
370     * Executes {@link QueryRunner#insert(String, ResultSetHandler, Object...)} asynchronously.
371     *
372     * @param <T> Return type expected
373     * @param sql SQL insert statement to execute
374     * @param rsh {@link ResultSetHandler} for handling the results
375     * @param params Parameter values for substitution in the SQL statement
376     * @return {@link Future} that executes a query runner insert
377     * @see QueryRunner#insert(String, ResultSetHandler, Object...)
378     * @throws SQLException if a database access error occurs
379     * @since 1.6
380     */
381    public <T> Future<T> insert(final String sql, final ResultSetHandler<T> rsh, final Object... params) throws SQLException {
382        return executorService.submit(() -> queryRunner.insert(sql, rsh, params));
383    }
384
385    /**
386     * {@link QueryRunner#insertBatch(Connection, String, ResultSetHandler, Object[][])} asynchronously.
387     *
388     * @param <T> Return type expected
389     * @param conn {@link Connection} to use to execute the SQL statement
390     * @param sql SQL insert statement to execute
391     * @param rsh {@link ResultSetHandler} for handling the results
392     * @param params An array of query replacement parameters.  Each row in
393     *        this array is one set of batch replacement values.
394     * @return {@link Future} that executes a query runner batch insert
395     * @see QueryRunner#insertBatch(Connection, String, ResultSetHandler, Object[][])
396     * @throws SQLException if a database access error occurs
397     * @since 1.6
398     */
399    public <T> Future<T> insertBatch(final Connection conn, final String sql, final ResultSetHandler<T> rsh, final Object[][] params) throws SQLException {
400        return executorService.submit(() -> queryRunner.insertBatch(conn, sql, rsh, params));
401    }
402
403    /**
404     * {@link QueryRunner#insertBatch(String, ResultSetHandler, Object[][])} asynchronously.
405     *
406     * @param <T> Return type expected
407     * @param sql SQL insert statement to execute
408     * @param rsh {@link ResultSetHandler} for handling the results
409     * @param params An array of query replacement parameters.  Each row in
410     *        this array is one set of batch replacement values.
411     * @return {@link Future} that executes a query runner batch insert
412     * @see QueryRunner#insertBatch(String, ResultSetHandler, Object[][])
413     * @throws SQLException if a database access error occurs
414     * @since 1.6
415     */
416    public <T> Future<T> insertBatch(final String sql, final ResultSetHandler<T> rsh, final Object[][] params) throws SQLException {
417        return executorService.submit(() -> queryRunner.insertBatch(sql, rsh, params));
418    }
419
420    /**
421     * Execute an SQL SELECT query without any replacement parameters.  The
422     * caller is responsible for closing the connection.
423     * @param <T> The type of object that the handler returns
424     * @param conn The connection to execute the query in.
425     * @param sql The query to execute.
426     * @param rsh The handler that converts the results into an object.
427     * @return A {@code Future} which returns the result of the query call.
428     * @throws SQLException if a database access error occurs
429     */
430    public <T> Future<T> query(final Connection conn, final String sql, final ResultSetHandler<T> rsh) throws SQLException {
431        return executorService.submit(() -> queryRunner.query(conn, sql, rsh));
432    }
433
434    /**
435     * Execute an SQL SELECT query with replacement parameters.  The
436     * caller is responsible for closing the connection.
437     * @param <T> The type of object that the handler returns
438     * @param conn The connection to execute the query in.
439     * @param sql The query to execute.
440     * @param rsh The handler that converts the results into an object.
441     * @param params The replacement parameters.
442     * @return A {@code Future} which returns the result of the query call.
443     * @throws SQLException if a database access error occurs
444     */
445    public <T> Future<T> query(final Connection conn, final String sql, final ResultSetHandler<T> rsh, final Object... params)
446            throws SQLException {
447        return executorService.submit(() -> queryRunner.query(conn, sql, rsh, params));
448    }
449
450    /**
451     * Executes the given SELECT SQL without any replacement parameters.
452     * The {@code Connection} is retrieved from the
453     * {@code DataSource} set in the constructor.
454     * @param <T> The type of object that the handler returns
455     * @param sql The SQL statement to execute.
456     * @param rsh The handler used to create the result object from
457     * the {@code ResultSet}.
458     *
459     * @return A {@code Future} which returns the result of the query call.
460     * @throws SQLException if a database access error occurs
461     */
462    public <T> Future<T> query(final String sql, final ResultSetHandler<T> rsh) throws SQLException {
463        return executorService.submit(() -> queryRunner.query(sql, rsh));
464    }
465
466    /**
467     * Executes the given SELECT SQL query and returns a result object.
468     * The {@code Connection} is retrieved from the
469     * {@code DataSource} set in the constructor.
470     * @param <T> The type of object that the handler returns
471     * @param sql The SQL statement to execute.
472     * @param rsh The handler used to create the result object from
473     * the {@code ResultSet}.
474     * @param params Initialize the PreparedStatement's IN parameters with
475     * this array.
476     * @return A {@code Future} which returns the result of the query call.
477     * @throws SQLException if a database access error occurs
478     */
479    public <T> Future<T> query(final String sql, final ResultSetHandler<T> rsh, final Object... params) throws SQLException {
480        return executorService.submit(() -> queryRunner.query(sql, rsh, params));
481    }
482
483    /**
484     * Execute an SQL INSERT, UPDATE, or DELETE query without replacement
485     * parameters.
486     *
487     * @param conn The connection to use to run the query.
488     * @param sql The SQL to execute.
489     * @return A {@code Future} which returns the number of rows updated.
490     * @throws SQLException if a database access error occurs
491     */
492    public Future<Integer> update(final Connection conn, final String sql) throws SQLException {
493        return executorService.submit(() -> Integer.valueOf(queryRunner.update(conn, sql)));
494    }
495
496    /**
497     * Execute an SQL INSERT, UPDATE, or DELETE query with a single replacement
498     * parameter.
499     *
500     * @param conn The connection to use to run the query.
501     * @param sql The SQL to execute.
502     * @param param The replacement parameter.
503     * @return A {@code Future} which returns the number of rows updated.
504     * @throws SQLException if a database access error occurs
505     */
506    public Future<Integer> update(final Connection conn, final String sql, final Object param) throws SQLException {
507        return executorService.submit(() -> Integer.valueOf(queryRunner.update(conn, sql, param)));
508    }
509
510    /**
511     * Execute an SQL INSERT, UPDATE, or DELETE query.
512     *
513     * @param conn The connection to use to run the query.
514     * @param sql The SQL to execute.
515     * @param params The query replacement parameters.
516     * @return A {@code Future} which returns the number of rows updated.
517     * @throws SQLException if a database access error occurs
518     */
519    public Future<Integer> update(final Connection conn, final String sql, final Object... params) throws SQLException {
520        return executorService.submit(() -> Integer.valueOf(queryRunner.update(conn, sql, params)));
521    }
522
523    /**
524     * Executes the given INSERT, UPDATE, or DELETE SQL statement without
525     * any replacement parameters. The {@code Connection} is retrieved
526     * from the {@code DataSource} set in the constructor.  This
527     * {@code Connection} must be in auto-commit mode or the update will
528     * not be saved.
529     *
530     * @param sql The SQL statement to execute.
531     * @throws SQLException if a database access error occurs
532     * @return A {@code Future} which returns the number of rows updated.
533     */
534    public Future<Integer> update(final String sql) throws SQLException {
535        return executorService.submit(() -> Integer.valueOf(queryRunner.update(sql)));
536    }
537
538    /**
539     * Executes the given INSERT, UPDATE, or DELETE SQL statement with
540     * a single replacement parameter.  The {@code Connection} is
541     * retrieved from the {@code DataSource} set in the constructor.
542     * This {@code Connection} must be in auto-commit mode or the
543     * update will not be saved.
544     *
545     * @param sql The SQL statement to execute.
546     * @param param The replacement parameter.
547     * @throws SQLException if a database access error occurs
548     * @return A {@code Future} which returns the number of rows updated.
549     */
550    public Future<Integer> update(final String sql, final Object param) throws SQLException {
551        return executorService.submit(() -> Integer.valueOf(queryRunner.update(sql, param)));
552    }
553
554    /**
555     * Executes the given INSERT, UPDATE, or DELETE SQL statement.  The
556     * {@code Connection} is retrieved from the {@code DataSource}
557     * set in the constructor.  This {@code Connection} must be in
558     * auto-commit mode or the update will not be saved.
559     *
560     * @param sql The SQL statement to execute.
561     * @param params Initializes the PreparedStatement's IN (i.e. '?')
562     * parameters.
563     * @throws SQLException if a database access error occurs
564     * @return A {@code Future} which returns the number of rows updated.
565     */
566    public Future<Integer> update(final String sql, final Object... params) throws SQLException {
567        return executorService.submit(() -> Integer.valueOf(queryRunner.update(sql, params)));
568    }
569
570}