001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.dbutils;
018
019import java.sql.Connection;
020import java.sql.PreparedStatement;
021import java.sql.ResultSet;
022import java.sql.SQLException;
023import java.util.concurrent.Callable;
024import java.util.concurrent.ExecutorService;
025import java.util.concurrent.Future;
026
027import javax.sql.DataSource;
028
029/**
030 * Executes SQL queries with pluggable strategies for handling
031 * <code>ResultSet</code>s.  This class is thread safe.
032 *
033 * @see ResultSetHandler
034 * @since 1.4
035 */
036public class AsyncQueryRunner extends AbstractQueryRunner {
037
038    private final ExecutorService executorService;
039    private final QueryRunner queryRunner;
040
041    /**
042     * Constructor for AsyncQueryRunner which uses a provided ExecutorService and underlying QueryRunner.
043     *
044     * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
045     * @param queryRunner the {@code QueryRunner} instance to use for the queries.
046     * @since DbUtils 1.5
047     */
048    public AsyncQueryRunner(ExecutorService executorService, QueryRunner queryRunner) {
049        this.executorService = executorService;
050        this.queryRunner = queryRunner;
051    }
052
053    /**
054     * Constructor for AsyncQueryRunner.
055     *
056     * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
057     */
058    public AsyncQueryRunner(ExecutorService executorService) {
059        this(null, false, executorService);
060    }
061
062    /**
063     * @deprecated Use {@link #AsyncQueryRunner(ExecutorService, QueryRunner)} instead.
064     * Constructor for AsyncQueryRunner that controls the use of <code>ParameterMetaData</code>.
065     *
066     * @param pmdKnownBroken Some drivers don't support {@link java.sql.ParameterMetaData#getParameterType(int) };
067     * if <code>pmdKnownBroken</code> is set to true, we won't even try it; if false, we'll try it,
068     * and if it breaks, we'll remember not to use it again.
069     * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
070     */
071    @Deprecated
072    public AsyncQueryRunner(boolean pmdKnownBroken, ExecutorService executorService) {
073        this(null, pmdKnownBroken, executorService);
074    }
075
076    /**
077     * @deprecated Use {@link #AsyncQueryRunner(ExecutorService, QueryRunner)} instead.
078     * Constructor for AsyncQueryRunner that takes a <code>DataSource</code>.
079     *
080     * Methods that do not take a <code>Connection</code> parameter will retrieve connections from this
081     * <code>DataSource</code>.
082     *
083     * @param ds The <code>DataSource</code> to retrieve connections from.
084     * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
085     */
086    @Deprecated
087    public AsyncQueryRunner(DataSource ds, ExecutorService executorService) {
088        this(ds, false, executorService);
089    }
090
091    /**
092     * @deprecated Use {@link #AsyncQueryRunner(ExecutorService, QueryRunner)} instead.
093     * Constructor for AsyncQueryRunner that take a <code>DataSource</code> and controls the use of <code>ParameterMetaData</code>.
094     * Methods that do not take a <code>Connection</code> parameter will retrieve connections from this
095     * <code>DataSource</code>.
096     *
097     * @param ds The <code>DataSource</code> to retrieve connections from.
098     * @param pmdKnownBroken Some drivers don't support {@link java.sql.ParameterMetaData#getParameterType(int) };
099     * if <code>pmdKnownBroken</code> is set to true, we won't even try it; if false, we'll try it,
100     * and if it breaks, we'll remember not to use it again.
101     * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
102     */
103    @Deprecated
104    public AsyncQueryRunner(DataSource ds, boolean pmdKnownBroken, ExecutorService executorService) {
105        super(ds, pmdKnownBroken);
106        this.executorService = executorService;
107        this.queryRunner = new QueryRunner(ds, pmdKnownBroken);
108    }
109
110    /**
111     * @deprecated No longer used by this class. Will be removed in a future version.
112     * Class that encapsulates the continuation for batch calls.
113     */
114    @Deprecated
115    protected class BatchCallableStatement implements Callable<int[]> {
116        private final String sql;
117        private final Object[][] params;
118        private final Connection conn;
119        private final boolean closeConn;
120        private final PreparedStatement ps;
121
122        /**
123         * Creates a new BatchCallableStatement instance.
124         *
125         * @param sql The SQL statement to execute.
126         * @param params An array of query replacement parameters.  Each row in
127         *        this array is one set of batch replacement values.
128         * @param conn The connection to use for the batch call.
129         * @param closeConn True if the connection should be closed, false otherwise.
130         * @param ps The {@link PreparedStatement} to be executed.
131         */
132        public BatchCallableStatement(String sql, Object[][] params, Connection conn, boolean closeConn, PreparedStatement ps) {
133            this.sql = sql;
134            this.params = params.clone();
135            this.conn = conn;
136            this.closeConn = closeConn;
137            this.ps = ps;
138        }
139
140        /**
141         * The actual call to executeBatch.
142         *
143         * @return an array of update counts containing one element for each command in the batch.
144         * @throws SQLException if a database access error occurs or one of the commands sent to the database fails.
145         * @see PreparedStatement#executeBatch()
146         */
147        @Override
148        public int[] call() throws SQLException {
149            int[] ret = null;
150
151            try {
152                ret = ps.executeBatch();
153            } catch (SQLException e) {
154                rethrow(e, sql, (Object[])params);
155            } finally {
156                close(ps);
157                if (closeConn) {
158                    close(conn);
159                }
160            }
161
162            return ret;
163        }
164    }
165
166    /**
167     * Execute a batch of SQL INSERT, UPDATE, or DELETE queries.
168     *
169     * @param conn The <code>Connection</code> to use to run the query.  The caller is
170     * responsible for closing this Connection.
171     * @param sql The SQL to execute.
172     * @param params An array of query replacement parameters.  Each row in
173     * this array is one set of batch replacement values.
174     * @return A <code>Future</code> which returns the number of rows updated per statement.
175     * @throws SQLException if a database access error occurs
176     */
177    public Future<int[]> batch(final Connection conn, final String sql, final Object[][] params) throws SQLException {
178        return executorService.submit(new Callable<int[]>() {
179
180            @Override
181            public int[] call() throws Exception {
182                return queryRunner.batch(conn, sql, params);
183            }
184
185        });
186    }
187
188    /**
189     * Execute a batch of SQL INSERT, UPDATE, or DELETE queries.  The
190     * <code>Connection</code> is retrieved from the <code>DataSource</code>
191     * set in the constructor.  This <code>Connection</code> must be in
192     * auto-commit mode or the update will not be saved.
193     *
194     * @param sql The SQL to execute.
195     * @param params An array of query replacement parameters.  Each row in
196     * this array is one set of batch replacement values.
197     * @return A <code>Future</code> which returns the number of rows updated per statement.
198     * @throws SQLException if a database access error occurs
199     */
200    public Future<int[]> batch(final String sql, final Object[][] params) throws SQLException {
201        return executorService.submit(new Callable<int[]>() {
202
203            @Override
204            public int[] call() throws Exception {
205                return queryRunner.batch(sql, params);
206            }
207
208        });
209    }
210
211    /**
212     * Class that encapsulates the continuation for query calls.
213     * @param <T> The type of the result from the call to handle.
214     */
215    protected class QueryCallableStatement<T> implements Callable<T> {
216        private final String sql;
217        private final Object[] params;
218        private final Connection conn;
219        private final boolean closeConn;
220        private final PreparedStatement ps;
221        private final ResultSetHandler<T> rsh;
222
223        /**
224         * Creates a new {@code QueryCallableStatement} instance.
225         *
226         * @param conn The connection to use for the batch call.
227         * @param closeConn True if the connection should be closed, false otherwise.
228         * @param ps The {@link PreparedStatement} to be executed.
229         * @param rsh The handler that converts the results into an object.
230         * @param sql The SQL statement to execute.
231         * @param params An array of query replacement parameters.  Each row in
232         *        this array is one set of batch replacement values.
233         */
234        public QueryCallableStatement(Connection conn, boolean closeConn, PreparedStatement ps,
235                ResultSetHandler<T> rsh, String sql, Object... params) {
236            this.sql = sql;
237            this.params = params;
238            this.conn = conn;
239            this.closeConn = closeConn;
240            this.ps = ps;
241            this.rsh = rsh;
242        }
243
244        /**
245         * The actual call to {@code handle()} method.
246         *
247         * @return an array of update counts containing one element for each command in the batch.
248         * @throws SQLException if a database access error occurs.
249         * @see ResultSetHandler#handle(ResultSet)
250         */
251        @Override
252        public T call() throws SQLException {
253            ResultSet rs = null;
254            T ret = null;
255
256            try {
257                rs = wrap(ps.executeQuery());
258                ret = rsh.handle(rs);
259            } catch (SQLException e) {
260                rethrow(e, sql, params);
261            } finally {
262                try {
263                    close(rs);
264                } finally {
265                    close(ps);
266                    if (closeConn) {
267                        close(conn);
268                    }
269                }
270            }
271
272            return ret;
273        }
274
275    }
276
277    /**
278     * Execute an SQL SELECT query with replacement parameters.  The
279     * caller is responsible for closing the connection.
280     * @param <T> The type of object that the handler returns
281     * @param conn The connection to execute the query in.
282     * @param sql The query to execute.
283     * @param rsh The handler that converts the results into an object.
284     * @param params The replacement parameters.
285     * @return A <code>Future</code> which returns the result of the query call.
286     * @throws SQLException if a database access error occurs
287     */
288    public <T> Future<T> query(final Connection conn, final String sql, final ResultSetHandler<T> rsh, final Object... params)
289            throws SQLException {
290        return executorService.submit(new Callable<T>() {
291
292            @Override
293            public T call() throws Exception {
294                return queryRunner.query(conn, sql, rsh, params);
295            }
296
297        });
298    }
299
300    /**
301     * Execute an SQL SELECT query without any replacement parameters.  The
302     * caller is responsible for closing the connection.
303     * @param <T> The type of object that the handler returns
304     * @param conn The connection to execute the query in.
305     * @param sql The query to execute.
306     * @param rsh The handler that converts the results into an object.
307     * @return A <code>Future</code> which returns the result of the query call.
308     * @throws SQLException if a database access error occurs
309     */
310    public <T> Future<T> query(final Connection conn, final String sql, final ResultSetHandler<T> rsh) throws SQLException {
311        return executorService.submit(new Callable<T>() {
312
313            @Override
314            public T call() throws Exception {
315                return queryRunner.query(conn, sql, rsh);
316            }
317
318        });
319    }
320
321    /**
322     * Executes the given SELECT SQL query and returns a result object.
323     * The <code>Connection</code> is retrieved from the
324     * <code>DataSource</code> set in the constructor.
325     * @param <T> The type of object that the handler returns
326     * @param sql The SQL statement to execute.
327     * @param rsh The handler used to create the result object from
328     * the <code>ResultSet</code>.
329     * @param params Initialize the PreparedStatement's IN parameters with
330     * this array.
331     * @return A <code>Future</code> which returns the result of the query call.
332     * @throws SQLException if a database access error occurs
333     */
334    public <T> Future<T> query(final String sql, final ResultSetHandler<T> rsh, final Object... params) throws SQLException {
335        return executorService.submit(new Callable<T>() {
336
337            @Override
338            public T call() throws Exception {
339                return queryRunner.query(sql, rsh, params);
340            }
341
342        });
343    }
344
345    /**
346     * Executes the given SELECT SQL without any replacement parameters.
347     * The <code>Connection</code> is retrieved from the
348     * <code>DataSource</code> set in the constructor.
349     * @param <T> The type of object that the handler returns
350     * @param sql The SQL statement to execute.
351     * @param rsh The handler used to create the result object from
352     * the <code>ResultSet</code>.
353     *
354     * @return A <code>Future</code> which returns the result of the query call.
355     * @throws SQLException if a database access error occurs
356     */
357    public <T> Future<T> query(final String sql, final ResultSetHandler<T> rsh) throws SQLException {
358        return executorService.submit(new Callable<T>() {
359
360            @Override
361            public T call() throws Exception {
362                return queryRunner.query(sql, rsh);
363            }
364
365        });
366    }
367
368    /**
369     * @deprecated No longer used by this class. Will be removed in a future version.
370     * Class that encapsulates the continuation for update calls.
371     */
372    @Deprecated
373    protected class UpdateCallableStatement implements Callable<Integer> {
374        private final String sql;
375        private final Object[] params;
376        private final Connection conn;
377        private final boolean closeConn;
378        private final PreparedStatement ps;
379
380        /**
381         *
382         *
383         * @param conn The connection to use for the batch call.
384         * @param closeConn True if the connection should be closed, false otherwise.
385         * @param ps The {@link PreparedStatement} to be executed.
386         * @param sql The SQL statement to execute.
387         * @param params An array of query replacement parameters.  Each row in
388         *        this array is one set of batch replacement values.
389         */
390        public UpdateCallableStatement(Connection conn, boolean closeConn, PreparedStatement ps, String sql, Object... params) {
391            this.sql = sql;
392            this.params = params;
393            this.conn = conn;
394            this.closeConn = closeConn;
395            this.ps = ps;
396        }
397
398        /**
399         * The actual call to {@code executeUpdate()} method.
400         *
401         * @return either (1) the row count for SQL Data Manipulation Language (DML) statements or
402         *                (2) 0 for SQL statements that return nothing
403         * @throws SQLException if a database access error occurs.
404         * @see PreparedStatement#executeUpdate()
405         */
406        @Override
407        public Integer call() throws SQLException {
408            int rows = 0;
409
410            try {
411                rows = ps.executeUpdate();
412            } catch (SQLException e) {
413                rethrow(e, sql, params);
414            } finally {
415                close(ps);
416                if (closeConn) {
417                    close(conn);
418                }
419            }
420
421            return Integer.valueOf(rows);
422        }
423
424    }
425
426    /**
427     * Execute an SQL INSERT, UPDATE, or DELETE query without replacement
428     * parameters.
429     *
430     * @param conn The connection to use to run the query.
431     * @param sql The SQL to execute.
432     * @return A <code>Future</code> which returns the number of rows updated.
433     * @throws SQLException if a database access error occurs
434     */
435    public Future<Integer> update(final Connection conn, final String sql) throws SQLException {
436        return executorService.submit(new Callable<Integer>() {
437
438            @Override
439            public Integer call() throws Exception {
440                return Integer.valueOf(queryRunner.update(conn, sql));
441            }
442
443        });
444    }
445
446    /**
447     * Execute an SQL INSERT, UPDATE, or DELETE query with a single replacement
448     * parameter.
449     *
450     * @param conn The connection to use to run the query.
451     * @param sql The SQL to execute.
452     * @param param The replacement parameter.
453     * @return A <code>Future</code> which returns the number of rows updated.
454     * @throws SQLException if a database access error occurs
455     */
456    public Future<Integer> update(final Connection conn, final String sql, final Object param) throws SQLException {
457        return executorService.submit(new Callable<Integer>() {
458
459            @Override
460            public Integer call() throws Exception {
461                return Integer.valueOf(queryRunner.update(conn, sql, param));
462            }
463
464        });
465    }
466
467    /**
468     * Execute an SQL INSERT, UPDATE, or DELETE query.
469     *
470     * @param conn The connection to use to run the query.
471     * @param sql The SQL to execute.
472     * @param params The query replacement parameters.
473     * @return A <code>Future</code> which returns the number of rows updated.
474     * @throws SQLException if a database access error occurs
475     */
476    public Future<Integer> update(final Connection conn, final String sql, final Object... params) throws SQLException {
477        return executorService.submit(new Callable<Integer>() {
478
479            @Override
480            public Integer call() throws Exception {
481                return Integer.valueOf(queryRunner.update(conn, sql, params));
482            }
483
484        });
485    }
486
487    /**
488     * Executes the given INSERT, UPDATE, or DELETE SQL statement without
489     * any replacement parameters. The <code>Connection</code> is retrieved
490     * from the <code>DataSource</code> set in the constructor.  This
491     * <code>Connection</code> must be in auto-commit mode or the update will
492     * not be saved.
493     *
494     * @param sql The SQL statement to execute.
495     * @throws SQLException if a database access error occurs
496     * @return A <code>Future</code> which returns the number of rows updated.
497     */
498    public Future<Integer> update(final String sql) throws SQLException {
499        return executorService.submit(new Callable<Integer>() {
500
501            @Override
502            public Integer call() throws Exception {
503                return Integer.valueOf(queryRunner.update(sql));
504            }
505
506        });
507    }
508
509    /**
510     * Executes the given INSERT, UPDATE, or DELETE SQL statement with
511     * a single replacement parameter.  The <code>Connection</code> is
512     * retrieved from the <code>DataSource</code> set in the constructor.
513     * This <code>Connection</code> must be in auto-commit mode or the
514     * update will not be saved.
515     *
516     * @param sql The SQL statement to execute.
517     * @param param The replacement parameter.
518     * @throws SQLException if a database access error occurs
519     * @return A <code>Future</code> which returns the number of rows updated.
520     */
521    public Future<Integer> update(final String sql, final Object param) throws SQLException {
522        return executorService.submit(new Callable<Integer>() {
523
524            @Override
525            public Integer call() throws Exception {
526                return Integer.valueOf(queryRunner.update(sql, param));
527            }
528
529        });
530    }
531
532    /**
533     * Executes the given INSERT, UPDATE, or DELETE SQL statement.  The
534     * <code>Connection</code> is retrieved from the <code>DataSource</code>
535     * set in the constructor.  This <code>Connection</code> must be in
536     * auto-commit mode or the update will not be saved.
537     *
538     * @param sql The SQL statement to execute.
539     * @param params Initializes the PreparedStatement's IN (i.e. '?')
540     * parameters.
541     * @throws SQLException if a database access error occurs
542     * @return A <code>Future</code> which returns the number of rows updated.
543     */
544    public Future<Integer> update(final String sql, final Object... params) throws SQLException {
545        return executorService.submit(new Callable<Integer>() {
546
547            @Override
548            public Integer call() throws Exception {
549                return Integer.valueOf(queryRunner.update(sql, params));
550            }
551
552        });
553    }
554
555    /**
556     * Executes {@link QueryRunner#insert(String, ResultSetHandler)} asynchronously.
557     *
558     * @param <T> Return type expected
559     * @param sql SQL insert statement to execute
560     * @param rsh {@link ResultSetHandler} for handling the results
561     * @return {@link Future} that executes a query runner insert
562     * @see QueryRunner#insert(String, ResultSetHandler)
563     * @throws SQLException if a database access error occurs
564     * @since 1.6
565     */
566    public <T> Future<T> insert(final String sql, final ResultSetHandler<T> rsh) throws SQLException {
567        return executorService.submit(new Callable<T>() {
568
569            @Override
570            public T call() throws Exception {
571                return queryRunner.insert(sql, rsh);
572            }
573
574        });
575    }
576
577    /**
578     * Executes {@link QueryRunner#insert(String, ResultSetHandler, Object...)} asynchronously.
579     *
580     * @param <T> Return type expected
581     * @param sql SQL insert statement to execute
582     * @param rsh {@link ResultSetHandler} for handling the results
583     * @param params Parameter values for substitution in the SQL statement
584     * @return {@link Future} that executes a query runner insert
585     * @see QueryRunner#insert(String, ResultSetHandler, Object...)
586     * @throws SQLException if a database access error occurs
587     * @since 1.6
588     */
589    public <T> Future<T> insert(final String sql, final ResultSetHandler<T> rsh, final Object... params) throws SQLException {
590        return executorService.submit(new Callable<T>() {
591
592            @Override
593            public T call() throws Exception {
594                return queryRunner.insert(sql, rsh, params);
595            }
596        });
597    }
598
599    /**
600     * Executes {@link QueryRunner#insert(Connection, String, ResultSetHandler)} asynchronously.
601     *
602     * @param <T> Return type expected
603     * @param conn {@link Connection} to use to execute the SQL statement
604     * @param sql SQL insert statement to execute
605     * @param rsh {@link ResultSetHandler} for handling the results
606     * @return {@link Future} that executes a query runner insert
607     * @see QueryRunner#insert(Connection, String, ResultSetHandler)
608     * @throws SQLException if a database access error occurs
609     * @since 1.6
610     */
611    public <T> Future<T> insert(final Connection conn, final String sql, final ResultSetHandler<T> rsh) throws SQLException {
612        return executorService.submit(new Callable<T>() {
613
614            @Override
615            public T call() throws Exception {
616                return queryRunner.insert(conn, sql, rsh);
617            }
618        });
619    }
620
621    /**
622     * Executes {@link QueryRunner#insert(Connection, String, ResultSetHandler, Object...)} asynchronously.
623     *
624     * @param <T> Return type expected
625     * @param conn {@link Connection} to use to execute the SQL statement
626     * @param sql SQL insert statement to execute
627     * @param rsh {@link ResultSetHandler} for handling the results
628     * @param params Parameter values for substitution in the SQL statement
629     * @return {@link Future} that executes a query runner insert
630     * @see QueryRunner#insert(Connection, String, ResultSetHandler, Object...)
631     * @throws SQLException if a database access error occurs
632     * @since 1.6
633     */
634    public <T> Future<T> insert(final Connection conn, final String sql, final ResultSetHandler<T> rsh, final Object... params) throws SQLException {
635        return executorService.submit(new Callable<T>() {
636
637            @Override
638            public T call() throws Exception {
639                return queryRunner.insert(conn, sql, rsh, params);
640            }
641        });
642    }
643
644    /**
645     * {@link QueryRunner#insertBatch(String, ResultSetHandler, Object[][])} asynchronously.
646     *
647     * @param <T> Return type expected
648     * @param sql SQL insert statement to execute
649     * @param rsh {@link ResultSetHandler} for handling the results
650     * @param params An array of query replacement parameters.  Each row in
651     *        this array is one set of batch replacement values.
652     * @return {@link Future} that executes a query runner batch insert
653     * @see QueryRunner#insertBatch(String, ResultSetHandler, Object[][])
654     * @throws SQLException if a database access error occurs
655     * @since 1.6
656     */
657    public <T> Future<T> insertBatch(final String sql, final ResultSetHandler<T> rsh, final Object[][] params) throws SQLException {
658        return executorService.submit(new Callable<T>() {
659
660            @Override
661            public T call() throws Exception {
662                return queryRunner.insertBatch(sql, rsh, params);
663            }
664        });
665    }
666
667    /**
668     * {@link QueryRunner#insertBatch(Connection, String, ResultSetHandler, Object[][])} asynchronously.
669     *
670     * @param <T> Return type expected
671     * @param conn {@link Connection} to use to execute the SQL statement
672     * @param sql SQL insert statement to execute
673     * @param rsh {@link ResultSetHandler} for handling the results
674     * @param params An array of query replacement parameters.  Each row in
675     *        this array is one set of batch replacement values.
676     * @return {@link Future} that executes a query runner batch insert
677     * @see QueryRunner#insertBatch(Connection, String, ResultSetHandler, Object[][])
678     * @throws SQLException if a database access error occurs
679     * @since 1.6
680     */
681    public <T> Future<T> insertBatch(final Connection conn, final String sql, final ResultSetHandler<T> rsh, final Object[][] params) throws SQLException {
682        return executorService.submit(new Callable<T>() {
683
684            @Override
685            public T call() throws Exception {
686                return queryRunner.insertBatch(conn, sql, rsh, params);
687            }
688        });
689    }
690
691}