From 4158315411e2d79bc92efb657b239c1f6e6f4846 Mon Sep 17 00:00:00 2001 From: Prad Nelluru Date: Mon, 28 Feb 2022 17:54:46 -0500 Subject: [PATCH] Improve tests and comments --- .../_protocol/streaming_pull_manager.py | 9 ++- .../subscriber/test_streaming_pull_manager.py | 73 ++++++++++++++++--- 2 files changed, 68 insertions(+), 14 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 5971e7988..5a9d08026 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -154,11 +154,11 @@ def _process_requests( ack_reqs_dict: "containers.ScalarMap", errors_dict: Optional["containers.ScalarMap"], ): - """Process futures by referring to errors_dict. + """Process requests by referring to error_status and errors_dict. - The errors returned by the server in `errors_dict` are used to complete - the request futures in `ack_reqs_dict` (with a success or exception) or - to return requests for further retries. + The errors returned by the server in as `error_status` or in `errors_dict` + are used to complete the request futures in `ack_reqs_dict` (with a success + or exception) or to return requests for further retries. """ requests_completed = [] requests_to_retry = [] @@ -564,6 +564,7 @@ def send_unary_ack( error is re-raised. """ assert ack_ids + assert len(ack_ids) == len(ack_reqs_dict) error_status = None ack_errors_dict = None diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 3d9904ca1..9e8d6c5ed 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -506,7 +506,15 @@ def test__maybe_release_messages_negative_on_hold_bytes_warning(caplog): def test_send_unary_ack(): manager = make_manager() - manager.send_unary_ack(ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict={}) + ack_reqs_dict = { + "ack_id1": requests.AckRequest( + ack_id="ack_id1", byte_size=0, time_to_ack=20, ordering_key="", future=None + ), + "ack_id2": requests.AckRequest( + ack_id="ack_id2", byte_size=0, time_to_ack=20, ordering_key="", future=None + ), + } + manager.send_unary_ack(ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict=ack_reqs_dict) manager._client.acknowledge.assert_called_once_with( subscription=manager._subscription, ack_ids=["ack_id1", "ack_id2"] @@ -516,10 +524,15 @@ def test_send_unary_ack(): def test_send_unary_modack(): manager = make_manager() + ack_reqs_dict = { + "ack_id3": requests.ModAckRequest(ack_id="ack_id3", seconds=60, future=None), + "ack_id4": requests.ModAckRequest(ack_id="ack_id4", seconds=60, future=None), + "ack_id5": requests.ModAckRequest(ack_id="ack_id5", seconds=60, future=None), + } manager.send_unary_modack( modify_deadline_ack_ids=["ack_id3", "ack_id4", "ack_id5"], modify_deadline_seconds=[10, 20, 20], - ack_reqs_dict={}, + ack_reqs_dict=ack_reqs_dict, ) manager._client.modify_ack_deadline.assert_has_calls( @@ -547,7 +560,15 @@ def test_send_unary_ack_api_call_error(caplog): error = exceptions.GoogleAPICallError("The front fell off") manager._client.acknowledge.side_effect = error - manager.send_unary_ack(ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict={}) + ack_reqs_dict = { + "ack_id1": requests.AckRequest( + ack_id="ack_id1", byte_size=0, time_to_ack=20, ordering_key="", future=None + ), + "ack_id2": requests.AckRequest( + ack_id="ack_id2", byte_size=0, time_to_ack=20, ordering_key="", future=None + ), + } + manager.send_unary_ack(ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict=ack_reqs_dict) assert "The front fell off" in caplog.text @@ -560,10 +581,26 @@ def test_send_unary_modack_api_call_error(caplog): error = exceptions.GoogleAPICallError("The front fell off") manager._client.modify_ack_deadline.side_effect = error + ack_reqs_dict = { + "ack_id1": requests.AckRequest( + ack_id="ack_id1", + byte_size=0, + time_to_ack=20, + ordering_key="", + future=futures.Future(), + ), + "ack_id2": requests.AckRequest( + ack_id="ack_id2", + byte_size=0, + time_to_ack=20, + ordering_key="", + future=futures.Future(), + ), + } manager.send_unary_modack( modify_deadline_ack_ids=["ack_id_string"], modify_deadline_seconds=[0], - ack_reqs_dict={}, + ack_reqs_dict=ack_reqs_dict, ) assert "The front fell off" in caplog.text @@ -579,11 +616,23 @@ def test_send_unary_ack_retry_error(caplog): ) manager._client.acknowledge.side_effect = error - future = futures.Future() + future1 = futures.Future() + future2 = futures.Future() ack_reqs_dict = { - "ackid1": requests.AckRequest( - ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=future - ) + "ack_id1": requests.AckRequest( + ack_id="ack_id1", + byte_size=0, + time_to_ack=20, + ordering_key="", + future=future1, + ), + "ack_id2": requests.AckRequest( + ack_id="ack_id2", + byte_size=0, + time_to_ack=20, + ordering_key="", + future=future2, + ), } with pytest.raises(exceptions.RetryError): manager.send_unary_ack( @@ -592,9 +641,13 @@ def test_send_unary_ack_retry_error(caplog): assert "RetryError while sending unary RPC" in caplog.text assert "signaled streaming pull manager shutdown" in caplog.text - assert isinstance(future.exception(), subscriber_exceptions.AcknowledgeError) + assert isinstance(future1.exception(), subscriber_exceptions.AcknowledgeError) assert ( - future.exception().error_code is subscriber_exceptions.AcknowledgeStatus.OTHER + future1.exception().error_code is subscriber_exceptions.AcknowledgeStatus.OTHER + ) + assert isinstance(future2.exception(), subscriber_exceptions.AcknowledgeError) + assert ( + future2.exception().error_code is subscriber_exceptions.AcknowledgeStatus.OTHER )