From 38b2a3e91dd4f3f3c6657f4660fa1df8c0239124 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Sat, 2 May 2020 13:36:32 -0400 Subject: [PATCH 1/4] chore: Remove notes about ordering keys being experimental. --- google/cloud/pubsub_v1/publisher/client.py | 2 -- google/cloud/pubsub_v1/types.py | 3 --- 2 files changed, 5 deletions(-) diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index caa784407..02f508574 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -315,8 +315,6 @@ def publish(self, topic, data, ordering_key="", **attrs): ordering_key: A string that identifies related messages for which publish order should be respected. Message ordering must be enabled for this client to use this feature. - EXPERIMENTAL: This feature is currently available in a closed - alpha. Please contact the Cloud Pub/Sub team to use it. attrs (Mapping[str, str]): A dictionary of attributes to be sent as metadata. (These may be text strings or byte strings.) diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py index eb4f00681..cb140e730 100644 --- a/google/cloud/pubsub_v1/types.py +++ b/google/cloud/pubsub_v1/types.py @@ -43,9 +43,6 @@ PublisherOptions.__doc__ = "The options for the publisher client." PublisherOptions.enable_message_ordering.__doc__ = ( "Whether to order messages in a batch by a supplied ordering key." - "EXPERIMENTAL: Message ordering is an alpha feature that requires " - "special permissions to use. Please contact the Cloud Pub/Sub team for " - "more information." ) From 8c30a9f3ee1a9b2acbf34be24e401a33cdf1476b Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Sat, 20 Jun 2020 21:06:21 -0400 Subject: [PATCH 2/4] Revert "chore: Remove notes about ordering keys being experimental." This reverts commit 38b2a3e91dd4f3f3c6657f4660fa1df8c0239124. --- google/cloud/pubsub_v1/publisher/client.py | 2 ++ google/cloud/pubsub_v1/types.py | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index ad27f0029..8dbbea634 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -325,6 +325,8 @@ def publish(self, topic, data, ordering_key="", **attrs): ordering_key: A string that identifies related messages for which publish order should be respected. Message ordering must be enabled for this client to use this feature. + EXPERIMENTAL: This feature is currently available in a closed + alpha. Please contact the Cloud Pub/Sub team to use it. attrs (Mapping[str, str]): A dictionary of attributes to be sent as metadata. (These may be text strings or byte strings.) diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py index 04762a3a6..b52b3ea60 100644 --- a/google/cloud/pubsub_v1/types.py +++ b/google/cloud/pubsub_v1/types.py @@ -32,7 +32,6 @@ from google.cloud.pubsub_v1.proto import pubsub_pb2 - # Define the default values for batching. # # This class is used when creating a publisher or subscriber client, and From aaf1e4c6eaeb32306e448bcb14e0c54c0bea6df4 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Sat, 20 Jun 2020 21:10:39 -0400 Subject: [PATCH 3/4] feat: Add support for server-side flow control --- .../pubsub_v1/subscriber/_protocol/streaming_pull_manager.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 2c3e51fee..4e3f24933 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -575,6 +575,8 @@ def _get_initial_request(self, stream_ack_deadline_seconds): stream_ack_deadline_seconds=stream_ack_deadline_seconds, subscription=self._subscription, client_id=self._client_id, + max_outstanding_messages=self._flow_control.max_messages, + max_outstanding_bytes=self._flow_control.max_bytes, ) # Return the initial request. From daec6964a8ae42177d8ffa4f77af85e36b5e3026 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Mon, 6 Jul 2020 13:09:50 -0400 Subject: [PATCH 4/4] Add unit test for flow control --- .../pubsub_v1/subscriber/test_streaming_pull_manager.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index dd1035991..3f2881df6 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -159,6 +159,15 @@ def test_client_id(): assert client_id_1 != client_id_2 +def test_streaming_flow_control(): + manager = make_manager( + flow_control=types.FlowControl(max_messages=10, max_bytes=1000) + ) + request = manager._get_initial_request(stream_ack_deadline_seconds=10) + assert request.max_outstanding_messages == 10 + assert request.max_outstanding_bytes == 1000 + + def test_ack_deadline_with_max_duration_per_lease_extension(): manager = make_manager() manager._flow_control = types.FlowControl(max_duration_per_lease_extension=5)