fix: table-not-found issue with executeSelect while running long queries (#2222)

* Added exponential back-off to create read session to avoid table-not-found error

* Added testForTableNotFound IT

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Set recordCnt to 5Mil

* Added polling logic at getQueryResultsFirstPage, removed retry logic on table_not_found

* Removed getTableNotFoundRetrySettings

* Updated getQueryResultsWithRowLimit - Added timeoutMs param

* Updated testGetQueryResultsFirstPage

* Updated getQueryResultsWithRowLimit - Add timeoutMs

* Updated getQueryResultsFirstPage - Modified polling logic and refactor

* Removed prev differences. Add getQueryResultsWithRowLimit

* Removed prev differences. Add getQueryResultsWithRowLimit

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
prash-mi and gcf-owl-bot[bot] committed Aug 23, 2022
1 parent de313bd commit 4876569
Showing 6 changed files with 133 additions and 36 deletions.
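For context, a minimal sketch (not part of this commit) of the call path the fix hardens: a long-running executeSelect whose results are fetched through the Read API. The query and settings are illustrative placeholders; the API names are the ones exercised by the integration test below.

import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.BigQueryResult;
import com.google.cloud.bigquery.Connection;
import com.google.cloud.bigquery.ConnectionSettings;
import java.sql.ResultSet;
import java.sql.SQLException;

public class LongRunningSelectSketch {
  public static void main(String[] args) throws SQLException {
    ConnectionSettings settings =
        ConnectionSettings.newBuilder()
            .setUseReadAPI(true) // rows are streamed via the BigQuery Storage Read API
            .build();
    Connection connection =
        BigQueryOptions.getDefaultInstance().getService().createConnection(settings);

    // Before this fix, a long-running query could surface a table-not-found error:
    // createReadSession could be called before the job finished materializing its
    // destination table. getQueryResultsFirstPage now polls until the job is complete.
    BigQueryResult result =
        connection.executeSelect(
            "SELECT title FROM `bigquery-samples.wikipedia_benchmark.Wiki10B` LIMIT 1000000");
    ResultSet rs = result.getResultSet();
    while (rs.next()) {
      rs.getString("title"); // consume rows in bulk
    }
  }
}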
16 changes: 9 additions & 7 deletions google-cloud-bigquery/clirr-ignored-differences.xml
@@ -2,14 +2,16 @@
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!-- TODO: REMOVE AFTER RELEASE -->
-  <difference>
-    <differenceType>7012</differenceType>
-    <className>com/google/cloud/bigquery/LoadConfiguration</className>
-    <method>java.util.List getDecimalTargetTypes()</method>
-  </difference>
+  <difference>
+    <differenceType>7004</differenceType>
+    <className>com/google/cloud/bigquery/spi/v2/BigQueryRpc</className>
+    <method>com.google.api.services.bigquery.model.GetQueryResultsResponse getQueryResultsWithRowLimit(java.lang.String, java.lang.String, java.lang.String, java.lang.Integer)</method>
+    <justification>getQueryResultsWithRowLimit is just used by ConnectionImpl at the moment so it should be fine to update the signature instead of writing an overloaded method</justification>
+  </difference>
-  <difference>
-    <differenceType>7012</differenceType>
-    <className>com/google/cloud/bigquery/LoadConfiguration$Builder</className>
-    <method>com.google.cloud.bigquery.LoadConfiguration$Builder setDecimalTargetTypes(java.util.List)</method>
-  </difference>
+  <difference>
+    <differenceType>7004</differenceType>
+    <className>com/google/cloud/bigquery/spi/v2/HttpBigQueryRpc</className>
+    <method>com.google.api.services.bigquery.model.GetQueryResultsResponse getQueryResultsWithRowLimit(java.lang.String, java.lang.String, java.lang.String, java.lang.Integer)</method>
+    <justification>getQueryResultsWithRowLimit is just used by ConnectionImpl at the moment so it should be fine to update the signature instead of writing an overloaded method</justification>
+  </difference>
</differences>
google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java
@@ -835,8 +835,8 @@ BigQueryResult highThroughPutRead(
.setMaxStreamCount(1) // Currently just one stream is allowed
// DO a regex check using order by and use multiple streams
;

ReadSession readSession = bqReadClient.createReadSession(builder.build());

bufferRow = new LinkedBlockingDeque<>(getBufferSize());
Map<String, Integer> arrowNameToIndex = new HashMap<>();
// deserialize and populate the buffer async, so that the client isn't blocked
@@ -995,33 +995,57 @@ GetQueryResultsResponse getQueryResultsFirstPage(JobId jobId) {
jobId.getLocation() == null && bigQueryOptions.getLocation() != null
? bigQueryOptions.getLocation()
: jobId.getLocation());
-    try {
-      GetQueryResultsResponse results =
-          BigQueryRetryHelper.runWithRetries(
-              () ->
-                  bigQueryRpc.getQueryResultsWithRowLimit(
-                      completeJobId.getProject(),
-                      completeJobId.getJob(),
-                      completeJobId.getLocation(),
-                      connectionSettings.getMaxResultPerPage()),
-              bigQueryOptions.getRetrySettings(),
-              BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
-              bigQueryOptions.getClock(),
-              retryConfig);
-
-      if (results.getErrors() != null) {
-        List<BigQueryError> bigQueryErrors =
-            results.getErrors().stream()
-                .map(BigQueryError.FROM_PB_FUNCTION)
-                .collect(Collectors.toList());
-        // Throwing BigQueryException since there may be no JobId and we want to stay consistent
-        // with the case where there there is a HTTP error
-        throw new BigQueryException(bigQueryErrors);
-      }
-      return results;
-    } catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
-      throw BigQueryException.translateAndThrow(e);
-    }
+    // Implementing logic to poll the Job's status using getQueryResults, as we do not get rows,
+    // row count, and schema unless the job is complete. Ref: b/241134681
+    // This logic relies on the backend for poll and wait. BigQuery guarantees that jobs make
+    // forward progress (a job won't get stuck in pending forever).
+    boolean jobComplete = false;
+    GetQueryResultsResponse results = null;
+    long timeoutMs =
+        60000; // defaulting to 60 seconds. TODO(prashant): It should be made user configurable
+
+    while (!jobComplete) {
+      try {
+        results =
+            BigQueryRetryHelper.runWithRetries(
+                () ->
+                    bigQueryRpc.getQueryResultsWithRowLimit(
+                        completeJobId.getProject(),
+                        completeJobId.getJob(),
+                        completeJobId.getLocation(),
+                        connectionSettings.getMaxResultPerPage(),
+                        timeoutMs),
+                bigQueryOptions.getRetrySettings(),
+                BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
+                bigQueryOptions.getClock(),
+                retryConfig);
+
+        if (results.getErrors() != null) {
+          List<BigQueryError> bigQueryErrors =
+              results.getErrors().stream()
+                  .map(BigQueryError.FROM_PB_FUNCTION)
+                  .collect(Collectors.toList());
+          // Throwing BigQueryException since there may be no JobId, and we want to stay consistent
+          // with the case where there is a HTTP error
+          throw new BigQueryException(bigQueryErrors);
+        }
+      } catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
+        throw BigQueryException.translateAndThrow(e);
+      }
+      jobComplete = results.getJobComplete();
+
+      // A log entry at Level.FINE shows that a very long-running job is still being polled and
+      // has not stalled.
+      logger.log(
+          Level.FINE,
+          String.format(
+              "jobComplete: %s , Polling getQueryResults with timeoutMs: %s",
+              jobComplete, timeoutMs));
+    }
+
+    return results;
}

@VisibleForTesting
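The polling loop in getQueryResultsFirstPage relies on the server side to do the waiting: each getQueryResults call may block on the backend for up to timeoutMs and returns jobComplete either way, so the client needs no local sleep or backoff. A distilled, runnable sketch of the same pattern under that assumption (fetchStatus stands in for the real RPC):

public class ServerSidePollSketch {
  /** Stand-in for jobs.getQueryResults: blocks up to timeoutMs, then reports completion. */
  static boolean fetchStatus(long timeoutMs) throws InterruptedException {
    Thread.sleep(Math.min(timeoutMs, 250)); // simulate the server-side wait
    return java.util.concurrent.ThreadLocalRandom.current().nextInt(4) == 0; // eventually true
  }

  public static void main(String[] args) throws InterruptedException {
    long timeoutMs = 60_000; // each call may be held open this long on the server
    boolean jobComplete = false;
    while (!jobComplete) {
      // No client-side backoff needed: at worst the loop spins once per timeoutMs window,
      // and BigQuery guarantees the job makes forward progress.
      jobComplete = fetchStatus(timeoutMs);
      System.out.printf("jobComplete: %s%n", jobComplete);
    }
  }
}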
google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/BigQueryRpc.java
@@ -315,7 +315,7 @@ GetQueryResultsResponse getQueryResults(
* @throws BigQueryException upon failure
*/
GetQueryResultsResponse getQueryResultsWithRowLimit(
-      String projectId, String jobId, String location, Integer preFetchedRowLimit);
+      String projectId, String jobId, String location, Integer preFetchedRowLimit, Long timeoutMs);

/**
* Runs a BigQuery SQL query synchronously and returns query results if the query completes within
google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/HttpBigQueryRpc.java
@@ -697,14 +697,15 @@ public GetQueryResultsResponse getQueryResults(

@Override
public GetQueryResultsResponse getQueryResultsWithRowLimit(
-      String projectId, String jobId, String location, Integer maxResultPerPage) {
+      String projectId, String jobId, String location, Integer maxResultPerPage, Long timeoutMs) {
try {
return bigquery
.jobs()
.getQueryResults(projectId, jobId)
.setPrettyPrint(false)
.setLocation(location)
.setMaxResults(Long.valueOf(maxResultPerPage))
+          .setTimeoutMs(timeoutMs)
.execute();
} catch (IOException ex) {
throw translate(ex);
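At the REST layer, timeoutMs maps to the timeoutMs parameter of jobs.getQueryResults, which caps how long the backend holds the request open before answering with jobComplete=false. A hypothetical direct call against the new five-argument signature (the SPI class is normally not called directly; the project, job id, and location are placeholders):

import com.google.api.services.bigquery.model.GetQueryResultsResponse;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc;

public class RowLimitCallSketch {
  public static void main(String[] args) {
    HttpBigQueryRpc rpc = new HttpBigQueryRpc(BigQueryOptions.getDefaultInstance());
    GetQueryResultsResponse response =
        rpc.getQueryResultsWithRowLimit(
            "my-project", "my-job-id", "US", /* maxResultPerPage= */ 100, /* timeoutMs= */ 60_000L);
    if (!Boolean.TRUE.equals(response.getJobComplete())) {
      System.out.println("Job still running; ConnectionImpl would simply call again.");
    }
  }
}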
google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/ConnectionImplTest.java
@@ -315,14 +315,22 @@ public void testNextPageTask() throws InterruptedException {
@Test
public void testGetQueryResultsFirstPage() {
when(bigqueryRpcMock.getQueryResultsWithRowLimit(
-            any(String.class), any(String.class), any(String.class), any(Integer.class)))
+            any(String.class),
+            any(String.class),
+            any(String.class),
+            any(Integer.class),
+            any(Long.class)))
.thenReturn(GET_QUERY_RESULTS_RESPONSE);
GetQueryResultsResponse response = connection.getQueryResultsFirstPage(QUERY_JOB);
assertNotNull(response);
assertEquals(GET_QUERY_RESULTS_RESPONSE, response);
verify(bigqueryRpcMock, times(1))
.getQueryResultsWithRowLimit(
-            any(String.class), any(String.class), any(String.class), any(Integer.class));
+            any(String.class),
+            any(String.class),
+            any(String.class),
+            any(Integer.class),
+            any(Long.class));
}

// calls executeSelect with a nonFast query and exercises createQueryJob
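Because the SPI method now takes five arguments, every stub and verification has to match the new arity; a four-matcher when(...) would no longer match, and the mock would return null. A minimal, self-contained illustration of the pattern used in the test above (Mockito; the canned response is hypothetical):

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.api.services.bigquery.model.GetQueryResultsResponse;
import com.google.cloud.bigquery.spi.v2.BigQueryRpc;

public class StubAritySketch {
  public static void main(String[] args) {
    BigQueryRpc rpc = mock(BigQueryRpc.class);
    GetQueryResultsResponse canned = new GetQueryResultsResponse().setJobComplete(true);
    // All five parameters, including the new Long timeoutMs, need a matcher.
    when(rpc.getQueryResultsWithRowLimit(
            any(String.class),
            any(String.class),
            any(String.class),
            any(Integer.class),
            any(Long.class)))
        .thenReturn(canned);
    System.out.println(
        rpc.getQueryResultsWithRowLimit("p", "j", "US", 10, 1_000L).getJobComplete()); // true
  }
}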
google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITNightlyBigQueryTest.java
@@ -23,9 +23,11 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

+import com.google.cloud.ServiceOptions;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.BigQueryResult;
import com.google.cloud.bigquery.BigQuerySQLException;
import com.google.cloud.bigquery.Connection;
@@ -60,6 +62,7 @@
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -484,6 +487,65 @@ public void testPositionalParams()
assertEquals(MULTI_LIMIT_RECS, cnt);
}

// This test case reads rows in bulk from a public table to make sure we do not get a
// table-not-found exception, which had been observed while reading data in bulk.
// Ref: b/241134681
@Test
public void testForTableNotFound() throws SQLException {
int recordCnt = 50000000; // 50 million rows
String query =
String.format(
"SELECT * FROM `bigquery-samples.wikipedia_benchmark.Wiki10B` LIMIT %s", recordCnt);

String dataSet = RemoteBigQueryHelper.generateDatasetName();
String table = "TAB_" + UUID.randomUUID();
createDataset(dataSet);
TableId targetTable =
TableId.of(
ServiceOptions.getDefaultProjectId(),
dataSet,
table); // table will be created implicitly

ConnectionSettings conSet =
ConnectionSettings.newBuilder()
.setUseReadAPI(true) // enable the Read API
.setDestinationTable(targetTable)
.setAllowLargeResults(true)
.build();

Connection connection =
BigQueryOptions.getDefaultInstance().getService().createConnection(conSet);
BigQueryResult bigQueryResultSet = connection.executeSelect(query);
assertNotNull(getResultHashWiki(bigQueryResultSet)); // this iterates through all the rows
// The job should report either the actual row count or -1 if it is still running.
assertTrue(
recordCnt == bigQueryResultSet.getTotalRows() || bigQueryResultSet.getTotalRows() == -1);
try {
deleteTable(dataSet, table);
deleteDataset(dataSet);
} catch (Exception e) {
logger.log(
Level.WARNING,
String.format(
"Error [ %s ] while deleting dataset: %s , table: %s",
e.getMessage(), dataSet, table));
}
}

// iterates through all the rows (reads just the title column)
private Long getResultHashWiki(BigQueryResult bigQueryResultSet) throws SQLException {
ResultSet rs = bigQueryResultSet.getResultSet();
long hash = 0L;
System.out.print("\n Running");
while (rs.next()) {
hash += rs.getString("title") == null ? 0 : rs.getString("title").hashCode();
}
return hash;
}

// asserts the value of each row
private static void testForAllDataTypeValues(ResultSet rs, int cnt) throws SQLException {
// Testing JSON type