Skip to content

Commit

Permalink
Improve tests and comments
Browse files Browse the repository at this point in the history
  • Loading branch information
pradn committed Mar 1, 2022
1 parent 36fcab9 commit 4158315
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 14 deletions.
Expand Up @@ -154,11 +154,11 @@ def _process_requests(
ack_reqs_dict: "containers.ScalarMap",
errors_dict: Optional["containers.ScalarMap"],
):
"""Process futures by referring to errors_dict.
"""Process requests by referring to error_status and errors_dict.
The errors returned by the server in `errors_dict` are used to complete
the request futures in `ack_reqs_dict` (with a success or exception) or
to return requests for further retries.
The errors returned by the server in as `error_status` or in `errors_dict`
are used to complete the request futures in `ack_reqs_dict` (with a success
or exception) or to return requests for further retries.
"""
requests_completed = []
requests_to_retry = []
Expand Down Expand Up @@ -564,6 +564,7 @@ def send_unary_ack(
error is re-raised.
"""
assert ack_ids
assert len(ack_ids) == len(ack_reqs_dict)

error_status = None
ack_errors_dict = None
Expand Down
73 changes: 63 additions & 10 deletions tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
Expand Up @@ -506,7 +506,15 @@ def test__maybe_release_messages_negative_on_hold_bytes_warning(caplog):
def test_send_unary_ack():
manager = make_manager()

manager.send_unary_ack(ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict={})
ack_reqs_dict = {
"ack_id1": requests.AckRequest(
ack_id="ack_id1", byte_size=0, time_to_ack=20, ordering_key="", future=None
),
"ack_id2": requests.AckRequest(
ack_id="ack_id2", byte_size=0, time_to_ack=20, ordering_key="", future=None
),
}
manager.send_unary_ack(ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict=ack_reqs_dict)

manager._client.acknowledge.assert_called_once_with(
subscription=manager._subscription, ack_ids=["ack_id1", "ack_id2"]
Expand All @@ -516,10 +524,15 @@ def test_send_unary_ack():
def test_send_unary_modack():
manager = make_manager()

ack_reqs_dict = {
"ack_id3": requests.ModAckRequest(ack_id="ack_id3", seconds=60, future=None),
"ack_id4": requests.ModAckRequest(ack_id="ack_id4", seconds=60, future=None),
"ack_id5": requests.ModAckRequest(ack_id="ack_id5", seconds=60, future=None),
}
manager.send_unary_modack(
modify_deadline_ack_ids=["ack_id3", "ack_id4", "ack_id5"],
modify_deadline_seconds=[10, 20, 20],
ack_reqs_dict={},
ack_reqs_dict=ack_reqs_dict,
)

manager._client.modify_ack_deadline.assert_has_calls(
Expand Down Expand Up @@ -547,7 +560,15 @@ def test_send_unary_ack_api_call_error(caplog):
error = exceptions.GoogleAPICallError("The front fell off")
manager._client.acknowledge.side_effect = error

manager.send_unary_ack(ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict={})
ack_reqs_dict = {
"ack_id1": requests.AckRequest(
ack_id="ack_id1", byte_size=0, time_to_ack=20, ordering_key="", future=None
),
"ack_id2": requests.AckRequest(
ack_id="ack_id2", byte_size=0, time_to_ack=20, ordering_key="", future=None
),
}
manager.send_unary_ack(ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict=ack_reqs_dict)

assert "The front fell off" in caplog.text

Expand All @@ -560,10 +581,26 @@ def test_send_unary_modack_api_call_error(caplog):
error = exceptions.GoogleAPICallError("The front fell off")
manager._client.modify_ack_deadline.side_effect = error

ack_reqs_dict = {
"ack_id1": requests.AckRequest(
ack_id="ack_id1",
byte_size=0,
time_to_ack=20,
ordering_key="",
future=futures.Future(),
),
"ack_id2": requests.AckRequest(
ack_id="ack_id2",
byte_size=0,
time_to_ack=20,
ordering_key="",
future=futures.Future(),
),
}
manager.send_unary_modack(
modify_deadline_ack_ids=["ack_id_string"],
modify_deadline_seconds=[0],
ack_reqs_dict={},
ack_reqs_dict=ack_reqs_dict,
)

assert "The front fell off" in caplog.text
Expand All @@ -579,11 +616,23 @@ def test_send_unary_ack_retry_error(caplog):
)
manager._client.acknowledge.side_effect = error

future = futures.Future()
future1 = futures.Future()
future2 = futures.Future()
ack_reqs_dict = {
"ackid1": requests.AckRequest(
ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=future
)
"ack_id1": requests.AckRequest(
ack_id="ack_id1",
byte_size=0,
time_to_ack=20,
ordering_key="",
future=future1,
),
"ack_id2": requests.AckRequest(
ack_id="ack_id2",
byte_size=0,
time_to_ack=20,
ordering_key="",
future=future2,
),
}
with pytest.raises(exceptions.RetryError):
manager.send_unary_ack(
Expand All @@ -592,9 +641,13 @@ def test_send_unary_ack_retry_error(caplog):

assert "RetryError while sending unary RPC" in caplog.text
assert "signaled streaming pull manager shutdown" in caplog.text
assert isinstance(future.exception(), subscriber_exceptions.AcknowledgeError)
assert isinstance(future1.exception(), subscriber_exceptions.AcknowledgeError)
assert (
future.exception().error_code is subscriber_exceptions.AcknowledgeStatus.OTHER
future1.exception().error_code is subscriber_exceptions.AcknowledgeStatus.OTHER
)
assert isinstance(future2.exception(), subscriber_exceptions.AcknowledgeError)
assert (
future2.exception().error_code is subscriber_exceptions.AcknowledgeStatus.OTHER
)


Expand Down

0 comments on commit 4158315

Please sign in to comment.