Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for server-side flow control #143

Merged
merged 9 commits into from Jul 7, 2020
Expand Up @@ -575,6 +575,8 @@ def _get_initial_request(self, stream_ack_deadline_seconds):
stream_ack_deadline_seconds=stream_ack_deadline_seconds,
subscription=self._subscription,
client_id=self._client_id,
max_outstanding_messages=self._flow_control.max_messages,
kamalaboulhosn marked this conversation as resolved.
Show resolved Hide resolved
max_outstanding_bytes=self._flow_control.max_bytes,
)

# Return the initial request.
Expand Down
Expand Up @@ -159,6 +159,15 @@ def test_client_id():
assert client_id_1 != client_id_2


def test_streaming_flow_control():
manager = make_manager(
flow_control=types.FlowControl(max_messages=10, max_bytes=1000)
)
request = manager._get_initial_request(stream_ack_deadline_seconds=10)
assert request.max_outstanding_messages == 10
assert request.max_outstanding_bytes == 1000


def test_ack_deadline_with_max_duration_per_lease_extension():
manager = make_manager()
manager._flow_control = types.FlowControl(max_duration_per_lease_extension=5)
Expand Down