diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 2c3e51fee..4e3f24933 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -575,6 +575,8 @@ def _get_initial_request(self, stream_ack_deadline_seconds): stream_ack_deadline_seconds=stream_ack_deadline_seconds, subscription=self._subscription, client_id=self._client_id, + max_outstanding_messages=self._flow_control.max_messages, + max_outstanding_bytes=self._flow_control.max_bytes, ) # Return the initial request. diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index dd1035991..3f2881df6 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -159,6 +159,15 @@ def test_client_id(): assert client_id_1 != client_id_2 +def test_streaming_flow_control(): + manager = make_manager( + flow_control=types.FlowControl(max_messages=10, max_bytes=1000) + ) + request = manager._get_initial_request(stream_ack_deadline_seconds=10) + assert request.max_outstanding_messages == 10 + assert request.max_outstanding_bytes == 1000 + + def test_ack_deadline_with_max_duration_per_lease_extension(): manager = make_manager() manager._flow_control = types.FlowControl(max_duration_per_lease_extension=5)