From 04e261c602a2919cc75b3efa3dab099fb2cf704c Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Tue, 7 Jul 2020 11:49:25 -0400 Subject: [PATCH] feat: Add support for server-side flow control (#143) * chore: Remove notes about ordering keys being experimental. * Revert "chore: Remove notes about ordering keys being experimental." This reverts commit 38b2a3e91dd4f3f3c6657f4660fa1df8c0239124. * feat: Add support for server-side flow control * Add unit test for flow control --- .../subscriber/_protocol/streaming_pull_manager.py | 2 ++ .../pubsub_v1/subscriber/test_streaming_pull_manager.py | 9 +++++++++ 2 files changed, 11 insertions(+) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 2c3e51fee..4e3f24933 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -575,6 +575,8 @@ def _get_initial_request(self, stream_ack_deadline_seconds): stream_ack_deadline_seconds=stream_ack_deadline_seconds, subscription=self._subscription, client_id=self._client_id, + max_outstanding_messages=self._flow_control.max_messages, + max_outstanding_bytes=self._flow_control.max_bytes, ) # Return the initial request. diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index dd1035991..3f2881df6 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -159,6 +159,15 @@ def test_client_id(): assert client_id_1 != client_id_2 +def test_streaming_flow_control(): + manager = make_manager( + flow_control=types.FlowControl(max_messages=10, max_bytes=1000) + ) + request = manager._get_initial_request(stream_ack_deadline_seconds=10) + assert request.max_outstanding_messages == 10 + assert request.max_outstanding_bytes == 1000 + + def test_ack_deadline_with_max_duration_per_lease_extension(): manager = make_manager() manager._flow_control = types.FlowControl(max_duration_per_lease_extension=5)