From 80fa47834f3ef536f553702dee3ddc80e18981bb Mon Sep 17 00:00:00 2001 From: Prashant Mishra <11733935+prash-mi@users.noreply.github.com> Date: Wed, 12 Oct 2022 19:32:02 +0530 Subject: [PATCH] feat: Add executeSelectAsync and Refactor (#2294) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add executeSelectAsync * Add executeSelectAsync * Add ExecuteSelectResponse * Add executeSelectAsync(...) methods * feat: Add executeSelectAsync * implemented ExecuteSelectResponse Builder * Refactored executeSelect. Added getExecuteSelectResponse * marked getExecuteSelectFuture private * marked getExecuteSelectFuture private * Add UT for Async methods * Added IT for async methods * Removed testFastQueryNullSchema as it is no longer needed * removed dryRun calls as now we wait till the job is complete * Added testExecuteSelectAsyncTimeout * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * updated getExecuteSelectFuture * lint * Add getters and setters for BigQuerySQLException * Add javadoc for overloaded executeSelectAsync and refactored getExecuteSelectFuture to handle BigQuerySQLException * Marked ResultSet and BQSQLException optional * minor refactor: getExecuteSelectFuture * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * update getExecuteSelectFuture * update javadoc Co-authored-by: Owl Bot --- .../clirr-ignored-differences.xml | 12 +- .../com/google/cloud/bigquery/Connection.java | 100 ++++++++ .../google/cloud/bigquery/ConnectionImpl.java | 235 ++++++++++++++---- .../cloud/bigquery/ExecuteSelectResponse.java | 47 ++++ .../cloud/bigquery/ConnectionImplTest.java | 160 +++++++++--- .../cloud/bigquery/it/ITBigQueryTest.java | 152 +++++++++++ 6 files changed, 613 insertions(+), 93 deletions(-) create mode 100644 google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ExecuteSelectResponse.java diff --git a/google-cloud-bigquery/clirr-ignored-differences.xml b/google-cloud-bigquery/clirr-ignored-differences.xml index 6b76f78eb..02cfd9967 100644 --- a/google-cloud-bigquery/clirr-ignored-differences.xml +++ b/google-cloud-bigquery/clirr-ignored-differences.xml @@ -24,4 +24,14 @@ com/google/cloud/bigquery/RoutineInfo* *RemoteFunctionOptions(*) - \ No newline at end of file + + 7012 + com/google/cloud/bigquery/Connection + com.google.common.util.concurrent.ListenableFuture executeSelectAsync(java.lang.String) + + + 7012 + com/google/cloud/bigquery/Connection + com.google.common.util.concurrent.ListenableFuture executeSelectAsync(java.lang.String, java.util.List, java.util.Map[]) + + \ No newline at end of file diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Connection.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Connection.java index 109838d8b..3a30d1f7b 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Connection.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Connection.java @@ -17,6 +17,7 @@ package com.google.cloud.bigquery; import com.google.api.core.BetaApi; +import com.google.common.util.concurrent.ListenableFuture; import java.util.List; import java.util.Map; @@ -89,4 +90,103 @@ public interface Connection { BigQueryResult executeSelect( String sql, List parameters, Map... labels) throws BigQuerySQLException; + + /** + * Execute a SQL statement that returns a single ResultSet and returns a ListenableFuture to + * process the response asynchronously. + * + *

Example of running a query. + * + *

+   * {
+   *   @code
+   *  ConnectionSettings connectionSettings =
+   *        ConnectionSettings.newBuilder()
+   *            .setUseReadAPI(true)
+   *            .build();
+   *   Connection connection = bigquery.createConnection(connectionSettings);
+   *   String selectQuery = "SELECT corpus FROM `bigquery-public-data.samples.shakespeare` GROUP BY corpus;";
+   * ListenableFuture executeSelectFuture = connection.executeSelectAsync(selectQuery);
+   * ExecuteSelectResponse executeSelectRes = executeSelectFuture.get();
+   *
+   *  if(!executeSelectRes.getIsSuccessful()){
+   * throw executeSelectRes.getBigQuerySQLException();
+   * }
+   *
+   *  BigQueryResult bigQueryResult = executeSelectRes.getBigQueryResult();
+   * ResultSet rs = bigQueryResult.getResultSet();
+   * while (rs.next()) {
+   * System.out.println(rs.getString(1));
+   * }
+   *
+   * 
+ * + * @param sql a static SQL SELECT statement + * @return a ListenableFuture that is used to get the data produced by the query + * @throws BigQuerySQLException upon failure + */ + @BetaApi + ListenableFuture executeSelectAsync(String sql) + throws BigQuerySQLException; + + /** + * Execute a SQL statement that returns a single ResultSet and returns a ListenableFuture to + * process the response asynchronously. + * + *

Example of running a query. + * + *

+   * {
+   *   @code
+   *  ConnectionSettings connectionSettings =
+   *        ConnectionSettings.newBuilder()
+   *            ..setUseReadAPI(true)
+   *            .build();
+   *   Connection connection = bigquery.createConnection(connectionSettings);
+   *     String selectQuery =
+   *         "SELECT TimestampField, StringField, BooleanField FROM "
+   *             + MY_TABLE
+   *             + " WHERE StringField = @stringParam"
+   *             + " AND IntegerField IN UNNEST(@integerList)";
+   *     QueryParameterValue stringParameter = QueryParameterValue.string("stringValue");
+   *     QueryParameterValue intArrayParameter =
+   *         QueryParameterValue.array(new Integer[] {3, 4}, Integer.class);
+   *     Parameter stringParam =
+   *         Parameter.newBuilder().setName("stringParam").setValue(stringParameter).build();
+   *     Parameter intArrayParam =
+   *         Parameter.newBuilder().setName("integerList").setValue(intArrayParameter).build();
+   *     List parameters = ImmutableList.of(stringParam, intArrayParam);
+   *
+   *     ListenableFuture executeSelectFut =
+   *         connection.executeSelectAsync(selectQuery, parameters);
+   * ExecuteSelectResponse executeSelectRes = executeSelectFuture.get();
+   *
+   *  if(!executeSelectRes.getIsSuccessful()){
+   * throw executeSelectRes.getBigQuerySQLException();
+   * }
+   *
+   *  BigQueryResult bigQueryResult = executeSelectRes.getBigQueryResult();
+   * ResultSet rs = bigQueryResult.getResultSet();
+   * while (rs.next()) {
+   * System.out.println(rs.getString(1));
+   * }
+   *
+   * 
+ * + * @param sql SQL SELECT query + * @param parameters named or positional parameters. The set of query parameters must either be + * all positional or all named parameters. + * @param labels (optional) the labels associated with this query. You can use these to organize + * and group your query jobs. Label keys and values can be no longer than 63 characters, can + * only contain lowercase letters, numeric characters, underscores and dashes. International + * characters are allowed. Label values are optional and Label is a Varargs. You should pass + * all the Labels in a single Map .Label keys must start with a letter and each label in the + * list must have a different key. + * @return a ListenableFuture that is used to get the data produced by the query + * @throws BigQuerySQLException upon failure + */ + @BetaApi + ListenableFuture executeSelectAsync( + String sql, List parameters, Map... labels) + throws BigQuerySQLException; } diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java index 0310832c5..eb0072905 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java @@ -47,6 +47,11 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import java.io.IOException; import java.util.AbstractList; import java.util.ArrayList; @@ -187,24 +192,7 @@ public BigQueryDryRunResult dryRun(String sql) throws BigQuerySQLException { @BetaApi @Override public BigQueryResult executeSelect(String sql) throws BigQuerySQLException { - try { - // use jobs.query if all the properties of connectionSettings are supported - if (isFastQuerySupported()) { - logger.log(Level.INFO, "\n Using Fast Query Path"); - String projectId = bigQueryOptions.getProjectId(); - QueryRequest queryRequest = createQueryRequest(connectionSettings, sql, null, null); - return queryRpc(projectId, queryRequest, sql, false); - } - // use jobs.insert otherwise - logger.log(Level.INFO, "\n Not Using Fast Query Path, using jobs.insert"); - com.google.api.services.bigquery.model.Job queryJob = - createQueryJob(sql, connectionSettings, null, null); - JobId jobId = JobId.fromPb(queryJob.getJobReference()); - GetQueryResultsResponse firstPage = getQueryResultsFirstPage(jobId); - return getResultSet(firstPage, jobId, sql, false); - } catch (BigQueryException e) { - throw new BigQuerySQLException(e.getMessage(), e, e.getErrors()); - } + return getExecuteSelectResponse(sql, null, null); } /** @@ -227,6 +215,12 @@ public BigQueryResult executeSelect(String sql) throws BigQuerySQLException { public BigQueryResult executeSelect( String sql, List parameters, Map... labels) throws BigQuerySQLException { + return getExecuteSelectResponse(sql, parameters, labels); + } + + private BigQueryResult getExecuteSelectResponse( + String sql, List parameters, Map... labels) + throws BigQuerySQLException { Map labelMap = null; if (labels != null && labels.length == 1) { // We expect label as a key value pair in a single Map @@ -252,29 +246,181 @@ public BigQueryResult executeSelect( throw new BigQuerySQLException(e.getMessage(), e, e.getErrors()); } } + /** + * Execute a SQL statement that returns a single ResultSet and returns a ListenableFuture to + * process the response asynchronously. + * + *

Example of running a query. + * + *

+   * {
+   *   @code
+   *  ConnectionSettings connectionSettings =
+   *        ConnectionSettings.newBuilder()
+   *            .setUseReadAPI(true)
+   *            .build();
+   *   Connection connection = bigquery.createConnection(connectionSettings);
+   *   String selectQuery = "SELECT corpus FROM `bigquery-public-data.samples.shakespeare` GROUP BY corpus;";
+   * ListenableFuture executeSelectFuture = connection.executeSelectAsync(selectQuery);
+   * ExecuteSelectResponse executeSelectRes = executeSelectFuture.get();
+   *
+   *  if(!executeSelectRes.getIsSuccessful()){
+   * throw executeSelectRes.getBigQuerySQLException();
+   * }
+   *
+   *  BigQueryResult bigQueryResult = executeSelectRes.getBigQueryResult();
+   * ResultSet rs = bigQueryResult.getResultSet();
+   * while (rs.next()) {
+   * System.out.println(rs.getString(1));
+   * }
+   *
+   * 
+ * + * @param sql a static SQL SELECT statement + * @return a ListenableFuture that is used to get the data produced by the query + * @throws BigQuerySQLException upon failure + */ + @BetaApi + @Override + public ListenableFuture executeSelectAsync(String sql) + throws BigQuerySQLException { + return getExecuteSelectFuture(sql, null); + } + + /** This method calls the overloaded executeSelect(...) methods and returns a Future */ + private ListenableFuture getExecuteSelectFuture( + String sql, List parameters, Map... labels) + throws BigQuerySQLException { + ExecutorService execService = + Executors.newFixedThreadPool( + 2); // two fixed threads. One for the async operation and the other for processing the + // callback + ListeningExecutorService lExecService = MoreExecutors.listeningDecorator(execService); + ListenableFuture executeSelectFuture = + lExecService.submit( + () -> { + try { + return ExecuteSelectResponse.newBuilder() + .setResultSet( + this.executeSelect( + sql, + parameters, + labels)) // calling the overloaded executeSelect method, it takes care + // of null parameters and labels + .setIsSuccessful(true) + .build(); + } catch (BigQuerySQLException ex) { + return ExecuteSelectResponse + .newBuilder() // passing back the null result with isSuccessful set to false + .setIsSuccessful(false) + .setBigQuerySQLException(ex) + .build(); + } + }); + + Futures.addCallback( + executeSelectFuture, + new FutureCallback() { + public void onSuccess(ExecuteSelectResponse result) { + execService.shutdownNow(); // shutdown the executor service as we do not need it + } + + public void onFailure(Throwable t) { + logger.log( + Level.WARNING, + "\n" + + String.format( + "Async task failed or cancelled with error %s", t.getMessage())); + try { + close(); // attempt to stop the execution as the developer might have called + // Future.cancel() + } catch (BigQuerySQLException e) { + logger.log( + Level.WARNING, + "\n" + + String.format("Exception while closing the connection %s", e.getMessage())); + } + execService.shutdownNow(); // shutdown the executor service as we do not need it + } + }, + execService); + + return executeSelectFuture; + } + + /** + * Execute a SQL statement that returns a single ResultSet and returns a ListenableFuture to + * process the response asynchronously. + * + *

Example of running a query. + * + *

+   * {
+   *   @code
+   *  ConnectionSettings connectionSettings =
+   *        ConnectionSettings.newBuilder()
+   *            ..setUseReadAPI(true)
+   *            .build();
+   *   Connection connection = bigquery.createConnection(connectionSettings);
+   *     String selectQuery =
+   *         "SELECT TimestampField, StringField, BooleanField FROM "
+   *             + MY_TABLE
+   *             + " WHERE StringField = @stringParam"
+   *             + " AND IntegerField IN UNNEST(@integerList)";
+   *     QueryParameterValue stringParameter = QueryParameterValue.string("stringValue");
+   *     QueryParameterValue intArrayParameter =
+   *         QueryParameterValue.array(new Integer[] {3, 4}, Integer.class);
+   *     Parameter stringParam =
+   *         Parameter.newBuilder().setName("stringParam").setValue(stringParameter).build();
+   *     Parameter intArrayParam =
+   *         Parameter.newBuilder().setName("integerList").setValue(intArrayParameter).build();
+   *     List parameters = ImmutableList.of(stringParam, intArrayParam);
+   *
+   *     ListenableFuture executeSelectFut =
+   *         connection.executeSelectAsync(selectQuery, parameters);
+   * ExecuteSelectResponse executeSelectRes = executeSelectFuture.get();
+   *
+   *  if(!executeSelectRes.getIsSuccessful()){
+   * throw executeSelectRes.getBigQuerySQLException();
+   * }
+   *
+   *  BigQueryResult bigQueryResult = executeSelectRes.getBigQueryResult();
+   * ResultSet rs = bigQueryResult.getResultSet();
+   * while (rs.next()) {
+   * System.out.println(rs.getString(1));
+   * }
+   *
+   * 
+ * + * @param sql SQL SELECT query + * @param parameters named or positional parameters. The set of query parameters must either be + * all positional or all named parameters. + * @param labels (optional) the labels associated with this query. You can use these to organize + * and group your query jobs. Label keys and values can be no longer than 63 characters, can + * only contain lowercase letters, numeric characters, underscores and dashes. International + * characters are allowed. Label values are optional and Label is a Varargs. You should pass + * all the Labels in a single Map .Label keys must start with a letter and each label in the + * list must have a different key. + * @return a ListenableFuture that is used to get the data produced by the query + * @throws BigQuerySQLException upon failure + */ + @BetaApi + @Override + public ListenableFuture executeSelectAsync( + String sql, List parameters, Map... labels) + throws BigQuerySQLException { + return getExecuteSelectFuture(sql, parameters, labels); + } @VisibleForTesting BigQueryResult getResultSet( GetQueryResultsResponse firstPage, JobId jobId, String sql, Boolean hasQueryParameters) { - if (firstPage.getJobComplete() - && firstPage.getTotalRows() != null - && firstPage.getSchema() - != null) { // firstPage.getTotalRows() is null if job is not complete. We need to make - // sure that the schema is not null, as it is required for the ResultSet - return getSubsequentQueryResultsWithJob( - firstPage.getTotalRows().longValue(), - (long) firstPage.getRows().size(), - jobId, - firstPage, - hasQueryParameters); - } else { // job is still running, use dryrun to get Schema - com.google.api.services.bigquery.model.Job dryRunJob = createDryRunJob(sql); - Schema schema = Schema.fromPb(dryRunJob.getStatistics().getQuery().getSchema()); - // TODO: check how can we get totalRows and pageRows while the job is still running. - // `firstPage.getTotalRows()` returns null - return getSubsequentQueryResultsWithJob( - null, null, jobId, firstPage, schema, hasQueryParameters); - } + return getSubsequentQueryResultsWithJob( + firstPage.getTotalRows().longValue(), + (long) firstPage.getRows().size(), + jobId, + firstPage, + hasQueryParameters); } static class EndOfFieldValueList @@ -337,21 +483,6 @@ private BigQueryResult queryRpc( results.getJobComplete(), results.getSchema() == null, totalRows, pageRows)); JobId jobId = JobId.fromPb(results.getJobReference()); GetQueryResultsResponse firstPage = getQueryResultsFirstPage(jobId); - // We might get null schema from the backend occasionally. Ref: - // https://github.com/googleapis/java-bigquery/issues/2103/. Using queryDryRun in such cases - // to get the schema - if (firstPage.getSchema() == null) { // get schema using dry run - // Log the status if the job was complete complete - logger.log( - Level.WARNING, - "\n" - + "Received null schema, Using dryRun the get the Schema. jobComplete:" - + firstPage.getJobComplete()); - com.google.api.services.bigquery.model.Job dryRunJob = createDryRunJob(sql); - Schema schema = Schema.fromPb(dryRunJob.getStatistics().getQuery().getSchema()); - return getSubsequentQueryResultsWithJob( - totalRows, pageRows, jobId, firstPage, schema, hasQueryParameters); - } return getSubsequentQueryResultsWithJob( totalRows, pageRows, jobId, firstPage, hasQueryParameters); } diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ExecuteSelectResponse.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ExecuteSelectResponse.java new file mode 100644 index 000000000..59745020f --- /dev/null +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ExecuteSelectResponse.java @@ -0,0 +1,47 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import javax.annotation.Nullable; + +@AutoValue +public abstract class ExecuteSelectResponse implements Serializable { + @Nullable + public abstract BigQueryResult getResultSet(); + + public abstract boolean getIsSuccessful(); + + @Nullable + public abstract BigQuerySQLException getBigQuerySQLException(); + + public static Builder newBuilder() { + return new AutoValue_ExecuteSelectResponse.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract ExecuteSelectResponse build(); + + public abstract Builder setResultSet(BigQueryResult bigQueryResult); + + public abstract Builder setIsSuccessful(boolean isSuccessful); + + public abstract Builder setBigQuerySQLException(BigQuerySQLException bigQuerySQLException); + } +} diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/ConnectionImplTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/ConnectionImplTest.java index 4b1b93487..4b379629c 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/ConnectionImplTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/ConnectionImplTest.java @@ -29,6 +29,7 @@ import com.google.cloud.bigquery.spi.BigQueryRpcFactory; import com.google.cloud.bigquery.spi.v2.BigQueryRpc; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListenableFuture; import java.math.BigInteger; import java.sql.SQLException; import java.util.AbstractList; @@ -37,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingDeque; import org.junit.Before; import org.junit.Test; @@ -365,62 +367,39 @@ public void testLegacyQuerySinglePage() throws BigQuerySQLException { .createJobForQuery(any(com.google.api.services.bigquery.model.Job.class)); } - // calls executeSelect with a Fast query and emulates that no schema is returned with the first - // page + // exercises getSubsequentQueryResultsWithJob for fast running queries @Test - public void testFastQueryNullSchema() throws BigQuerySQLException { + public void testFastQueryLongRunning() throws SQLException { ConnectionImpl connectionSpy = Mockito.spy(connection); - QueryRequest queryReqMock = new QueryRequest(); - com.google.api.services.bigquery.model.JobStatistics stats = - new com.google.api.services.bigquery.model.JobStatistics() - .setQuery(new JobStatistics2().setSchema(FAST_QUERY_TABLESCHEMA)); - com.google.api.services.bigquery.model.Job jobResponseMock = - new com.google.api.services.bigquery.model.Job() - // .setConfiguration(QUERY_JOB.g) - .setJobReference(QUERY_JOB.toPb()) - .setId(JOB) - .setStatus(new com.google.api.services.bigquery.model.JobStatus().setState("DONE")) - .setStatistics(stats); - // emulating a legacy query + // emulating a fast query doReturn(true).when(connectionSpy).isFastQuerySupported(); + doReturn(GET_QUERY_RESULTS_RESPONSE) + .when(connectionSpy) + .getQueryResultsFirstPage(any(JobId.class)); + + doReturn(TABLE_NAME).when(connectionSpy).getDestinationTable(any(JobId.class)); + doReturn(BQ_RS_MOCK_RES) + .when(connectionSpy) + .tableDataList(any(GetQueryResultsResponse.class), any(JobId.class)); + com.google.api.services.bigquery.model.QueryResponse mockQueryRes = new QueryResponse() .setSchema(FAST_QUERY_TABLESCHEMA) - .setJobComplete(false) // so that it goes to the else part in queryRpc + .setJobComplete(false) .setTotalRows(new BigInteger(String.valueOf(4L))) .setJobReference(QUERY_JOB.toPb()) .setRows(TABLE_ROWS); when(bigqueryRpcMock.queryRpc(any(String.class), any(QueryRequest.class))) .thenReturn(mockQueryRes); - doReturn(GET_QUERY_RESULTS_RESPONSE_NULL_SCHEMA) // wiring the null schema for the test case - .when(connectionSpy) - .getQueryResultsFirstPage(any(JobId.class)); - doReturn(BQ_RS_MOCK_RES) - .when(connectionSpy) - .getSubsequentQueryResultsWithJob( - any(Long.class), - any(Long.class), - any(JobId.class), - any(GetQueryResultsResponse.class), - any(Schema.class), - any(Boolean.class)); - doReturn(jobResponseMock).when(connectionSpy).createDryRunJob(any(String.class)); BigQueryResult res = connectionSpy.executeSelect(SQL_QUERY); assertEquals(res.getTotalRows(), 2); assertEquals(QUERY_SCHEMA, res.getSchema()); - verify(connectionSpy, times(1)) - .getSubsequentQueryResultsWithJob( - any(Long.class), - any(Long.class), - any(JobId.class), - any(GetQueryResultsResponse.class), - any(Schema.class), - any(Boolean.class)); + verify(bigqueryRpcMock, times(1)).queryRpc(any(String.class), any(QueryRequest.class)); } - // exercises getSubsequentQueryResultsWithJob for fast running queries @Test - public void testFastQueryLongRunning() throws SQLException { + public void testFastQueryLongRunningAsync() + throws SQLException, ExecutionException, InterruptedException { ConnectionImpl connectionSpy = Mockito.spy(connection); // emulating a fast query doReturn(true).when(connectionSpy).isFastQuerySupported(); @@ -442,12 +421,113 @@ public void testFastQueryLongRunning() throws SQLException { .setRows(TABLE_ROWS); when(bigqueryRpcMock.queryRpc(any(String.class), any(QueryRequest.class))) .thenReturn(mockQueryRes); - BigQueryResult res = connectionSpy.executeSelect(SQL_QUERY); + ListenableFuture executeSelectFut = + connectionSpy.executeSelectAsync(SQL_QUERY); + ExecuteSelectResponse exSelRes = executeSelectFut.get(); + BigQueryResult res = exSelRes.getResultSet(); assertEquals(res.getTotalRows(), 2); assertEquals(QUERY_SCHEMA, res.getSchema()); + assertTrue(exSelRes.getIsSuccessful()); verify(bigqueryRpcMock, times(1)).queryRpc(any(String.class), any(QueryRequest.class)); } + @Test + public void testFastQuerySinglePageAsync() + throws BigQuerySQLException, ExecutionException, InterruptedException { + com.google.api.services.bigquery.model.QueryResponse mockQueryRes = + new QueryResponse().setSchema(FAST_QUERY_TABLESCHEMA).setJobComplete(true); + when(bigqueryRpcMock.queryRpc(any(String.class), any(QueryRequest.class))) + .thenReturn(mockQueryRes); + ConnectionImpl connectionSpy = Mockito.spy(connection); + doReturn(BQ_RS_MOCK_RES) + .when(connectionSpy) + .processQueryResponseResults(any(QueryResponse.class)); + + ListenableFuture executeSelectFut = + connectionSpy.executeSelectAsync(SQL_QUERY); + ExecuteSelectResponse exSelRes = executeSelectFut.get(); + BigQueryResult res = exSelRes.getResultSet(); + assertEquals(res.getTotalRows(), 2); + assertEquals(QUERY_SCHEMA, res.getSchema()); + assertTrue(exSelRes.getIsSuccessful()); + verify(connectionSpy, times(1)) + .processQueryResponseResults( + any(com.google.api.services.bigquery.model.QueryResponse.class)); + } + + @Test + public void testExecuteSelectSlowWithParamsAsync() + throws BigQuerySQLException, ExecutionException, InterruptedException { + ConnectionImpl connectionSpy = Mockito.spy(connection); + List parameters = new ArrayList<>(); + Map labels = new HashMap<>(); + doReturn(false).when(connectionSpy).isFastQuerySupported(); + com.google.api.services.bigquery.model.JobStatistics jobStatistics = + new com.google.api.services.bigquery.model.JobStatistics(); + com.google.api.services.bigquery.model.Job jobResponseMock = + new com.google.api.services.bigquery.model.Job() + .setJobReference(QUERY_JOB.toPb()) + .setId(JOB) + .setStatus(new com.google.api.services.bigquery.model.JobStatus().setState("DONE")) + .setStatistics(jobStatistics); + + doReturn(jobResponseMock) + .when(connectionSpy) + .createQueryJob(SQL_QUERY, connectionSettings, parameters, labels); + doReturn(GET_QUERY_RESULTS_RESPONSE) + .when(connectionSpy) + .getQueryResultsFirstPage(any(JobId.class)); + doReturn(BQ_RS_MOCK_RES) + .when(connectionSpy) + .getResultSet( + any(GetQueryResultsResponse.class), + any(JobId.class), + any(String.class), + any(Boolean.class)); + ListenableFuture executeSelectFut = + connectionSpy.executeSelectAsync(SQL_QUERY, parameters, labels); + ExecuteSelectResponse exSelRes = executeSelectFut.get(); + BigQueryResult res = exSelRes.getResultSet(); + assertTrue(exSelRes.getIsSuccessful()); + assertEquals(res.getTotalRows(), 2); + assertEquals(QUERY_SCHEMA, res.getSchema()); + verify(connectionSpy, times(1)) + .getResultSet( + any(GetQueryResultsResponse.class), + any(JobId.class), + any(String.class), + any(Boolean.class)); + } + + @Test + public void testFastQueryMultiplePagesAsync() + throws BigQuerySQLException, ExecutionException, InterruptedException { + com.google.api.services.bigquery.model.QueryResponse mockQueryRes = + new QueryResponse() + .setSchema(FAST_QUERY_TABLESCHEMA) + .setJobComplete(true) + .setPageToken(PAGE_TOKEN); + when(bigqueryRpcMock.queryRpc(any(String.class), any(QueryRequest.class))) + .thenReturn(mockQueryRes); + ConnectionImpl connectionSpy = Mockito.spy(connection); + + doReturn(BQ_RS_MOCK_RES_MULTI_PAGE) + .when(connectionSpy) + .processQueryResponseResults( + any(com.google.api.services.bigquery.model.QueryResponse.class)); + + ListenableFuture executeSelectFut = + connectionSpy.executeSelectAsync(SQL_QUERY); + ExecuteSelectResponse exSelRes = executeSelectFut.get(); + BigQueryResult res = exSelRes.getResultSet(); + assertTrue(exSelRes.getIsSuccessful()); + assertEquals(res.getTotalRows(), 4); + assertEquals(QUERY_SCHEMA, res.getSchema()); + verify(connectionSpy, times(1)) + .processQueryResponseResults( + any(com.google.api.services.bigquery.model.QueryResponse.class)); + } + @Test // Emulates first page response using getQueryResultsFirstPage(jobId) and then subsequent pages // using getQueryResultsFirstPage(jobId) getSubsequentQueryResultsWithJob( diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java index 9a4b18a48..8e8f4cc0c 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java @@ -62,6 +62,7 @@ import com.google.cloud.bigquery.Dataset; import com.google.cloud.bigquery.DatasetId; import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.ExecuteSelectResponse; import com.google.cloud.bigquery.ExternalTableDefinition; import com.google.cloud.bigquery.ExtractJobConfiguration; import com.google.cloud.bigquery.Field; @@ -135,6 +136,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.io.BaseEncoding; +import com.google.common.util.concurrent.ListenableFuture; import com.google.gson.JsonObject; import java.io.IOException; import java.io.InputStream; @@ -157,6 +159,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -2789,6 +2792,155 @@ public void testReadAPIIterationAndOrder() connection.close(); } + @Test + public void testReadAPIIterationAndOrderAsync() + throws SQLException, ExecutionException, + InterruptedException { // use read API to read 300K records and check the order + String query = + "SELECT date, county, state_name, confirmed_cases, deaths FROM " + + TABLE_ID_LARGE.getTable() + + " where date is not null and county is not null and state_name is not null order by confirmed_cases asc limit 300000"; + + ConnectionSettings connectionSettings = + ConnectionSettings.newBuilder() + .setDefaultDataset(DatasetId.of(DATASET)) + .setPriority( + QueryJobConfiguration.Priority + .INTERACTIVE) // required for this integration test so that isFastQuerySupported + // returns false + .build(); + Connection connection = bigquery.createConnection(connectionSettings); + + ListenableFuture executeSelectFut = connection.executeSelectAsync(query); + ExecuteSelectResponse exSelRes = executeSelectFut.get(); + BigQueryResult bigQueryResult = exSelRes.getResultSet(); + ResultSet rs = bigQueryResult.getResultSet(); + int cnt = 0; + int lasConfirmedCases = Integer.MIN_VALUE; + while (rs.next()) { // pagination starts after approx 120,000 records + assertNotNull(rs.getDate(0)); + assertNotNull(rs.getString(1)); + assertNotNull(rs.getString(2)); + assertTrue(rs.getInt(3) >= 0); + assertTrue(rs.getInt(4) >= 0); + + // check if the records are sorted + assertTrue(rs.getInt(3) >= lasConfirmedCases); + lasConfirmedCases = rs.getInt(3); + ++cnt; + } + assertEquals(300000, cnt); // total 300000 rows should be read + connection.close(); + } + + @Test + // Cancel the future and check if the operations got cancelled. Tests the wiring of future + // callback. + // TODO(prasmish): Remove this test case if it turns out to be flaky, as expecting the process to + // be uncompleted in 1000ms is nondeterministic! Though very likely it won't be complete in the + // specified amount of time + public void testExecuteSelectAsyncCancel() + throws SQLException, ExecutionException, + InterruptedException { // use read API to read 300K records and check the order + String query = + "SELECT date, county, state_name, confirmed_cases, deaths FROM " + + TABLE_ID_LARGE.getTable() + + " where date is not null and county is not null and state_name is not null order by confirmed_cases asc limit 300000"; + + ConnectionSettings connectionSettings = + ConnectionSettings.newBuilder() + .setDefaultDataset(DatasetId.of(DATASET)) + .setPriority( + QueryJobConfiguration.Priority + .INTERACTIVE) // required for this integration test so that isFastQuerySupported + // returns false + .build(); + Connection connection = bigquery.createConnection(connectionSettings); + + ListenableFuture executeSelectFut = connection.executeSelectAsync(query); + + // Cancel the future with 1000ms delay + Thread testCloseAsync = + new Thread( + () -> { + try { + Thread.sleep(1000); + executeSelectFut.cancel(true); + } catch (InterruptedException e) { + assertNotNull(e); + } + }); + testCloseAsync.start(); + + try { + executeSelectFut.get(); + fail(); // this line should not be reached + } catch (CancellationException e) { + assertNotNull(e); + } + } + + @Test + // Timeouts the future and check if the operations got cancelled. + // TODO(prasmish): Remove this test case if it turns out to be flaky, as expecting the process to + // be uncompleted in 1000ms is nondeterministic! Though very likely it won't be complete in the + // specified amount of time + public void testExecuteSelectAsyncTimeout() + throws SQLException, ExecutionException, + InterruptedException { // use read API to read 300K records and check the order + String query = + "SELECT date, county, state_name, confirmed_cases, deaths FROM " + + TABLE_ID_LARGE.getTable() + + " where date is not null and county is not null and state_name is not null order by confirmed_cases asc limit 300000"; + + ConnectionSettings connectionSettings = + ConnectionSettings.newBuilder() + .setDefaultDataset(DatasetId.of(DATASET)) + .setPriority( + QueryJobConfiguration.Priority + .INTERACTIVE) // required for this integration test so that isFastQuerySupported + // returns false + .build(); + Connection connection = bigquery.createConnection(connectionSettings); + + ListenableFuture executeSelectFut = connection.executeSelectAsync(query); + + try { + executeSelectFut.get(1000, TimeUnit.MILLISECONDS); + fail(); // this line should not be reached + } catch (CancellationException | TimeoutException e) { + assertNotNull(e); + } + } + + @Test + public void testExecuteSelectWithNamedQueryParametersAsync() + throws BigQuerySQLException, ExecutionException, InterruptedException { + String query = + "SELECT TimestampField, StringField, BooleanField FROM " + + TABLE_ID.getTable() + + " WHERE StringField = @stringParam" + + " AND IntegerField IN UNNEST(@integerList)"; + QueryParameterValue stringParameter = QueryParameterValue.string("stringValue"); + QueryParameterValue intArrayParameter = + QueryParameterValue.array(new Integer[] {3, 4}, Integer.class); + Parameter stringParam = + Parameter.newBuilder().setName("stringParam").setValue(stringParameter).build(); + Parameter intArrayParam = + Parameter.newBuilder().setName("integerList").setValue(intArrayParameter).build(); + + ConnectionSettings connectionSettings = + ConnectionSettings.newBuilder().setDefaultDataset(DatasetId.of(DATASET)).build(); + Connection connection = bigquery.createConnection(connectionSettings); + List parameters = ImmutableList.of(stringParam, intArrayParam); + + ListenableFuture executeSelectFut = + connection.executeSelectAsync(query, parameters); + ExecuteSelectResponse exSelRes = executeSelectFut.get(); + BigQueryResult rs = exSelRes.getResultSet(); + assertEquals(2, rs.getTotalRows()); + } + // Ref: https://github.com/googleapis/java-bigquery/issues/2070. Adding a pre-submit test to see // if bigquery.createConnection() returns null @Test