... labels)
throws BigQuerySQLException;
+
+ /**
+ * Execute a SQL statement that returns a single ResultSet and returns a ListenableFuture to
+ * process the response asynchronously.
+ *
+ * Example of running a query.
+ *
+ *
+ * {
+ * @code
+ * ConnectionSettings connectionSettings =
+ * ConnectionSettings.newBuilder()
+ * .setUseReadAPI(true)
+ * .build();
+ * Connection connection = bigquery.createConnection(connectionSettings);
+ * String selectQuery = "SELECT corpus FROM `bigquery-public-data.samples.shakespeare` GROUP BY corpus;";
+ * ListenableFuture executeSelectFuture = connection.executeSelectAsync(selectQuery);
+ * ExecuteSelectResponse executeSelectRes = executeSelectFuture.get();
+ *
+ * if(!executeSelectRes.getIsSuccessful()){
+ * throw executeSelectRes.getBigQuerySQLException();
+ * }
+ *
+ * BigQueryResult bigQueryResult = executeSelectRes.getBigQueryResult();
+ * ResultSet rs = bigQueryResult.getResultSet();
+ * while (rs.next()) {
+ * System.out.println(rs.getString(1));
+ * }
+ *
+ *
+ *
+ * @param sql a static SQL SELECT statement
+ * @return a ListenableFuture that is used to get the data produced by the query
+ * @throws BigQuerySQLException upon failure
+ */
+ @BetaApi
+ ListenableFuture executeSelectAsync(String sql)
+ throws BigQuerySQLException;
+
+ /**
+ * Execute a SQL statement that returns a single ResultSet and returns a ListenableFuture to
+ * process the response asynchronously.
+ *
+ * Example of running a query.
+ *
+ *
+ * {
+ * @code
+ * ConnectionSettings connectionSettings =
+ * ConnectionSettings.newBuilder()
+ * ..setUseReadAPI(true)
+ * .build();
+ * Connection connection = bigquery.createConnection(connectionSettings);
+ * String selectQuery =
+ * "SELECT TimestampField, StringField, BooleanField FROM "
+ * + MY_TABLE
+ * + " WHERE StringField = @stringParam"
+ * + " AND IntegerField IN UNNEST(@integerList)";
+ * QueryParameterValue stringParameter = QueryParameterValue.string("stringValue");
+ * QueryParameterValue intArrayParameter =
+ * QueryParameterValue.array(new Integer[] {3, 4}, Integer.class);
+ * Parameter stringParam =
+ * Parameter.newBuilder().setName("stringParam").setValue(stringParameter).build();
+ * Parameter intArrayParam =
+ * Parameter.newBuilder().setName("integerList").setValue(intArrayParameter).build();
+ * List parameters = ImmutableList.of(stringParam, intArrayParam);
+ *
+ * ListenableFuture executeSelectFut =
+ * connection.executeSelectAsync(selectQuery, parameters);
+ * ExecuteSelectResponse executeSelectRes = executeSelectFuture.get();
+ *
+ * if(!executeSelectRes.getIsSuccessful()){
+ * throw executeSelectRes.getBigQuerySQLException();
+ * }
+ *
+ * BigQueryResult bigQueryResult = executeSelectRes.getBigQueryResult();
+ * ResultSet rs = bigQueryResult.getResultSet();
+ * while (rs.next()) {
+ * System.out.println(rs.getString(1));
+ * }
+ *
+ *
+ *
+ * @param sql SQL SELECT query
+ * @param parameters named or positional parameters. The set of query parameters must either be
+ * all positional or all named parameters.
+ * @param labels (optional) the labels associated with this query. You can use these to organize
+ * and group your query jobs. Label keys and values can be no longer than 63 characters, can
+ * only contain lowercase letters, numeric characters, underscores and dashes. International
+ * characters are allowed. Label values are optional and Label is a Varargs. You should pass
+ * all the Labels in a single Map .Label keys must start with a letter and each label in the
+ * list must have a different key.
+ * @return a ListenableFuture that is used to get the data produced by the query
+ * @throws BigQuerySQLException upon failure
+ */
+ @BetaApi
+ ListenableFuture executeSelectAsync(
+ String sql, List parameters, Map... labels)
+ throws BigQuerySQLException;
}
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java
index 0310832c5..eb0072905 100644
--- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java
@@ -47,6 +47,11 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
@@ -187,24 +192,7 @@ public BigQueryDryRunResult dryRun(String sql) throws BigQuerySQLException {
@BetaApi
@Override
public BigQueryResult executeSelect(String sql) throws BigQuerySQLException {
- try {
- // use jobs.query if all the properties of connectionSettings are supported
- if (isFastQuerySupported()) {
- logger.log(Level.INFO, "\n Using Fast Query Path");
- String projectId = bigQueryOptions.getProjectId();
- QueryRequest queryRequest = createQueryRequest(connectionSettings, sql, null, null);
- return queryRpc(projectId, queryRequest, sql, false);
- }
- // use jobs.insert otherwise
- logger.log(Level.INFO, "\n Not Using Fast Query Path, using jobs.insert");
- com.google.api.services.bigquery.model.Job queryJob =
- createQueryJob(sql, connectionSettings, null, null);
- JobId jobId = JobId.fromPb(queryJob.getJobReference());
- GetQueryResultsResponse firstPage = getQueryResultsFirstPage(jobId);
- return getResultSet(firstPage, jobId, sql, false);
- } catch (BigQueryException e) {
- throw new BigQuerySQLException(e.getMessage(), e, e.getErrors());
- }
+ return getExecuteSelectResponse(sql, null, null);
}
/**
@@ -227,6 +215,12 @@ public BigQueryResult executeSelect(String sql) throws BigQuerySQLException {
public BigQueryResult executeSelect(
String sql, List parameters, Map... labels)
throws BigQuerySQLException {
+ return getExecuteSelectResponse(sql, parameters, labels);
+ }
+
+ private BigQueryResult getExecuteSelectResponse(
+ String sql, List parameters, Map... labels)
+ throws BigQuerySQLException {
Map labelMap = null;
if (labels != null
&& labels.length == 1) { // We expect label as a key value pair in a single Map
@@ -252,29 +246,181 @@ public BigQueryResult executeSelect(
throw new BigQuerySQLException(e.getMessage(), e, e.getErrors());
}
}
+ /**
+ * Execute a SQL statement that returns a single ResultSet and returns a ListenableFuture to
+ * process the response asynchronously.
+ *
+ * Example of running a query.
+ *
+ *
+ * {
+ * @code
+ * ConnectionSettings connectionSettings =
+ * ConnectionSettings.newBuilder()
+ * .setUseReadAPI(true)
+ * .build();
+ * Connection connection = bigquery.createConnection(connectionSettings);
+ * String selectQuery = "SELECT corpus FROM `bigquery-public-data.samples.shakespeare` GROUP BY corpus;";
+ * ListenableFuture executeSelectFuture = connection.executeSelectAsync(selectQuery);
+ * ExecuteSelectResponse executeSelectRes = executeSelectFuture.get();
+ *
+ * if(!executeSelectRes.getIsSuccessful()){
+ * throw executeSelectRes.getBigQuerySQLException();
+ * }
+ *
+ * BigQueryResult bigQueryResult = executeSelectRes.getBigQueryResult();
+ * ResultSet rs = bigQueryResult.getResultSet();
+ * while (rs.next()) {
+ * System.out.println(rs.getString(1));
+ * }
+ *
+ *
+ *
+ * @param sql a static SQL SELECT statement
+ * @return a ListenableFuture that is used to get the data produced by the query
+ * @throws BigQuerySQLException upon failure
+ */
+ @BetaApi
+ @Override
+ public ListenableFuture executeSelectAsync(String sql)
+ throws BigQuerySQLException {
+ return getExecuteSelectFuture(sql, null);
+ }
+
+ /** This method calls the overloaded executeSelect(...) methods and returns a Future */
+ private ListenableFuture getExecuteSelectFuture(
+ String sql, List parameters, Map... labels)
+ throws BigQuerySQLException {
+ ExecutorService execService =
+ Executors.newFixedThreadPool(
+ 2); // two fixed threads. One for the async operation and the other for processing the
+ // callback
+ ListeningExecutorService lExecService = MoreExecutors.listeningDecorator(execService);
+ ListenableFuture executeSelectFuture =
+ lExecService.submit(
+ () -> {
+ try {
+ return ExecuteSelectResponse.newBuilder()
+ .setResultSet(
+ this.executeSelect(
+ sql,
+ parameters,
+ labels)) // calling the overloaded executeSelect method, it takes care
+ // of null parameters and labels
+ .setIsSuccessful(true)
+ .build();
+ } catch (BigQuerySQLException ex) {
+ return ExecuteSelectResponse
+ .newBuilder() // passing back the null result with isSuccessful set to false
+ .setIsSuccessful(false)
+ .setBigQuerySQLException(ex)
+ .build();
+ }
+ });
+
+ Futures.addCallback(
+ executeSelectFuture,
+ new FutureCallback() {
+ public void onSuccess(ExecuteSelectResponse result) {
+ execService.shutdownNow(); // shutdown the executor service as we do not need it
+ }
+
+ public void onFailure(Throwable t) {
+ logger.log(
+ Level.WARNING,
+ "\n"
+ + String.format(
+ "Async task failed or cancelled with error %s", t.getMessage()));
+ try {
+ close(); // attempt to stop the execution as the developer might have called
+ // Future.cancel()
+ } catch (BigQuerySQLException e) {
+ logger.log(
+ Level.WARNING,
+ "\n"
+ + String.format("Exception while closing the connection %s", e.getMessage()));
+ }
+ execService.shutdownNow(); // shutdown the executor service as we do not need it
+ }
+ },
+ execService);
+
+ return executeSelectFuture;
+ }
+
+ /**
+ * Execute a SQL statement that returns a single ResultSet and returns a ListenableFuture to
+ * process the response asynchronously.
+ *
+ * Example of running a query.
+ *
+ *
+ * {
+ * @code
+ * ConnectionSettings connectionSettings =
+ * ConnectionSettings.newBuilder()
+ * ..setUseReadAPI(true)
+ * .build();
+ * Connection connection = bigquery.createConnection(connectionSettings);
+ * String selectQuery =
+ * "SELECT TimestampField, StringField, BooleanField FROM "
+ * + MY_TABLE
+ * + " WHERE StringField = @stringParam"
+ * + " AND IntegerField IN UNNEST(@integerList)";
+ * QueryParameterValue stringParameter = QueryParameterValue.string("stringValue");
+ * QueryParameterValue intArrayParameter =
+ * QueryParameterValue.array(new Integer[] {3, 4}, Integer.class);
+ * Parameter stringParam =
+ * Parameter.newBuilder().setName("stringParam").setValue(stringParameter).build();
+ * Parameter intArrayParam =
+ * Parameter.newBuilder().setName("integerList").setValue(intArrayParameter).build();
+ * List parameters = ImmutableList.of(stringParam, intArrayParam);
+ *
+ * ListenableFuture executeSelectFut =
+ * connection.executeSelectAsync(selectQuery, parameters);
+ * ExecuteSelectResponse executeSelectRes = executeSelectFuture.get();
+ *
+ * if(!executeSelectRes.getIsSuccessful()){
+ * throw executeSelectRes.getBigQuerySQLException();
+ * }
+ *
+ * BigQueryResult bigQueryResult = executeSelectRes.getBigQueryResult();
+ * ResultSet rs = bigQueryResult.getResultSet();
+ * while (rs.next()) {
+ * System.out.println(rs.getString(1));
+ * }
+ *
+ *
+ *
+ * @param sql SQL SELECT query
+ * @param parameters named or positional parameters. The set of query parameters must either be
+ * all positional or all named parameters.
+ * @param labels (optional) the labels associated with this query. You can use these to organize
+ * and group your query jobs. Label keys and values can be no longer than 63 characters, can
+ * only contain lowercase letters, numeric characters, underscores and dashes. International
+ * characters are allowed. Label values are optional and Label is a Varargs. You should pass
+ * all the Labels in a single Map .Label keys must start with a letter and each label in the
+ * list must have a different key.
+ * @return a ListenableFuture that is used to get the data produced by the query
+ * @throws BigQuerySQLException upon failure
+ */
+ @BetaApi
+ @Override
+ public ListenableFuture executeSelectAsync(
+ String sql, List parameters, Map... labels)
+ throws BigQuerySQLException {
+ return getExecuteSelectFuture(sql, parameters, labels);
+ }
@VisibleForTesting
BigQueryResult getResultSet(
GetQueryResultsResponse firstPage, JobId jobId, String sql, Boolean hasQueryParameters) {
- if (firstPage.getJobComplete()
- && firstPage.getTotalRows() != null
- && firstPage.getSchema()
- != null) { // firstPage.getTotalRows() is null if job is not complete. We need to make
- // sure that the schema is not null, as it is required for the ResultSet
- return getSubsequentQueryResultsWithJob(
- firstPage.getTotalRows().longValue(),
- (long) firstPage.getRows().size(),
- jobId,
- firstPage,
- hasQueryParameters);
- } else { // job is still running, use dryrun to get Schema
- com.google.api.services.bigquery.model.Job dryRunJob = createDryRunJob(sql);
- Schema schema = Schema.fromPb(dryRunJob.getStatistics().getQuery().getSchema());
- // TODO: check how can we get totalRows and pageRows while the job is still running.
- // `firstPage.getTotalRows()` returns null
- return getSubsequentQueryResultsWithJob(
- null, null, jobId, firstPage, schema, hasQueryParameters);
- }
+ return getSubsequentQueryResultsWithJob(
+ firstPage.getTotalRows().longValue(),
+ (long) firstPage.getRows().size(),
+ jobId,
+ firstPage,
+ hasQueryParameters);
}
static class EndOfFieldValueList
@@ -337,21 +483,6 @@ private BigQueryResult queryRpc(
results.getJobComplete(), results.getSchema() == null, totalRows, pageRows));
JobId jobId = JobId.fromPb(results.getJobReference());
GetQueryResultsResponse firstPage = getQueryResultsFirstPage(jobId);
- // We might get null schema from the backend occasionally. Ref:
- // https://github.com/googleapis/java-bigquery/issues/2103/. Using queryDryRun in such cases
- // to get the schema
- if (firstPage.getSchema() == null) { // get schema using dry run
- // Log the status if the job was complete complete
- logger.log(
- Level.WARNING,
- "\n"
- + "Received null schema, Using dryRun the get the Schema. jobComplete:"
- + firstPage.getJobComplete());
- com.google.api.services.bigquery.model.Job dryRunJob = createDryRunJob(sql);
- Schema schema = Schema.fromPb(dryRunJob.getStatistics().getQuery().getSchema());
- return getSubsequentQueryResultsWithJob(
- totalRows, pageRows, jobId, firstPage, schema, hasQueryParameters);
- }
return getSubsequentQueryResultsWithJob(
totalRows, pageRows, jobId, firstPage, hasQueryParameters);
}
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ExecuteSelectResponse.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ExecuteSelectResponse.java
new file mode 100644
index 000000000..59745020f
--- /dev/null
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ExecuteSelectResponse.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+
+@AutoValue
+public abstract class ExecuteSelectResponse implements Serializable {
+ @Nullable
+ public abstract BigQueryResult getResultSet();
+
+ public abstract boolean getIsSuccessful();
+
+ @Nullable
+ public abstract BigQuerySQLException getBigQuerySQLException();
+
+ public static Builder newBuilder() {
+ return new AutoValue_ExecuteSelectResponse.Builder();
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract ExecuteSelectResponse build();
+
+ public abstract Builder setResultSet(BigQueryResult bigQueryResult);
+
+ public abstract Builder setIsSuccessful(boolean isSuccessful);
+
+ public abstract Builder setBigQuerySQLException(BigQuerySQLException bigQuerySQLException);
+ }
+}
diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/ConnectionImplTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/ConnectionImplTest.java
index 4b1b93487..4b379629c 100644
--- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/ConnectionImplTest.java
+++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/ConnectionImplTest.java
@@ -29,6 +29,7 @@
import com.google.cloud.bigquery.spi.BigQueryRpcFactory;
import com.google.cloud.bigquery.spi.v2.BigQueryRpc;
import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
import java.math.BigInteger;
import java.sql.SQLException;
import java.util.AbstractList;
@@ -37,6 +38,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import org.junit.Before;
import org.junit.Test;
@@ -365,62 +367,39 @@ public void testLegacyQuerySinglePage() throws BigQuerySQLException {
.createJobForQuery(any(com.google.api.services.bigquery.model.Job.class));
}
- // calls executeSelect with a Fast query and emulates that no schema is returned with the first
- // page
+ // exercises getSubsequentQueryResultsWithJob for fast running queries
@Test
- public void testFastQueryNullSchema() throws BigQuerySQLException {
+ public void testFastQueryLongRunning() throws SQLException {
ConnectionImpl connectionSpy = Mockito.spy(connection);
- QueryRequest queryReqMock = new QueryRequest();
- com.google.api.services.bigquery.model.JobStatistics stats =
- new com.google.api.services.bigquery.model.JobStatistics()
- .setQuery(new JobStatistics2().setSchema(FAST_QUERY_TABLESCHEMA));
- com.google.api.services.bigquery.model.Job jobResponseMock =
- new com.google.api.services.bigquery.model.Job()
- // .setConfiguration(QUERY_JOB.g)
- .setJobReference(QUERY_JOB.toPb())
- .setId(JOB)
- .setStatus(new com.google.api.services.bigquery.model.JobStatus().setState("DONE"))
- .setStatistics(stats);
- // emulating a legacy query
+ // emulating a fast query
doReturn(true).when(connectionSpy).isFastQuerySupported();
+ doReturn(GET_QUERY_RESULTS_RESPONSE)
+ .when(connectionSpy)
+ .getQueryResultsFirstPage(any(JobId.class));
+
+ doReturn(TABLE_NAME).when(connectionSpy).getDestinationTable(any(JobId.class));
+ doReturn(BQ_RS_MOCK_RES)
+ .when(connectionSpy)
+ .tableDataList(any(GetQueryResultsResponse.class), any(JobId.class));
+
com.google.api.services.bigquery.model.QueryResponse mockQueryRes =
new QueryResponse()
.setSchema(FAST_QUERY_TABLESCHEMA)
- .setJobComplete(false) // so that it goes to the else part in queryRpc
+ .setJobComplete(false)
.setTotalRows(new BigInteger(String.valueOf(4L)))
.setJobReference(QUERY_JOB.toPb())
.setRows(TABLE_ROWS);
when(bigqueryRpcMock.queryRpc(any(String.class), any(QueryRequest.class)))
.thenReturn(mockQueryRes);
- doReturn(GET_QUERY_RESULTS_RESPONSE_NULL_SCHEMA) // wiring the null schema for the test case
- .when(connectionSpy)
- .getQueryResultsFirstPage(any(JobId.class));
- doReturn(BQ_RS_MOCK_RES)
- .when(connectionSpy)
- .getSubsequentQueryResultsWithJob(
- any(Long.class),
- any(Long.class),
- any(JobId.class),
- any(GetQueryResultsResponse.class),
- any(Schema.class),
- any(Boolean.class));
- doReturn(jobResponseMock).when(connectionSpy).createDryRunJob(any(String.class));
BigQueryResult res = connectionSpy.executeSelect(SQL_QUERY);
assertEquals(res.getTotalRows(), 2);
assertEquals(QUERY_SCHEMA, res.getSchema());
- verify(connectionSpy, times(1))
- .getSubsequentQueryResultsWithJob(
- any(Long.class),
- any(Long.class),
- any(JobId.class),
- any(GetQueryResultsResponse.class),
- any(Schema.class),
- any(Boolean.class));
+ verify(bigqueryRpcMock, times(1)).queryRpc(any(String.class), any(QueryRequest.class));
}
- // exercises getSubsequentQueryResultsWithJob for fast running queries
@Test
- public void testFastQueryLongRunning() throws SQLException {
+ public void testFastQueryLongRunningAsync()
+ throws SQLException, ExecutionException, InterruptedException {
ConnectionImpl connectionSpy = Mockito.spy(connection);
// emulating a fast query
doReturn(true).when(connectionSpy).isFastQuerySupported();
@@ -442,12 +421,113 @@ public void testFastQueryLongRunning() throws SQLException {
.setRows(TABLE_ROWS);
when(bigqueryRpcMock.queryRpc(any(String.class), any(QueryRequest.class)))
.thenReturn(mockQueryRes);
- BigQueryResult res = connectionSpy.executeSelect(SQL_QUERY);
+ ListenableFuture executeSelectFut =
+ connectionSpy.executeSelectAsync(SQL_QUERY);
+ ExecuteSelectResponse exSelRes = executeSelectFut.get();
+ BigQueryResult res = exSelRes.getResultSet();
assertEquals(res.getTotalRows(), 2);
assertEquals(QUERY_SCHEMA, res.getSchema());
+ assertTrue(exSelRes.getIsSuccessful());
verify(bigqueryRpcMock, times(1)).queryRpc(any(String.class), any(QueryRequest.class));
}
+ @Test
+ public void testFastQuerySinglePageAsync()
+ throws BigQuerySQLException, ExecutionException, InterruptedException {
+ com.google.api.services.bigquery.model.QueryResponse mockQueryRes =
+ new QueryResponse().setSchema(FAST_QUERY_TABLESCHEMA).setJobComplete(true);
+ when(bigqueryRpcMock.queryRpc(any(String.class), any(QueryRequest.class)))
+ .thenReturn(mockQueryRes);
+ ConnectionImpl connectionSpy = Mockito.spy(connection);
+ doReturn(BQ_RS_MOCK_RES)
+ .when(connectionSpy)
+ .processQueryResponseResults(any(QueryResponse.class));
+
+ ListenableFuture executeSelectFut =
+ connectionSpy.executeSelectAsync(SQL_QUERY);
+ ExecuteSelectResponse exSelRes = executeSelectFut.get();
+ BigQueryResult res = exSelRes.getResultSet();
+ assertEquals(res.getTotalRows(), 2);
+ assertEquals(QUERY_SCHEMA, res.getSchema());
+ assertTrue(exSelRes.getIsSuccessful());
+ verify(connectionSpy, times(1))
+ .processQueryResponseResults(
+ any(com.google.api.services.bigquery.model.QueryResponse.class));
+ }
+
+ @Test
+ public void testExecuteSelectSlowWithParamsAsync()
+ throws BigQuerySQLException, ExecutionException, InterruptedException {
+ ConnectionImpl connectionSpy = Mockito.spy(connection);
+ List parameters = new ArrayList<>();
+ Map labels = new HashMap<>();
+ doReturn(false).when(connectionSpy).isFastQuerySupported();
+ com.google.api.services.bigquery.model.JobStatistics jobStatistics =
+ new com.google.api.services.bigquery.model.JobStatistics();
+ com.google.api.services.bigquery.model.Job jobResponseMock =
+ new com.google.api.services.bigquery.model.Job()
+ .setJobReference(QUERY_JOB.toPb())
+ .setId(JOB)
+ .setStatus(new com.google.api.services.bigquery.model.JobStatus().setState("DONE"))
+ .setStatistics(jobStatistics);
+
+ doReturn(jobResponseMock)
+ .when(connectionSpy)
+ .createQueryJob(SQL_QUERY, connectionSettings, parameters, labels);
+ doReturn(GET_QUERY_RESULTS_RESPONSE)
+ .when(connectionSpy)
+ .getQueryResultsFirstPage(any(JobId.class));
+ doReturn(BQ_RS_MOCK_RES)
+ .when(connectionSpy)
+ .getResultSet(
+ any(GetQueryResultsResponse.class),
+ any(JobId.class),
+ any(String.class),
+ any(Boolean.class));
+ ListenableFuture executeSelectFut =
+ connectionSpy.executeSelectAsync(SQL_QUERY, parameters, labels);
+ ExecuteSelectResponse exSelRes = executeSelectFut.get();
+ BigQueryResult res = exSelRes.getResultSet();
+ assertTrue(exSelRes.getIsSuccessful());
+ assertEquals(res.getTotalRows(), 2);
+ assertEquals(QUERY_SCHEMA, res.getSchema());
+ verify(connectionSpy, times(1))
+ .getResultSet(
+ any(GetQueryResultsResponse.class),
+ any(JobId.class),
+ any(String.class),
+ any(Boolean.class));
+ }
+
+ @Test
+ public void testFastQueryMultiplePagesAsync()
+ throws BigQuerySQLException, ExecutionException, InterruptedException {
+ com.google.api.services.bigquery.model.QueryResponse mockQueryRes =
+ new QueryResponse()
+ .setSchema(FAST_QUERY_TABLESCHEMA)
+ .setJobComplete(true)
+ .setPageToken(PAGE_TOKEN);
+ when(bigqueryRpcMock.queryRpc(any(String.class), any(QueryRequest.class)))
+ .thenReturn(mockQueryRes);
+ ConnectionImpl connectionSpy = Mockito.spy(connection);
+
+ doReturn(BQ_RS_MOCK_RES_MULTI_PAGE)
+ .when(connectionSpy)
+ .processQueryResponseResults(
+ any(com.google.api.services.bigquery.model.QueryResponse.class));
+
+ ListenableFuture executeSelectFut =
+ connectionSpy.executeSelectAsync(SQL_QUERY);
+ ExecuteSelectResponse exSelRes = executeSelectFut.get();
+ BigQueryResult res = exSelRes.getResultSet();
+ assertTrue(exSelRes.getIsSuccessful());
+ assertEquals(res.getTotalRows(), 4);
+ assertEquals(QUERY_SCHEMA, res.getSchema());
+ verify(connectionSpy, times(1))
+ .processQueryResponseResults(
+ any(com.google.api.services.bigquery.model.QueryResponse.class));
+ }
+
@Test
// Emulates first page response using getQueryResultsFirstPage(jobId) and then subsequent pages
// using getQueryResultsFirstPage(jobId) getSubsequentQueryResultsWithJob(
diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java
index 9a4b18a48..8e8f4cc0c 100644
--- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java
+++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java
@@ -62,6 +62,7 @@
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
+import com.google.cloud.bigquery.ExecuteSelectResponse;
import com.google.cloud.bigquery.ExternalTableDefinition;
import com.google.cloud.bigquery.ExtractJobConfiguration;
import com.google.cloud.bigquery.Field;
@@ -135,6 +136,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.io.BaseEncoding;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.io.InputStream;
@@ -157,6 +159,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -2789,6 +2792,155 @@ public void testReadAPIIterationAndOrder()
connection.close();
}
+ @Test
+ public void testReadAPIIterationAndOrderAsync()
+ throws SQLException, ExecutionException,
+ InterruptedException { // use read API to read 300K records and check the order
+ String query =
+ "SELECT date, county, state_name, confirmed_cases, deaths FROM "
+ + TABLE_ID_LARGE.getTable()
+ + " where date is not null and county is not null and state_name is not null order by confirmed_cases asc limit 300000";
+
+ ConnectionSettings connectionSettings =
+ ConnectionSettings.newBuilder()
+ .setDefaultDataset(DatasetId.of(DATASET))
+ .setPriority(
+ QueryJobConfiguration.Priority
+ .INTERACTIVE) // required for this integration test so that isFastQuerySupported
+ // returns false
+ .build();
+ Connection connection = bigquery.createConnection(connectionSettings);
+
+ ListenableFuture executeSelectFut = connection.executeSelectAsync(query);
+ ExecuteSelectResponse exSelRes = executeSelectFut.get();
+ BigQueryResult bigQueryResult = exSelRes.getResultSet();
+ ResultSet rs = bigQueryResult.getResultSet();
+ int cnt = 0;
+ int lasConfirmedCases = Integer.MIN_VALUE;
+ while (rs.next()) { // pagination starts after approx 120,000 records
+ assertNotNull(rs.getDate(0));
+ assertNotNull(rs.getString(1));
+ assertNotNull(rs.getString(2));
+ assertTrue(rs.getInt(3) >= 0);
+ assertTrue(rs.getInt(4) >= 0);
+
+ // check if the records are sorted
+ assertTrue(rs.getInt(3) >= lasConfirmedCases);
+ lasConfirmedCases = rs.getInt(3);
+ ++cnt;
+ }
+ assertEquals(300000, cnt); // total 300000 rows should be read
+ connection.close();
+ }
+
+ @Test
+ // Cancel the future and check if the operations got cancelled. Tests the wiring of future
+ // callback.
+ // TODO(prasmish): Remove this test case if it turns out to be flaky, as expecting the process to
+ // be uncompleted in 1000ms is nondeterministic! Though very likely it won't be complete in the
+ // specified amount of time
+ public void testExecuteSelectAsyncCancel()
+ throws SQLException, ExecutionException,
+ InterruptedException { // use read API to read 300K records and check the order
+ String query =
+ "SELECT date, county, state_name, confirmed_cases, deaths FROM "
+ + TABLE_ID_LARGE.getTable()
+ + " where date is not null and county is not null and state_name is not null order by confirmed_cases asc limit 300000";
+
+ ConnectionSettings connectionSettings =
+ ConnectionSettings.newBuilder()
+ .setDefaultDataset(DatasetId.of(DATASET))
+ .setPriority(
+ QueryJobConfiguration.Priority
+ .INTERACTIVE) // required for this integration test so that isFastQuerySupported
+ // returns false
+ .build();
+ Connection connection = bigquery.createConnection(connectionSettings);
+
+ ListenableFuture executeSelectFut = connection.executeSelectAsync(query);
+
+ // Cancel the future with 1000ms delay
+ Thread testCloseAsync =
+ new Thread(
+ () -> {
+ try {
+ Thread.sleep(1000);
+ executeSelectFut.cancel(true);
+ } catch (InterruptedException e) {
+ assertNotNull(e);
+ }
+ });
+ testCloseAsync.start();
+
+ try {
+ executeSelectFut.get();
+ fail(); // this line should not be reached
+ } catch (CancellationException e) {
+ assertNotNull(e);
+ }
+ }
+
+ @Test
+ // Timeouts the future and check if the operations got cancelled.
+ // TODO(prasmish): Remove this test case if it turns out to be flaky, as expecting the process to
+ // be uncompleted in 1000ms is nondeterministic! Though very likely it won't be complete in the
+ // specified amount of time
+ public void testExecuteSelectAsyncTimeout()
+ throws SQLException, ExecutionException,
+ InterruptedException { // use read API to read 300K records and check the order
+ String query =
+ "SELECT date, county, state_name, confirmed_cases, deaths FROM "
+ + TABLE_ID_LARGE.getTable()
+ + " where date is not null and county is not null and state_name is not null order by confirmed_cases asc limit 300000";
+
+ ConnectionSettings connectionSettings =
+ ConnectionSettings.newBuilder()
+ .setDefaultDataset(DatasetId.of(DATASET))
+ .setPriority(
+ QueryJobConfiguration.Priority
+ .INTERACTIVE) // required for this integration test so that isFastQuerySupported
+ // returns false
+ .build();
+ Connection connection = bigquery.createConnection(connectionSettings);
+
+ ListenableFuture executeSelectFut = connection.executeSelectAsync(query);
+
+ try {
+ executeSelectFut.get(1000, TimeUnit.MILLISECONDS);
+ fail(); // this line should not be reached
+ } catch (CancellationException | TimeoutException e) {
+ assertNotNull(e);
+ }
+ }
+
+ @Test
+ public void testExecuteSelectWithNamedQueryParametersAsync()
+ throws BigQuerySQLException, ExecutionException, InterruptedException {
+ String query =
+ "SELECT TimestampField, StringField, BooleanField FROM "
+ + TABLE_ID.getTable()
+ + " WHERE StringField = @stringParam"
+ + " AND IntegerField IN UNNEST(@integerList)";
+ QueryParameterValue stringParameter = QueryParameterValue.string("stringValue");
+ QueryParameterValue intArrayParameter =
+ QueryParameterValue.array(new Integer[] {3, 4}, Integer.class);
+ Parameter stringParam =
+ Parameter.newBuilder().setName("stringParam").setValue(stringParameter).build();
+ Parameter intArrayParam =
+ Parameter.newBuilder().setName("integerList").setValue(intArrayParameter).build();
+
+ ConnectionSettings connectionSettings =
+ ConnectionSettings.newBuilder().setDefaultDataset(DatasetId.of(DATASET)).build();
+ Connection connection = bigquery.createConnection(connectionSettings);
+ List parameters = ImmutableList.of(stringParam, intArrayParam);
+
+ ListenableFuture executeSelectFut =
+ connection.executeSelectAsync(query, parameters);
+ ExecuteSelectResponse exSelRes = executeSelectFut.get();
+ BigQueryResult rs = exSelRes.getResultSet();
+ assertEquals(2, rs.getTotalRows());
+ }
+
// Ref: https://github.com/googleapis/java-bigquery/issues/2070. Adding a pre-submit test to see
// if bigquery.createConnection() returns null
@Test