feat: Add executeSelectAsync and Refactor #2294

Merged
merged 27 commits into from Oct 12, 2022
Changes from 15 commits
Commits
1591411
Add executeSelectAsync
prash-mi Sep 27, 2022
6e83662
Add executeSelectAsync
prash-mi Sep 27, 2022
6ea69d5
Add ExecuteSelectResponse
prash-mi Sep 27, 2022
344d34b
Add executeSelectAsync(...) methods
prash-mi Oct 10, 2022
3133ca9
feat: Add executeSelectAsync
prash-mi Oct 10, 2022
3350360
implemented ExecuteSelectResponse Builder
prash-mi Oct 10, 2022
0bee0d3
Refactored executeSelect. Added getExecuteSelectResponse
prash-mi Oct 10, 2022
149242c
marked getExecuteSelectFuture private
prash-mi Oct 10, 2022
24c16fe
marked getExecuteSelectFuture private
prash-mi Oct 10, 2022
a16eb33
Add UT for Async methods
prash-mi Oct 10, 2022
6c1b6ed
Added IT for async methods
prash-mi Oct 10, 2022
031e81a
Removed testFastQueryNullSchema as it is no longer needed
prash-mi Oct 10, 2022
1dda219
removed dryRun calls as now we wait till the job is complete
prash-mi Oct 10, 2022
a420b4e
Added testExecuteSelectAsyncTimeout
prash-mi Oct 10, 2022
81798f4
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Oct 10, 2022
e54dba9
updated getExecuteSelectFuture
prash-mi Oct 11, 2022
b99935b
Merge remote-tracking branch 'origin/async-execute-select-and-refacto…
prash-mi Oct 11, 2022
2c0471d
lint
prash-mi Oct 11, 2022
f54136d
Add getters and setters for BigQuerySQLException
prash-mi Oct 11, 2022
89dc083
Add javadoc for overloaded executeSelectAsync and refactored getExecu…
prash-mi Oct 11, 2022
ffa218a
Marked ResultSet and BQSQLException optional
prash-mi Oct 11, 2022
89b29b9
minor refactor: getExecuteSelectFuture
prash-mi Oct 11, 2022
896b476
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Oct 11, 2022
53029f1
Merge branch 'master' into async-execute-select-and-refactor
prash-mi Oct 11, 2022
429101b
Merge remote-tracking branch 'origin/async-execute-select-and-refacto…
prash-mi Oct 11, 2022
704ec9d
update getExecuteSelectFuture
prash-mi Oct 11, 2022
cba789e
update javadoc
prash-mi Oct 11, 2022
6 changes: 3 additions & 3 deletions README.md
@@ -52,20 +52,20 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.1.2')
implementation platform('com.google.cloud:libraries-bom:26.1.3')

implementation 'com.google.cloud:google-cloud-bigquery'
```
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-bigquery:2.16.1'
implementation 'com.google.cloud:google-cloud-bigquery:2.17.0'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigquery" % "2.16.1"
libraryDependencies += "com.google.cloud" % "google-cloud-bigquery" % "2.17.0"
```

## Authentication
12 changes: 11 additions & 1 deletion google-cloud-bigquery/clirr-ignored-differences.xml
@@ -24,4 +24,14 @@
<className>com/google/cloud/bigquery/RoutineInfo*</className>
<method>*RemoteFunctionOptions(*)</method>
</difference>
</differences>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/bigquery/Connection</className>
<method>com.google.common.util.concurrent.ListenableFuture executeSelectAsync(java.lang.String)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/bigquery/Connection</className>
<method>com.google.common.util.concurrent.ListenableFuture executeSelectAsync(java.lang.String, java.util.List, java.util.Map[])</method>
</difference>
</differences>
@@ -17,6 +17,7 @@
package com.google.cloud.bigquery;

import com.google.api.core.BetaApi;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import java.util.Map;

@@ -89,4 +90,65 @@ public interface Connection {
BigQueryResult executeSelect(
String sql, List<Parameter> parameters, Map<String, String>... labels)
throws BigQuerySQLException;

/**
* Execute a SQL statement that returns a single ResultSet and returns a ListenableFuture to
* process the response asynchronously.
*
* <p>Example of running a query.
*
* <pre>
* {
* &#64;code
* ConnectionSettings connectionSettings =
* ConnectionSettings.newBuilder()
* .setRequestTimeout(10L)
* .setMaxResults(100L)
* .setUseQueryCache(true)
* .build();
* Connection connection = bigquery.createConnection(connectionSettings);
* String selectQuery = "SELECT corpus FROM `bigquery-public-data.samples.shakespeare` GROUP BY corpus;";
* ListenableFuture<ExecuteSelectResponse> executeSelectFuture = connection.executeSelectAsync(selectQuery);
* ExecuteSelectResponse executeSelectRes = executeSelectFuture.get();
*
* if(!executeSelectRes.isSuccessful()){
* throw executeSelectRes.getBigQuerySQLException();
* }
*
* BigQueryResult bigQueryResult = executeSelectRes.getBigQueryResult();
* ResultSet rs = bigQueryResult.getResultSet();
* while (rs.next()) {
* System.out.println(rs.getString(1));
* }
*
* </pre>
*
* @param sql a static SQL SELECT statement
* @return a ListenableFuture that is used to get the data produced by the query
* @exception BigQuerySQLException if a database access error occurs
*/
@BetaApi
ListenableFuture<ExecuteSelectResponse> executeSelectAsync(String sql)
throws BigQuerySQLException;

/**
* Execute a SQL statement that returns a single ResultSet and returns a ListenableFuture to
* process the response asynchronously.
*
* @param sql SQL SELECT query
* @param parameters named or positional parameters. The set of query parameters must either be
* all positional or all named parameters.
* @param labels (optional) the labels associated with this query. You can use these to organize
* and group your query jobs. Label keys and values can be no longer than 63 characters, can
* only contain lowercase letters, numeric characters, underscores and dashes. International
* characters are allowed. Label values are optional and Label is a Varargs. You should pass
* all the Labels in a single Map. Label keys must start with a letter and each label in the
* list must have a different key.
* @return a ListenableFuture that is used to get the data produced by the query
* @throws BigQuerySQLException
*/
@BetaApi
ListenableFuture<ExecuteSelectResponse> executeSelectAsync(
String sql, List<Parameter> parameters, Map<String, String>... labels)
throws BigQuerySQLException;
}
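
The label rules stated in the Javadoc above (at most 63 characters; lowercase letters, digits, underscores and dashes; keys starting with a letter; values optional) could be checked on the client before a query is submitted. The following is a minimal sketch and not part of this PR: `LabelCheckSketch` and `isValidLabel` are hypothetical names, a `null` or empty value is assumed to mean "no value", and the patterns are an ASCII-only approximation that deliberately ignores the international characters the Javadoc also allows.

```java
import java.util.regex.Pattern;

/** Hypothetical helper, not part of the BigQuery client library. */
final class LabelCheckSketch {
  // Assumption: ASCII-only approximation of the documented rules.
  // Key: one lowercase letter followed by up to 62 allowed characters (63 total).
  private static final Pattern KEY = Pattern.compile("[a-z][a-z0-9_-]{0,62}");
  // Value: zero to 63 allowed characters; an empty value is accepted.
  private static final Pattern VALUE = Pattern.compile("[a-z0-9_-]{0,63}");

  private LabelCheckSketch() {}

  /** Returns true if the key/value pair satisfies the simplified rules above. */
  static boolean isValidLabel(String key, String value) {
    if (key == null || !KEY.matcher(key).matches()) {
      return false;
    }
    // Assumption: a null value is treated the same as an omitted (optional) value.
    return value == null || VALUE.matcher(value).matches();
  }
}
```

A caller might run every entry of the single label `Map` through such a check and fail fast before calling `executeSelectAsync`, rather than waiting for the backend to reject the job.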
@@ -47,6 +47,11 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
@@ -187,24 +192,7 @@ public BigQueryDryRunResult dryRun(String sql) throws BigQuerySQLException {
@BetaApi
@Override
public BigQueryResult executeSelect(String sql) throws BigQuerySQLException {
try {
// use jobs.query if all the properties of connectionSettings are supported
if (isFastQuerySupported()) {
logger.log(Level.INFO, "\n Using Fast Query Path");
String projectId = bigQueryOptions.getProjectId();
QueryRequest queryRequest = createQueryRequest(connectionSettings, sql, null, null);
return queryRpc(projectId, queryRequest, sql, false);
}
// use jobs.insert otherwise
logger.log(Level.INFO, "\n Not Using Fast Query Path, using jobs.insert");
com.google.api.services.bigquery.model.Job queryJob =
createQueryJob(sql, connectionSettings, null, null);
JobId jobId = JobId.fromPb(queryJob.getJobReference());
GetQueryResultsResponse firstPage = getQueryResultsFirstPage(jobId);
return getResultSet(firstPage, jobId, sql, false);
} catch (BigQueryException e) {
throw new BigQuerySQLException(e.getMessage(), e, e.getErrors());
}
return getExecuteSelectResponse(sql, null, null);
}

/**
@@ -227,6 +215,12 @@ public BigQueryResult executeSelect(String sql) throws BigQuerySQLException {
public BigQueryResult executeSelect(
String sql, List<Parameter> parameters, Map<String, String>... labels)
throws BigQuerySQLException {
return getExecuteSelectResponse(sql, parameters, labels);
}

private BigQueryResult getExecuteSelectResponse(
String sql, List<Parameter> parameters, Map<String, String>... labels)
throws BigQuerySQLException {
Map<String, String> labelMap = null;
if (labels != null
&& labels.length == 1) { // We expect label as a key value pair in a single Map
@@ -252,29 +246,136 @@ public BigQueryResult executeSelect(
throw new BigQuerySQLException(e.getMessage(), e, e.getErrors());
}
}
/**
* Execute a SQL statement that returns a single ResultSet and returns a ListenableFuture to
* process the response asynchronously.
*
* <p>Example of running a query.
*
* <pre>
* {
* &#64;code
* ConnectionSettings connectionSettings =
* ConnectionSettings.newBuilder()
* .setRequestTimeout(10L)
* .setMaxResults(100L)
* .setUseQueryCache(true)
* .build();
* Connection connection = bigquery.createConnection(connectionSettings);
* String selectQuery = "SELECT corpus FROM `bigquery-public-data.samples.shakespeare` GROUP BY corpus;";
* ListenableFuture<ExecuteSelectResponse> executeSelectFuture = connection.executeSelectAsync(selectQuery);
* ExecuteSelectResponse executeSelectRes = executeSelectFuture.get();
*
* if(!executeSelectRes.getIsSuccessful()){
* throw executeSelectRes.getBigQuerySQLException();
* }
*
* BigQueryResult bigQueryResult = executeSelectRes.getBigQueryResult();
* ResultSet rs = bigQueryResult.getResultSet();
* while (rs.next()) {
* System.out.println(rs.getString(1));
* }
*
* </pre>
*
* @param sql a static SQL SELECT statement
* @return a ListenableFuture that is used to get the data produced by the query
* @exception BigQuerySQLException if a database access error occurs
*/
@BetaApi
@Override
public ListenableFuture<ExecuteSelectResponse> executeSelectAsync(String sql)
throws BigQuerySQLException {
return getExecuteSelectFuture(sql, null);
}

/** This method calls the overloaded executeSelect(...) methods and returns a Future */
private ListenableFuture<ExecuteSelectResponse> getExecuteSelectFuture(
String sql, List<Parameter> parameters, Map<String, String>... labels) {
ExecutorService execService =
Executors.newFixedThreadPool(
2); // two fixed threads. One for the async operation and the other for processing the
// callback
ListeningExecutorService lExecService = MoreExecutors.listeningDecorator(execService);
ListenableFuture<ExecuteSelectResponse> executeSelectFuture =
lExecService.submit(
() -> {
if (parameters == null) {
Contributor:

Is there value in doing this check? Does executeSelect not properly deal with null params?

Contributor Author:

Thanks for pointing this out, @shollyman. The null check isn't required here, as the underlying method deals with it. Replaced it with a call to executeSelect(sql, parameters, labels). Also added exception handling to pass back any exception that might have occurred.

return ExecuteSelectResponse.newBuilder()
.setResultSet(this.executeSelect(sql))
.setIsSuccessful(true)
.build();
} else {
return ExecuteSelectResponse.newBuilder()
.setResultSet(this.executeSelect(sql, parameters, labels))
.setIsSuccessful(true)
.build();
}
});

Futures.addCallback(
executeSelectFuture,
new FutureCallback<ExecuteSelectResponse>() {
public void onSuccess(ExecuteSelectResponse result) {
execService.shutdownNow(); // shutdown the executor service as we do not need it
}

public void onFailure(Throwable t) {
logger.log(
Level.WARNING,
"\n"
+ String.format(
"Async task failed or cancelled with error %s", t.getMessage()));
try {
close(); // attempt to stop the execution as the developer might have called
// Future.cancel()
} catch (BigQuerySQLException e) {
logger.log(
Level.WARNING,
"\n"
+ String.format("Exception while closing the connection %s", e.getMessage()));
}
execService.shutdownNow(); // shutdown the executor service as we do not need it
}
},
execService);

return executeSelectFuture;
}
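
The lifecycle in getExecuteSelectFuture above (create a short-lived executor, run the query on it, release the executor once the task settles) can be sketched with JDK classes only. This is an illustration under assumptions, not the PR's code: it uses `CompletableFuture` as a stand-in for Guava's `ListenableFuture` and `Futures.addCallback`, a single worker thread instead of two, and `shutdown()` in a `finally` block instead of `shutdownNow()` in callbacks. `AsyncLifecycleSketch` and `runAndRelease` are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch of a one-shot async task that releases its own executor. */
final class AsyncLifecycleSketch {
  private AsyncLifecycleSketch() {}

  /**
   * Runs the task on a dedicated single-thread executor and returns a future for its result.
   * Any exception thrown by the task is passed back through the future instead of being lost.
   */
  static <T> CompletableFuture<T> runAndRelease(Callable<T> task) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CompletableFuture<T> future = new CompletableFuture<>();
    executor.execute(
        () -> {
          try {
            future.complete(task.call());
          } catch (Exception e) {
            // Surface the failure to whoever is waiting on the future.
            future.completeExceptionally(e);
          } finally {
            // The executor is only needed for this one task, so release it either way.
            executor.shutdown();
          }
        });
    return future;
  }
}
```

Releasing the executor in `finally` covers both the success and the failure path in one place. The PR instead does this in the `onSuccess` and `onFailure` callbacks, where the failure path additionally attempts to close the connection.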

/**
* Execute a SQL statement that returns a single ResultSet and returns a ListenableFuture to
* process the response asynchronously.
*
* @param sql SQL SELECT query
* @param parameters named or positional parameters. The set of query parameters must either be
* all positional or all named parameters.
* @param labels (optional) the labels associated with this query. You can use these to organize
* and group your query jobs. Label keys and values can be no longer than 63 characters, can
* only contain lowercase letters, numeric characters, underscores and dashes. International
* characters are allowed. Label values are optional and Label is a Varargs. You should pass
* all the Labels in a single Map. Label keys must start with a letter and each label in the
* list must have a different key.
* @return a ListenableFuture that is used to get the data produced by the query
* @throws BigQuerySQLException
*/
@BetaApi
@Override
public ListenableFuture<ExecuteSelectResponse> executeSelectAsync(
String sql, List<Parameter> parameters, Map<String, String>... labels)
throws BigQuerySQLException {
return getExecuteSelectFuture(sql, parameters, labels);
}
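
Because both overloads return a future, a caller can bound the wait and cancel the query instead of blocking indefinitely on `get()`. The following is a minimal caller-side sketch that assumes only the JDK `Future` contract (which `ListenableFuture` extends). `BoundedWaitSketch` and `getOrCancel` are hypothetical names, and returning `null` on timeout is a simplification chosen for brevity.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical caller-side helper; not part of the BigQuery client library. */
final class BoundedWaitSketch {
  private BoundedWaitSketch() {}

  /**
   * Waits up to timeoutMillis for the future. On timeout the future is cancelled and null is
   * returned; a failure inside the task is rethrown as an IllegalStateException.
   */
  static <T> T getOrCancel(Future<T> future, long timeoutMillis) {
    try {
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // Give up on the result and ask the producer to stop working on it.
      future.cancel(true);
      return null;
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers further up can observe it.
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for the query", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Query failed", e.getCause());
    }
  }
}
```

Per the comment in `onFailure` above, a cancelled future is the case in which the implementation attempts to close the connection, so cancelling from the caller side is how the running query gets a chance to stop.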

@VisibleForTesting
BigQueryResult getResultSet(
GetQueryResultsResponse firstPage, JobId jobId, String sql, Boolean hasQueryParameters) {
if (firstPage.getJobComplete()
&& firstPage.getTotalRows() != null
&& firstPage.getSchema()
!= null) { // firstPage.getTotalRows() is null if job is not complete. We need to make
// sure that the schema is not null, as it is required for the ResultSet
return getSubsequentQueryResultsWithJob(
firstPage.getTotalRows().longValue(),
(long) firstPage.getRows().size(),
jobId,
firstPage,
hasQueryParameters);
} else { // job is still running, use dryrun to get Schema
com.google.api.services.bigquery.model.Job dryRunJob = createDryRunJob(sql);
Schema schema = Schema.fromPb(dryRunJob.getStatistics().getQuery().getSchema());
// TODO: check how can we get totalRows and pageRows while the job is still running.
// `firstPage.getTotalRows()` returns null
return getSubsequentQueryResultsWithJob(
null, null, jobId, firstPage, schema, hasQueryParameters);
}
return getSubsequentQueryResultsWithJob(
firstPage.getTotalRows().longValue(),
(long) firstPage.getRows().size(),
jobId,
firstPage,
hasQueryParameters);
}

static class EndOfFieldValueList
@@ -337,21 +438,6 @@ private BigQueryResult queryRpc(
results.getJobComplete(), results.getSchema() == null, totalRows, pageRows));
JobId jobId = JobId.fromPb(results.getJobReference());
GetQueryResultsResponse firstPage = getQueryResultsFirstPage(jobId);
// We might get null schema from the backend occasionally. Ref:
// https://github.com/googleapis/java-bigquery/issues/2103/. Using queryDryRun in such cases
// to get the schema
if (firstPage.getSchema() == null) { // get schema using dry run
// Log the status if the job was complete complete
logger.log(
Level.WARNING,
"\n"
+ "Received null schema, Using dryRun the get the Schema. jobComplete:"
+ firstPage.getJobComplete());
com.google.api.services.bigquery.model.Job dryRunJob = createDryRunJob(sql);
Schema schema = Schema.fromPb(dryRunJob.getStatistics().getQuery().getSchema());
return getSubsequentQueryResultsWithJob(
totalRows, pageRows, jobId, firstPage, schema, hasQueryParameters);
}
return getSubsequentQueryResultsWithJob(
totalRows, pageRows, jobId, firstPage, hasQueryParameters);
}
@@ -0,0 +1,40 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigquery;

import com.google.auto.value.AutoValue;
import java.io.Serializable;

@AutoValue
public abstract class ExecuteSelectResponse implements Serializable {
public abstract BigQueryResult getResultSet();

public abstract boolean getIsSuccessful();

public static Builder newBuilder() {
return new AutoValue_ExecuteSelectResponse.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
public abstract ExecuteSelectResponse build();

public abstract Builder setResultSet(BigQueryResult bigQueryResult);

public abstract Builder setIsSuccessful(boolean isSuccessful);
}
}
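
For readers unfamiliar with AutoValue, the following hand-written class approximates the shape of what the annotation processor generates for `ExecuteSelectResponse`: an immutable value with a builder whose `build()` rejects missing required properties. It is a sketch, not the generated code: `SelectOutcomeSketch` is a hypothetical name, and `String` stands in for `BigQueryResult` to keep the block self-contained.

```java
import java.util.Objects;

/** Hypothetical hand-written analogue of an AutoValue class with a builder. */
final class SelectOutcomeSketch {
  private final String resultSet; // stand-in for BigQueryResult
  private final boolean successful;

  private SelectOutcomeSketch(String resultSet, boolean successful) {
    this.resultSet = resultSet;
    this.successful = successful;
  }

  String getResultSet() {
    return resultSet;
  }

  boolean getIsSuccessful() {
    return successful;
  }

  static Builder newBuilder() {
    return new Builder();
  }

  static final class Builder {
    private String resultSet;
    private Boolean successful; // boxed so that "never set" can be detected

    Builder setResultSet(String resultSet) {
      this.resultSet = Objects.requireNonNull(resultSet, "resultSet");
      return this;
    }

    Builder setIsSuccessful(boolean successful) {
      this.successful = successful;
      return this;
    }

    SelectOutcomeSketch build() {
      // Both properties are required in this sketch, as they are non-nullable in the diff above.
      if (resultSet == null || successful == null) {
        throw new IllegalStateException("Missing required properties");
      }
      return new SelectOutcomeSketch(resultSet, successful);
    }
  }
}
```

With both properties required, a response cannot be built without a result set, which is worth keeping in mind when reading the failure path of the async method.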