Skip to content

Commit

Permalink
feat: retry temporary GRPC statuses for ack/modack/nack when exactly-…
Browse files Browse the repository at this point in the history
…once delivery is enabled (#607)

We need to do this because [only UNAVAILABLE](https://github.com/googleapis/googleapis/blob/eb0700c6f29ca94f460307f201eb605744f055cb/google/pubsub/v1/pubsub_grpc_service_config.json#L221) is retried for acks/modacks/nacks at the GRPC level. With this CL, we extend the higher-level, manual retry mechanism for these RPCs to all the status codes considered temporary for the Publish RPC. The new list of retriable codes for these RPCs when exactly-once delivery is enabled is: DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED, ABORTED, INTERNAL, UNAVAILABLE.
  • Loading branch information
pradn committed Mar 8, 2022
1 parent de0bbce commit a91bed8
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 5 deletions.
Expand Up @@ -75,6 +75,14 @@
a subscription. We do this to reduce premature ack expiration.
"""

# gRPC status codes that `_process_requests` treats as temporary: a request
# whose `error_status.code` is in this set is appended to `requests_to_retry`
# instead of having its future completed with an exception.
# NOTE(review): the name ties this to exactly-once delivery -- confirm that
# callers only act on the retry list in that mode.
_EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS = {
code_pb2.DEADLINE_EXCEEDED,
code_pb2.RESOURCE_EXHAUSTED,
code_pb2.ABORTED,
code_pb2.INTERNAL,
code_pb2.UNAVAILABLE,
}


def _wrap_as_exception(maybe_exception: Any) -> BaseException:
"""Wrap an object as a Python exception, if needed.
Expand Down Expand Up @@ -163,6 +171,8 @@ def _process_requests(
requests_completed = []
requests_to_retry = []
for ack_id in ack_reqs_dict:
# Handle special errors returned for ack/modack RPCs via the ErrorInfo
# sidecar metadata when exactly-once delivery is enabled.
if errors_dict and ack_id in errors_dict:
exactly_once_error = errors_dict[ack_id]
if exactly_once_error.startswith("TRANSIENT_"):
Expand All @@ -176,9 +186,14 @@ def _process_requests(
future = ack_reqs_dict[ack_id].future
future.set_exception(exc)
requests_completed.append(ack_reqs_dict[ack_id])
# Temporary GRPC errors are retried
elif (
error_status
and error_status.code in _EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS
):
requests_to_retry.append(ack_reqs_dict[ack_id])
# Other GRPC errors are NOT retried
elif error_status:
# Only permanent errors are expected here b/c retriable errors are
# retried at the lower, GRPC level.
if error_status.code == code_pb2.PERMISSION_DENIED:
exc = AcknowledgeError(AcknowledgeStatus.PERMISSION_DENIED, info=None)
elif error_status.code == code_pb2.FAILED_PRECONDITION:
Expand All @@ -188,11 +203,13 @@ def _process_requests(
future = ack_reqs_dict[ack_id].future
future.set_exception(exc)
requests_completed.append(ack_reqs_dict[ack_id])
# Since no error occurred, requests with futures are completed successfully.
elif ack_reqs_dict[ack_id].future:
future = ack_reqs_dict[ack_id].future
# success
future.set_result(AcknowledgeStatus.SUCCESS)
requests_completed.append(ack_reqs_dict[ack_id])
# All other requests are considered completed.
else:
requests_completed.append(ack_reqs_dict[ack_id])

Expand Down Expand Up @@ -580,7 +597,9 @@ def send_unary_ack(
ack_errors_dict = _get_ack_errors(exc)
except exceptions.RetryError as exc:
status = status_pb2.Status()
status.code = code_pb2.DEADLINE_EXCEEDED
# Choose a non-retriable error code so the futures fail with
# exceptions.
status.code = code_pb2.UNKNOWN
# Makes sure to complete futures so they don't block forever.
_process_requests(status, ack_reqs_dict, None)
_LOGGER.debug(
Expand Down Expand Up @@ -634,7 +653,9 @@ def send_unary_modack(
modack_errors_dict = _get_ack_errors(exc)
except exceptions.RetryError as exc:
status = status_pb2.Status()
status.code = code_pb2.DEADLINE_EXCEEDED
# Choose a non-retriable error code so the futures fail with
# exceptions.
status.code = code_pb2.UNKNOWN
# Makes sure to complete futures so they don't block forever.
_process_requests(status, ack_reqs_dict, None)
_LOGGER.debug(
Expand Down
34 changes: 33 additions & 1 deletion tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
Expand Up @@ -1735,7 +1735,7 @@ def test_process_requests_permanent_error_raises_exception():
assert not requests_to_retry


def test_process_requests_transient_error_returns_request():
def test_process_requests_transient_error_returns_request_for_retrying():
# a transient error returns the request in `requests_to_retry`
future = futures.Future()
ack_reqs_dict = {
Expand Down Expand Up @@ -1772,6 +1772,38 @@ def test_process_requests_unknown_error_raises_exception():
assert not requests_to_retry


def test_process_requests_retriable_error_status_returns_request_for_retrying():
    """Each temporary gRPC status code sends the request back for retrying.

    For every retriable code, `_process_requests` must report nothing as
    completed, return the request in `requests_to_retry`, and leave the
    request's future unresolved.
    """
    for status_code in (
        code_pb2.DEADLINE_EXCEEDED,
        code_pb2.RESOURCE_EXHAUSTED,
        code_pb2.ABORTED,
        code_pb2.INTERNAL,
        code_pb2.UNAVAILABLE,
    ):
        # Fresh future and request per code so one iteration cannot mask
        # a failure in another.
        pending_future = futures.Future()
        ack_request = requests.AckRequest(
            ack_id="ackid1",
            byte_size=0,
            time_to_ack=20,
            ordering_key="",
            future=pending_future,
        )

        error_status = status_pb2.Status()
        error_status.code = status_code

        completed, to_retry = streaming_pull_manager._process_requests(
            error_status, {"ackid1": ack_request}, None
        )

        assert not completed
        assert to_retry[0].ack_id == "ackid1"
        # A retried request must not have its future resolved yet.
        assert not pending_future.done()


def test_process_requests_permission_denied_error_status_raises_exception():
# a permission-denied error status raises an exception
future = futures.Future()
Expand Down

0 comments on commit a91bed8

Please sign in to comment.