Skip to content

Commit

Permalink
fix most unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tswast committed Apr 12, 2024
1 parent 0c9a8a2 commit 07839b5
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 41 deletions.
24 changes: 20 additions & 4 deletions google/cloud/bigquery/job/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -1440,6 +1440,12 @@ def _done_or_raise(self, retry=DEFAULT_RETRY, timeout=None):
# stored in _blocking_poll() in the process of polling for job completion.
transport_timeout = timeout if timeout is not None else self._transport_timeout

# We could have been given a generic object() sentinel to represent a
# default timeout.
transport_timeout = (
transport_timeout if isinstance(timeout, (int, float)) else None
)

try:
self._reload_query_results(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError as exc:
Expand Down Expand Up @@ -1576,7 +1582,12 @@ def is_job_done():
self._retry_do_query = retry_do_query
self._job_retry = job_retry

if self.done():
# Refresh the job status with jobs.get because some of the
# exceptions thrown by jobs.getQueryResults like timeout and
# rateLimitExceeded errors are ambiguous. We want to know if
# the query job failed and not just the call to
# jobs.getQueryResults.
if self.done(retry=retry, timeout=timeout):
# If it's already failed, we might as well stop.
if self.exception() is not None:
# Only try to restart the query job if the job failed for
Expand All @@ -1590,16 +1601,21 @@ def is_job_done():
# it's already been fetched, e.g. from jobs.query first
# page of results.
if (
self._query_results is not None
and self._query_results.complete
self._query_results is None
or not self._query_results.complete
):
return True
self._reload_query_results(retry=retry, timeout=timeout)
return True

# Call jobs.getQueryResults with max results set to 0 just to
# wait for the query to finish. Unlike most methods,
# jobs.getQueryResults hangs as long as it can to ensure we
# know when the query has finished as soon as possible.
self._reload_query_results(retry=retry, timeout=timeout)

# Even if the query is finished now according to
# jobs.getQueryResults, we'll want to reload the job status if
# it's not already DONE.
return False

if retry_do_query is not None and job_retry is not None:
Expand Down
64 changes: 30 additions & 34 deletions tests/unit/job/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,12 @@ def test_result(self):
"rows": [{"f": [{"v": "abc"}]}],
}
conn = make_connection(
query_resource, query_resource_done, job_resource_done, query_page_resource
job_resource,
query_resource,
job_resource,
query_resource_done,
job_resource_done,
query_page_resource,
)
client = _make_client(self.PROJECT, connection=conn)
job = self._get_target_class().from_api_repr(job_resource, client)
Expand Down Expand Up @@ -1014,7 +1019,14 @@ def test_result(self):
timeout=None,
)
conn.api_request.assert_has_calls(
[query_results_call, query_results_call, reload_call, query_page_call]
[
reload_call,
query_results_call,
reload_call,
query_results_call,
reload_call,
query_page_call,
]
)

def test_result_dry_run(self):
Expand Down Expand Up @@ -1254,9 +1266,13 @@ def test_result_w_retry(self):
}

connection = make_connection(
exceptions.NotFound("not normally retriable"),
job_resource,
exceptions.NotFound("not normally retriable"),
query_resource,
exceptions.NotFound("not normally retriable"),
job_resource,
exceptions.NotFound("not normally retriable"),
query_resource_done,
exceptions.NotFound("not normally retriable"),
job_resource_done,
Expand Down Expand Up @@ -1289,7 +1305,18 @@ def test_result_w_retry(self):
)

connection.api_request.assert_has_calls(
[query_results_call, query_results_call, reload_call]
[
reload_call,
reload_call,
query_results_call,
query_results_call,
reload_call,
reload_call,
query_results_call,
query_results_call,
reload_call,
reload_call,
]
)

def test_result_w_empty_schema(self):
Expand All @@ -1316,37 +1343,6 @@ def test_result_w_empty_schema(self):
self.assertEqual(result.location, "asia-northeast1")
self.assertEqual(result.query_id, "xyz-abc")

def test_result_invokes_begins(self):
begun_resource = self._make_resource()
incomplete_resource = {
"jobComplete": False,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
}
query_resource = copy.deepcopy(incomplete_resource)
query_resource["jobComplete"] = True
done_resource = copy.deepcopy(begun_resource)
done_resource["status"] = {"state": "DONE"}
connection = make_connection(
begun_resource,
incomplete_resource,
query_resource,
done_resource,
query_resource,
)
client = _make_client(project=self.PROJECT, connection=connection)
job = self._make_one(self.JOB_ID, self.QUERY, client)

job.result()

self.assertEqual(len(connection.api_request.call_args_list), 4)
begin_request = connection.api_request.call_args_list[0]
query_request = connection.api_request.call_args_list[2]
reload_request = connection.api_request.call_args_list[3]
self.assertEqual(begin_request[1]["method"], "POST")
self.assertEqual(query_request[1]["method"], "GET")
self.assertEqual(reload_request[1]["method"], "GET")

def test_result_w_timeout(self):
import google.cloud.bigquery.client

Expand Down
14 changes: 13 additions & 1 deletion tests/unit/test__job_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,15 @@ def test_query_and_wait_incomplete_query():
"jobComplete": False,
},
# jobs.get
{
"jobReference": {
"projectId": "response-project",
"jobId": "response-job-id",
"location": "response-location",
},
"status": {"state": "RUNNING"},
},
# jobs.getQueryResults with max_results=0
{
"jobReference": {
"projectId": "response-project",
Expand All @@ -962,15 +971,18 @@ def test_query_and_wait_incomplete_query():
],
},
},
# jobs.getQueryResults with max_results=0
# jobs.get
{
"jobReference": {
"projectId": "response-project",
"jobId": "response-job-id",
"location": "response-location",
},
"status": {"state": "DONE"},
},
# jobs.getQueryResults
# Note: No more jobs.getQueryResults with max_results=0 because the
# previous call to jobs.getQueryResults returned with jobComplete=True.
{
"rows": [
{"f": [{"v": "Whillma Phlyntstone"}, {"v": "27"}]},
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/test_job_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ def api_request(method, path, query_params=None, data=None, **kw):
with pytest.raises(google.api_core.exceptions.RetryError):
job.result()

# We never got a successful job, so the job id never changed:
assert job.job_id == orig_job_id
# We retried the job at least once, so we should have generated a new job ID.
assert job.job_id != orig_job_id

# We failed because we couldn't succeed after 120 seconds.
# But we can try again:
Expand Down

0 comments on commit 07839b5

Please sign in to comment.