AsyncQueryRunner.java

  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one or more
  3.  * contributor license agreements.  See the NOTICE file distributed with
  4.  * this work for additional information regarding copyright ownership.
  5.  * The ASF licenses this file to You under the Apache License, Version 2.0
  6.  * (the "License"); you may not use this file except in compliance with
  7.  * the License.  You may obtain a copy of the License at
  8.  *
  9.  *      http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package org.apache.commons.dbutils;

  18. import java.sql.Connection;
  19. import java.sql.PreparedStatement;
  20. import java.sql.ResultSet;
  21. import java.sql.SQLException;
  22. import java.util.concurrent.Callable;
  23. import java.util.concurrent.ExecutorService;
  24. import java.util.concurrent.Future;

  25. import javax.sql.DataSource;

  26. /**
  27.  * Executes SQL queries with pluggable strategies for handling
  28.  * {@code ResultSet}s.  This class is thread safe.
  29.  *
  30.  * @see ResultSetHandler
  31.  * @since 1.4
  32.  */
  33. public class AsyncQueryRunner extends AbstractQueryRunner {

  34.     /**
  35.      * @deprecated No longer used by this class. Will be removed in a future version.
  36.      * Class that encapsulates the continuation for batch calls.
  37.      */
  38.     @Deprecated
  39.     protected class BatchCallableStatement implements Callable<int[]> {
  40.         private final String sql;
  41.         private final Object[][] params;
  42.         private final Connection conn;
  43.         private final boolean closeConn;
  44.         private final PreparedStatement ps;

  45.         /**
  46.          * Creates a new BatchCallableStatement instance.
  47.          *
  48.          * @param sql The SQL statement to execute.
  49.          * @param params An array of query replacement parameters.  Each row in
  50.          *        this array is one set of batch replacement values.
  51.          * @param conn The connection to use for the batch call.
  52.          * @param closeConn True if the connection should be closed, false otherwise.
  53.          * @param ps The {@link PreparedStatement} to be executed.
  54.          */
  55.         public BatchCallableStatement(final String sql, final Object[][] params, final Connection conn, final boolean closeConn, final PreparedStatement ps) {
  56.             this.sql = sql;
  57.             this.params = params.clone();
  58.             this.conn = conn;
  59.             this.closeConn = closeConn;
  60.             this.ps = ps;
  61.         }

  62.         /**
  63.          * The actual call to executeBatch.
  64.          *
  65.          * @return an array of update counts containing one element for each command in the batch.
  66.          * @throws SQLException if a database access error occurs or one of the commands sent to the database fails.
  67.          * @see PreparedStatement#executeBatch()
  68.          */
  69.         @Override
  70.         public int[] call() throws SQLException {
  71.             int[] ret = null;

  72.             try {
  73.                 ret = ps.executeBatch();
  74.             } catch (final SQLException e) {
  75.                 rethrow(e, sql, (Object[])params);
  76.             } finally {
  77.                 close(ps);
  78.                 if (closeConn) {
  79.                     close(conn);
  80.                 }
  81.             }

  82.             return ret;
  83.         }
  84.     }
  85.     /**
  86.      * Class that encapsulates the continuation for query calls.
  87.      * @param <T> The type of the result from the call to handle.
  88.      */
  89.     protected class QueryCallableStatement<T> implements Callable<T> {
  90.         private final String sql;
  91.         private final Object[] params;
  92.         private final Connection conn;
  93.         private final boolean closeConn;
  94.         private final PreparedStatement ps;
  95.         private final ResultSetHandler<T> rsh;

  96.         /**
  97.          * Creates a new {@code QueryCallableStatement} instance.
  98.          *
  99.          * @param conn The connection to use for the batch call.
  100.          * @param closeConn True if the connection should be closed, false otherwise.
  101.          * @param ps The {@link PreparedStatement} to be executed.
  102.          * @param rsh The handler that converts the results into an object.
  103.          * @param sql The SQL statement to execute.
  104.          * @param params An array of query replacement parameters.  Each row in
  105.          *        this array is one set of batch replacement values.
  106.          */
  107.         public QueryCallableStatement(final Connection conn, final boolean closeConn, final PreparedStatement ps,
  108.                 final ResultSetHandler<T> rsh, final String sql, final Object... params) {
  109.             this.sql = sql;
  110.             this.params = params;
  111.             this.conn = conn;
  112.             this.closeConn = closeConn;
  113.             this.ps = ps;
  114.             this.rsh = rsh;
  115.         }

  116.         /**
  117.          * The actual call to {@code handle()} method.
  118.          *
  119.          * @return an array of update counts containing one element for each command in the batch.
  120.          * @throws SQLException if a database access error occurs.
  121.          * @see ResultSetHandler#handle(ResultSet)
  122.          */
  123.         @Override
  124.         public T call() throws SQLException {
  125.             ResultSet resultSet = null;
  126.             T ret = null;

  127.             try {
  128.                 resultSet = wrap(ps.executeQuery());
  129.                 ret = rsh.handle(resultSet);
  130.             } catch (final SQLException e) {
  131.                 rethrow(e, sql, params);
  132.             } finally {
  133.                 try {
  134.                     close(resultSet);
  135.                 } finally {
  136.                     close(ps);
  137.                     if (closeConn) {
  138.                         close(conn);
  139.                     }
  140.                 }
  141.             }

  142.             return ret;
  143.         }

  144.     }

  145.     /**
  146.      * @deprecated No longer used by this class. Will be removed in a future version.
  147.      * Class that encapsulates the continuation for update calls.
  148.      */
  149.     @Deprecated
  150.     protected class UpdateCallableStatement implements Callable<Integer> {
  151.         private final String sql;
  152.         private final Object[] params;
  153.         private final Connection conn;
  154.         private final boolean closeConn;
  155.         private final PreparedStatement ps;

  156.         /**
  157.          *
  158.          *
  159.          * @param conn The connection to use for the batch call.
  160.          * @param closeConn True if the connection should be closed, false otherwise.
  161.          * @param ps The {@link PreparedStatement} to be executed.
  162.          * @param sql The SQL statement to execute.
  163.          * @param params An array of query replacement parameters.  Each row in
  164.          *        this array is one set of batch replacement values.
  165.          */
  166.         public UpdateCallableStatement(final Connection conn, final boolean closeConn, final PreparedStatement ps, final String sql, final Object... params) {
  167.             this.sql = sql;
  168.             this.params = params;
  169.             this.conn = conn;
  170.             this.closeConn = closeConn;
  171.             this.ps = ps;
  172.         }

  173.         /**
  174.          * The actual call to {@code executeUpdate()} method.
  175.          *
  176.          * @return either (1) the row count for SQL Data Manipulation Language (DML) statements or
  177.          *                (2) 0 for SQL statements that return nothing
  178.          * @throws SQLException if a database access error occurs.
  179.          * @see PreparedStatement#executeUpdate()
  180.          */
  181.         @Override
  182.         public Integer call() throws SQLException {
  183.             int rows = 0;

  184.             try {
  185.                 rows = ps.executeUpdate();
  186.             } catch (final SQLException e) {
  187.                 rethrow(e, sql, params);
  188.             } finally {
  189.                 close(ps);
  190.                 if (closeConn) {
  191.                     close(conn);
  192.                 }
  193.             }

  194.             return Integer.valueOf(rows);
  195.         }

  196.     }

  197.     private final ExecutorService executorService;

  198.     private final QueryRunner queryRunner;

  199.     /**
  200.      * @deprecated Use {@link #AsyncQueryRunner(ExecutorService, QueryRunner)} instead.
  201.      * Constructor for AsyncQueryRunner that controls the use of {@code ParameterMetaData}.
  202.      *
  203.      * @param pmdKnownBroken Some drivers don't support {@link java.sql.ParameterMetaData#getParameterType(int) };
  204.      * if {@code pmdKnownBroken} is set to true, we won't even try it; if false, we'll try it,
  205.      * and if it breaks, we'll remember not to use it again.
  206.      * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
  207.      */
  208.     @Deprecated
  209.     public AsyncQueryRunner(final boolean pmdKnownBroken, final ExecutorService executorService) {
  210.         this(null, pmdKnownBroken, executorService);
  211.     }

  212.     /**
  213.      * @deprecated Use {@link #AsyncQueryRunner(ExecutorService, QueryRunner)} instead.
  214.      * Constructor for AsyncQueryRunner that take a {@code DataSource} and controls the use of {@code ParameterMetaData}.
  215.      * Methods that do not take a {@code Connection} parameter will retrieve connections from this
  216.      * {@code DataSource}.
  217.      *
  218.      * @param ds The {@code DataSource} to retrieve connections from.
  219.      * @param pmdKnownBroken Some drivers don't support {@link java.sql.ParameterMetaData#getParameterType(int) };
  220.      * if {@code pmdKnownBroken} is set to true, we won't even try it; if false, we'll try it,
  221.      * and if it breaks, we'll remember not to use it again.
  222.      * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
  223.      */
  224.     @Deprecated
  225.     public AsyncQueryRunner(final DataSource ds, final boolean pmdKnownBroken, final ExecutorService executorService) {
  226.         super(ds, pmdKnownBroken);
  227.         this.executorService = executorService;
  228.         this.queryRunner = new QueryRunner(ds, pmdKnownBroken);
  229.     }

  230.     /**
  231.      * @deprecated Use {@link #AsyncQueryRunner(ExecutorService, QueryRunner)} instead.
  232.      * Constructor for AsyncQueryRunner that takes a {@code DataSource}.
  233.      *
  234.      * Methods that do not take a {@code Connection} parameter will retrieve connections from this
  235.      * {@code DataSource}.
  236.      *
  237.      * @param ds The {@code DataSource} to retrieve connections from.
  238.      * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
  239.      */
  240.     @Deprecated
  241.     public AsyncQueryRunner(final DataSource ds, final ExecutorService executorService) {
  242.         this(ds, false, executorService);
  243.     }

  244.     /**
  245.      * Constructor for AsyncQueryRunner.
  246.      *
  247.      * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
  248.      */
  249.     public AsyncQueryRunner(final ExecutorService executorService) {
  250.         this(null, false, executorService);
  251.     }

  252.     /**
  253.      * Constructor for AsyncQueryRunner which uses a provided ExecutorService and underlying QueryRunner.
  254.      *
  255.      * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
  256.      * @param queryRunner the {@code QueryRunner} instance to use for the queries.
  257.      * @since 1.5
  258.      */
  259.     public AsyncQueryRunner(final ExecutorService executorService, final QueryRunner queryRunner) {
  260.         this.executorService = executorService;
  261.         this.queryRunner = queryRunner;
  262.     }

  263.     /**
  264.      * Execute a batch of SQL INSERT, UPDATE, or DELETE queries.
  265.      *
  266.      * @param conn The {@code Connection} to use to run the query.  The caller is
  267.      * responsible for closing this Connection.
  268.      * @param sql The SQL to execute.
  269.      * @param params An array of query replacement parameters.  Each row in
  270.      * this array is one set of batch replacement values.
  271.      * @return A {@code Future} which returns the number of rows updated per statement.
  272.      * @throws SQLException if a database access error occurs
  273.      */
  274.     public Future<int[]> batch(final Connection conn, final String sql, final Object[][] params) throws SQLException {
  275.         return executorService.submit(() -> queryRunner.batch(conn, sql, params));
  276.     }

  277.     /**
  278.      * Execute a batch of SQL INSERT, UPDATE, or DELETE queries.  The
  279.      * {@code Connection} is retrieved from the {@code DataSource}
  280.      * set in the constructor.  This {@code Connection} must be in
  281.      * auto-commit mode or the update will not be saved.
  282.      *
  283.      * @param sql The SQL to execute.
  284.      * @param params An array of query replacement parameters.  Each row in
  285.      * this array is one set of batch replacement values.
  286.      * @return A {@code Future} which returns the number of rows updated per statement.
  287.      * @throws SQLException if a database access error occurs
  288.      */
  289.     public Future<int[]> batch(final String sql, final Object[][] params) throws SQLException {
  290.         return executorService.submit(() -> queryRunner.batch(sql, params));
  291.     }

  292.     /**
  293.      * Executes {@link QueryRunner#insert(Connection, String, ResultSetHandler)} asynchronously.
  294.      *
  295.      * @param <T> Return type expected
  296.      * @param conn {@link Connection} to use to execute the SQL statement
  297.      * @param sql SQL insert statement to execute
  298.      * @param rsh {@link ResultSetHandler} for handling the results
  299.      * @return {@link Future} that executes a query runner insert
  300.      * @see QueryRunner#insert(Connection, String, ResultSetHandler)
  301.      * @throws SQLException if a database access error occurs
  302.      * @since 1.6
  303.      */
  304.     public <T> Future<T> insert(final Connection conn, final String sql, final ResultSetHandler<T> rsh) throws SQLException {
  305.         return executorService.submit(() -> queryRunner.insert(conn, sql, rsh));
  306.     }

  307.     /**
  308.      * Executes {@link QueryRunner#insert(Connection, String, ResultSetHandler, Object...)} asynchronously.
  309.      *
  310.      * @param <T> Return type expected
  311.      * @param conn {@link Connection} to use to execute the SQL statement
  312.      * @param sql SQL insert statement to execute
  313.      * @param rsh {@link ResultSetHandler} for handling the results
  314.      * @param params Parameter values for substitution in the SQL statement
  315.      * @return {@link Future} that executes a query runner insert
  316.      * @see QueryRunner#insert(Connection, String, ResultSetHandler, Object...)
  317.      * @throws SQLException if a database access error occurs
  318.      * @since 1.6
  319.      */
  320.     public <T> Future<T> insert(final Connection conn, final String sql, final ResultSetHandler<T> rsh, final Object... params) throws SQLException {
  321.         return executorService.submit(() -> queryRunner.insert(conn, sql, rsh, params));
  322.     }

  323.     /**
  324.      * Executes {@link QueryRunner#insert(String, ResultSetHandler)} asynchronously.
  325.      *
  326.      * @param <T> Return type expected
  327.      * @param sql SQL insert statement to execute
  328.      * @param rsh {@link ResultSetHandler} for handling the results
  329.      * @return {@link Future} that executes a query runner insert
  330.      * @see QueryRunner#insert(String, ResultSetHandler)
  331.      * @throws SQLException if a database access error occurs
  332.      * @since 1.6
  333.      */
  334.     public <T> Future<T> insert(final String sql, final ResultSetHandler<T> rsh) throws SQLException {
  335.         return executorService.submit(() -> queryRunner.insert(sql, rsh));
  336.     }

  337.     /**
  338.      * Executes {@link QueryRunner#insert(String, ResultSetHandler, Object...)} asynchronously.
  339.      *
  340.      * @param <T> Return type expected
  341.      * @param sql SQL insert statement to execute
  342.      * @param rsh {@link ResultSetHandler} for handling the results
  343.      * @param params Parameter values for substitution in the SQL statement
  344.      * @return {@link Future} that executes a query runner insert
  345.      * @see QueryRunner#insert(String, ResultSetHandler, Object...)
  346.      * @throws SQLException if a database access error occurs
  347.      * @since 1.6
  348.      */
  349.     public <T> Future<T> insert(final String sql, final ResultSetHandler<T> rsh, final Object... params) throws SQLException {
  350.         return executorService.submit(() -> queryRunner.insert(sql, rsh, params));
  351.     }

  352.     /**
  353.      * {@link QueryRunner#insertBatch(Connection, String, ResultSetHandler, Object[][])} asynchronously.
  354.      *
  355.      * @param <T> Return type expected
  356.      * @param conn {@link Connection} to use to execute the SQL statement
  357.      * @param sql SQL insert statement to execute
  358.      * @param rsh {@link ResultSetHandler} for handling the results
  359.      * @param params An array of query replacement parameters.  Each row in
  360.      *        this array is one set of batch replacement values.
  361.      * @return {@link Future} that executes a query runner batch insert
  362.      * @see QueryRunner#insertBatch(Connection, String, ResultSetHandler, Object[][])
  363.      * @throws SQLException if a database access error occurs
  364.      * @since 1.6
  365.      */
  366.     public <T> Future<T> insertBatch(final Connection conn, final String sql, final ResultSetHandler<T> rsh, final Object[][] params) throws SQLException {
  367.         return executorService.submit(() -> queryRunner.insertBatch(conn, sql, rsh, params));
  368.     }

  369.     /**
  370.      * {@link QueryRunner#insertBatch(String, ResultSetHandler, Object[][])} asynchronously.
  371.      *
  372.      * @param <T> Return type expected
  373.      * @param sql SQL insert statement to execute
  374.      * @param rsh {@link ResultSetHandler} for handling the results
  375.      * @param params An array of query replacement parameters.  Each row in
  376.      *        this array is one set of batch replacement values.
  377.      * @return {@link Future} that executes a query runner batch insert
  378.      * @see QueryRunner#insertBatch(String, ResultSetHandler, Object[][])
  379.      * @throws SQLException if a database access error occurs
  380.      * @since 1.6
  381.      */
  382.     public <T> Future<T> insertBatch(final String sql, final ResultSetHandler<T> rsh, final Object[][] params) throws SQLException {
  383.         return executorService.submit(() -> queryRunner.insertBatch(sql, rsh, params));
  384.     }

  385.     /**
  386.      * Execute an SQL SELECT query without any replacement parameters.  The
  387.      * caller is responsible for closing the connection.
  388.      * @param <T> The type of object that the handler returns
  389.      * @param conn The connection to execute the query in.
  390.      * @param sql The query to execute.
  391.      * @param rsh The handler that converts the results into an object.
  392.      * @return A {@code Future} which returns the result of the query call.
  393.      * @throws SQLException if a database access error occurs
  394.      */
  395.     public <T> Future<T> query(final Connection conn, final String sql, final ResultSetHandler<T> rsh) throws SQLException {
  396.         return executorService.submit(() -> queryRunner.query(conn, sql, rsh));
  397.     }

  398.     /**
  399.      * Execute an SQL SELECT query with replacement parameters.  The
  400.      * caller is responsible for closing the connection.
  401.      * @param <T> The type of object that the handler returns
  402.      * @param conn The connection to execute the query in.
  403.      * @param sql The query to execute.
  404.      * @param rsh The handler that converts the results into an object.
  405.      * @param params The replacement parameters.
  406.      * @return A {@code Future} which returns the result of the query call.
  407.      * @throws SQLException if a database access error occurs
  408.      */
  409.     public <T> Future<T> query(final Connection conn, final String sql, final ResultSetHandler<T> rsh, final Object... params)
  410.             throws SQLException {
  411.         return executorService.submit(() -> queryRunner.query(conn, sql, rsh, params));
  412.     }

  413.     /**
  414.      * Executes the given SELECT SQL without any replacement parameters.
  415.      * The {@code Connection} is retrieved from the
  416.      * {@code DataSource} set in the constructor.
  417.      * @param <T> The type of object that the handler returns
  418.      * @param sql The SQL statement to execute.
  419.      * @param rsh The handler used to create the result object from
  420.      * the {@code ResultSet}.
  421.      *
  422.      * @return A {@code Future} which returns the result of the query call.
  423.      * @throws SQLException if a database access error occurs
  424.      */
  425.     public <T> Future<T> query(final String sql, final ResultSetHandler<T> rsh) throws SQLException {
  426.         return executorService.submit(() -> queryRunner.query(sql, rsh));
  427.     }

  428.     /**
  429.      * Executes the given SELECT SQL query and returns a result object.
  430.      * The {@code Connection} is retrieved from the
  431.      * {@code DataSource} set in the constructor.
  432.      * @param <T> The type of object that the handler returns
  433.      * @param sql The SQL statement to execute.
  434.      * @param rsh The handler used to create the result object from
  435.      * the {@code ResultSet}.
  436.      * @param params Initialize the PreparedStatement's IN parameters with
  437.      * this array.
  438.      * @return A {@code Future} which returns the result of the query call.
  439.      * @throws SQLException if a database access error occurs
  440.      */
  441.     public <T> Future<T> query(final String sql, final ResultSetHandler<T> rsh, final Object... params) throws SQLException {
  442.         return executorService.submit(() -> queryRunner.query(sql, rsh, params));
  443.     }

  444.     /**
  445.      * Execute an SQL INSERT, UPDATE, or DELETE query without replacement
  446.      * parameters.
  447.      *
  448.      * @param conn The connection to use to run the query.
  449.      * @param sql The SQL to execute.
  450.      * @return A {@code Future} which returns the number of rows updated.
  451.      * @throws SQLException if a database access error occurs
  452.      */
  453.     public Future<Integer> update(final Connection conn, final String sql) throws SQLException {
  454.         return executorService.submit(() -> Integer.valueOf(queryRunner.update(conn, sql)));
  455.     }

  456.     /**
  457.      * Execute an SQL INSERT, UPDATE, or DELETE query with a single replacement
  458.      * parameter.
  459.      *
  460.      * @param conn The connection to use to run the query.
  461.      * @param sql The SQL to execute.
  462.      * @param param The replacement parameter.
  463.      * @return A {@code Future} which returns the number of rows updated.
  464.      * @throws SQLException if a database access error occurs
  465.      */
  466.     public Future<Integer> update(final Connection conn, final String sql, final Object param) throws SQLException {
  467.         return executorService.submit(() -> Integer.valueOf(queryRunner.update(conn, sql, param)));
  468.     }

  469.     /**
  470.      * Execute an SQL INSERT, UPDATE, or DELETE query.
  471.      *
  472.      * @param conn The connection to use to run the query.
  473.      * @param sql The SQL to execute.
  474.      * @param params The query replacement parameters.
  475.      * @return A {@code Future} which returns the number of rows updated.
  476.      * @throws SQLException if a database access error occurs
  477.      */
  478.     public Future<Integer> update(final Connection conn, final String sql, final Object... params) throws SQLException {
  479.         return executorService.submit(() -> Integer.valueOf(queryRunner.update(conn, sql, params)));
  480.     }

  481.     /**
  482.      * Executes the given INSERT, UPDATE, or DELETE SQL statement without
  483.      * any replacement parameters. The {@code Connection} is retrieved
  484.      * from the {@code DataSource} set in the constructor.  This
  485.      * {@code Connection} must be in auto-commit mode or the update will
  486.      * not be saved.
  487.      *
  488.      * @param sql The SQL statement to execute.
  489.      * @throws SQLException if a database access error occurs
  490.      * @return A {@code Future} which returns the number of rows updated.
  491.      */
  492.     public Future<Integer> update(final String sql) throws SQLException {
  493.         return executorService.submit(() -> Integer.valueOf(queryRunner.update(sql)));
  494.     }

  495.     /**
  496.      * Executes the given INSERT, UPDATE, or DELETE SQL statement with
  497.      * a single replacement parameter.  The {@code Connection} is
  498.      * retrieved from the {@code DataSource} set in the constructor.
  499.      * This {@code Connection} must be in auto-commit mode or the
  500.      * update will not be saved.
  501.      *
  502.      * @param sql The SQL statement to execute.
  503.      * @param param The replacement parameter.
  504.      * @throws SQLException if a database access error occurs
  505.      * @return A {@code Future} which returns the number of rows updated.
  506.      */
  507.     public Future<Integer> update(final String sql, final Object param) throws SQLException {
  508.         return executorService.submit(() -> Integer.valueOf(queryRunner.update(sql, param)));
  509.     }

  510.     /**
  511.      * Executes the given INSERT, UPDATE, or DELETE SQL statement.  The
  512.      * {@code Connection} is retrieved from the {@code DataSource}
  513.      * set in the constructor.  This {@code Connection} must be in
  514.      * auto-commit mode or the update will not be saved.
  515.      *
  516.      * @param sql The SQL statement to execute.
  517.      * @param params Initializes the PreparedStatement's IN (i.e. '?')
  518.      * parameters.
  519.      * @throws SQLException if a database access error occurs
  520.      * @return A {@code Future} which returns the number of rows updated.
  521.      */
  522.     public Future<Integer> update(final String sql, final Object... params) throws SQLException {
  523.         return executorService.submit(() -> Integer.valueOf(queryRunner.update(sql, params)));
  524.     }

  525. }