From 2ab63837e08e463bb7ee6d766993760e0998261b Mon Sep 17 00:00:00 2001 From: Prad Nelluru Date: Thu, 24 Feb 2022 15:15:57 -0500 Subject: [PATCH 1/7] Add receive_messages_with_exactly_once_subscribe sample with its own region tag --- samples/snippets/subscriber.py | 49 ++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index 7bc124ca8..1b81a1cb1 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -580,6 +580,55 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: # [END pubsub_subscriber_blocking_shutdown] +def receive_messages_with_exactly_once_subscribe( + project_id: str, subscription_id: str, timeout: Optional[float] = None +) -> None: + """Receives messages from a pull subscription with exactly-once delivery enabled.""" + # [START pubsub_subscriber_pull_with_exactly_once_delivery_enabled] + from concurrent.futures import TimeoutError + from google.cloud import pubsub_v1 + from google.cloud.pubsub_v1.subscriber import sub_exceptions + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + # Number of seconds the subscriber should listen for messages + # timeout = 5.0 + + subscriber = pubsub_v1.SubscriberClient() + # The `subscription_path` method creates a fully qualified identifier + # in the form `projects/{project_id}/subscriptions/{subscription_id}` + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + def callback(message: pubsub_v1.subscriber.message.Message) -> None: + print(f"Received {message}.") + + ack_future = message.ack_with_response() + + try: + # Block on result of acknowledge call. + ack_future.result() + print(f"Ack for message {message.message_id} successful.") + except sub_exceptions.AcknowledgeError as e: + print( + f"Ack for message {message.message_id} failed with error: {e.error_code}" + ) + + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) + print(f"Listening for messages on {subscription_path}..\n") + + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + try: + # When `timeout` is not set, result() will block indefinitely, + # unless an exception is encountered first. + streaming_pull_future.result(timeout=timeout) + except TimeoutError: + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. + # [END pubsub_subscriber_pull_with_exactly_once_delivery_enabled] + + def synchronous_pull(project_id: str, subscription_id: str) -> None: """Pulling messages synchronously.""" # [START pubsub_subscriber_sync_pull] From 40bcfa2cfc45f4a8d01e23ed6620629ef73e5127 Mon Sep 17 00:00:00 2001 From: Prad Nelluru Date: Wed, 2 Mar 2022 13:23:32 -0500 Subject: [PATCH 2/7] Address Tianzi and Mahesh's comments. --- samples/snippets/subscriber.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index 1b81a1cb1..f3adf29e3 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -584,7 +584,7 @@ def receive_messages_with_exactly_once_subscribe( project_id: str, subscription_id: str, timeout: Optional[float] = None ) -> None: """Receives messages from a pull subscription with exactly-once delivery enabled.""" - # [START pubsub_subscriber_pull_with_exactly_once_delivery_enabled] + # [START pubsub_subscriber_exactly_once_delivery] from concurrent.futures import TimeoutError from google.cloud import pubsub_v1 from google.cloud.pubsub_v1.subscriber import sub_exceptions @@ -614,7 +614,13 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: f"Ack for message {message.message_id} failed with error: {e.error_code}" ) - streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) + # Set a high `min_duration_per_lease_extension` to ensure the subscriber + # has plenty of time to process the message. + flow_control = pubsub_v1.types.FlowControl(min_duration_per_lease_extension=120) + + streaming_pull_future = subscriber.subscribe( + subscription_path, callback=callback, flow_control=flow_control + ) print(f"Listening for messages on {subscription_path}..\n") # Wrap subscriber in a 'with' block to automatically call close() when done. @@ -626,7 +632,7 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: except TimeoutError: streaming_pull_future.cancel() # Trigger the shutdown. streaming_pull_future.result() # Block until the shutdown is complete. - # [END pubsub_subscriber_pull_with_exactly_once_delivery_enabled] + # [END pubsub_subscriber_exactly_once_delivery] def synchronous_pull(project_id: str, subscription_id: str) -> None: From 444bf1129741c167ed77536096b76e5a33c5814f Mon Sep 17 00:00:00 2001 From: Prad Nelluru Date: Wed, 2 Mar 2022 13:31:58 -0500 Subject: [PATCH 3/7] Add code for arg parsing / integrate sample with infra --- samples/snippets/subscriber.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index f3adf29e3..94ee99d97 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -580,7 +580,7 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: # [END pubsub_subscriber_blocking_shutdown] -def receive_messages_with_exactly_once_subscribe( +def receive_messages_with_exactly_once_delivery_enabled( project_id: str, subscription_id: str, timeout: Optional[float] = None ) -> None: """Receives messages from a pull subscription with exactly-once delivery enabled.""" @@ -936,6 +936,15 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: "timeout", default=None, type=float, nargs="?" ) + receive_messages_with_exactly_once_delivery_enabled_parser = subparsers.add_parser( + "receive-messages-with-exactly-once-delivery-enabled", + help=receive_messages_with_exactly_once_delivery_enabled.__doc__, + ) + receive_messages_with_exactly_once_delivery_enabled_parser.add_argument("subscription_id") + receive_messages_with_exactly_once_delivery_enabled_parser.add_argument( + "timeout", default=None, type=float, nargs="?" + ) + synchronous_pull_parser = subparsers.add_parser( "receive-synchronously", help=synchronous_pull.__doc__ ) @@ -1022,6 +1031,10 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: receive_messages_with_blocking_shutdown( args.project_id, args.subscription_id, args.timeout ) + elif args.command == "receive-messages-with-exactly-once-delivery-enabled": + receive_messages_with_exactly_once_delivery_enabled( + args.project_id, args.subscription_id, args.timeout + ) elif args.command == "receive-synchronously": synchronous_pull(args.project_id, args.subscription_id) elif args.command == "receive-synchronously-with-lease": From 4d6911d9ee5ad7b6b6e8bd4f904ae63113a089f6 Mon Sep 17 00:00:00 2001 From: Prad Nelluru Date: Wed, 2 Mar 2022 13:42:17 -0500 Subject: [PATCH 4/7] Add sample test --- samples/snippets/subscriber.py | 5 +++-- samples/snippets/subscriber_test.py | 26 ++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index 94ee99d97..cd859f7d4 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -607,7 +607,9 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: try: # Block on result of acknowledge call. - ack_future.result() + # When `timeout` is not set, result() will block indefinitely, + # unless an exception is encountered first. + ack_future.result(timeout=timeout) print(f"Ack for message {message.message_id} successful.") except sub_exceptions.AcknowledgeError as e: print( @@ -617,7 +619,6 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: # Set a high `min_duration_per_lease_extension` to ensure the subscriber # has plenty of time to process the message. flow_control = pubsub_v1.types.FlowControl(min_duration_per_lease_extension=120) - streaming_pull_future = subscriber.subscribe( subscription_path, callback=callback, flow_control=flow_control ) diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index 9fcb1c119..ac8c0b8d6 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -624,6 +624,32 @@ def eventually_consistent_test() -> None: eventually_consistent_test() +def test_receive_messages_with_exactly_once_delivery_enabled( + publisher_client: pubsub_v1.PublisherClient, + topic: str, + subscription_async: str, + capsys: CaptureFixture[str], +) -> None: + + typed_backoff = cast( + Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=60), + ) + + @typed_backoff + def eventually_consistent_test() -> None: + _publish_messages(publisher_client, topic) + + subscriber.receive_messages_with_exactly_once_delivery_enabled(PROJECT_ID, SUBSCRIPTION_ASYNC, 10) + + out, _ = capsys.readouterr() + assert "Listening" in out + assert subscription_async in out + assert "Received" in out + assert "Ack" in out + + eventually_consistent_test() + + def test_listen_for_errors( publisher_client: pubsub_v1.PublisherClient, topic: str, From c03233204c8400e0858a927e8a0f9c71dd9fe20f Mon Sep 17 00:00:00 2001 From: Prad Nelluru Date: Wed, 2 Mar 2022 13:43:44 -0500 Subject: [PATCH 5/7] Reformat and remove min lease extension period setting from sample --- samples/snippets/subscriber.py | 11 ++++------- samples/snippets/subscriber_test.py | 4 +++- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index cd859f7d4..f44f99da7 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -616,12 +616,7 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: f"Ack for message {message.message_id} failed with error: {e.error_code}" ) - # Set a high `min_duration_per_lease_extension` to ensure the subscriber - # has plenty of time to process the message. - flow_control = pubsub_v1.types.FlowControl(min_duration_per_lease_extension=120) - streaming_pull_future = subscriber.subscribe( - subscription_path, callback=callback, flow_control=flow_control - ) + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) print(f"Listening for messages on {subscription_path}..\n") # Wrap subscriber in a 'with' block to automatically call close() when done. @@ -941,7 +936,9 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: "receive-messages-with-exactly-once-delivery-enabled", help=receive_messages_with_exactly_once_delivery_enabled.__doc__, ) - receive_messages_with_exactly_once_delivery_enabled_parser.add_argument("subscription_id") + receive_messages_with_exactly_once_delivery_enabled_parser.add_argument( + "subscription_id" + ) receive_messages_with_exactly_once_delivery_enabled_parser.add_argument( "timeout", default=None, type=float, nargs="?" ) diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index ac8c0b8d6..614633664 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -639,7 +639,9 @@ def test_receive_messages_with_exactly_once_delivery_enabled( def eventually_consistent_test() -> None: _publish_messages(publisher_client, topic) - subscriber.receive_messages_with_exactly_once_delivery_enabled(PROJECT_ID, SUBSCRIPTION_ASYNC, 10) + subscriber.receive_messages_with_exactly_once_delivery_enabled( + PROJECT_ID, SUBSCRIPTION_ASYNC, 10 + ) out, _ = capsys.readouterr() assert "Listening" in out From c1350ecc50b70a83d0218c9c82e78cf19edbd4c4 Mon Sep 17 00:00:00 2001 From: Prad Nelluru Date: Wed, 2 Mar 2022 13:57:12 -0500 Subject: [PATCH 6/7] Address Tianzi's comments. --- samples/snippets/subscriber.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index f44f99da7..a85143e1d 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -584,7 +584,7 @@ def receive_messages_with_exactly_once_delivery_enabled( project_id: str, subscription_id: str, timeout: Optional[float] = None ) -> None: """Receives messages from a pull subscription with exactly-once delivery enabled.""" - # [START pubsub_subscriber_exactly_once_delivery] + # [START pubsub_subscriber_exactly_once] from concurrent.futures import TimeoutError from google.cloud import pubsub_v1 from google.cloud.pubsub_v1.subscriber import sub_exceptions @@ -603,6 +603,10 @@ def receive_messages_with_exactly_once_delivery_enabled( def callback(message: pubsub_v1.subscriber.message.Message) -> None: print(f"Received {message}.") + # Use `ack_with_response()` instead of `ack()` to get a future that tracks + # the result of the acknowledge call. When exactly-once delivery is enabled + # on the subscription, the message is guaranteed to not be delivered again + # if the ack future succeeds. ack_future = message.ack_with_response() try: @@ -628,7 +632,7 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: except TimeoutError: streaming_pull_future.cancel() # Trigger the shutdown. streaming_pull_future.result() # Block until the shutdown is complete. - # [END pubsub_subscriber_exactly_once_delivery] + # [END pubsub_subscriber_exactly_once] def synchronous_pull(project_id: str, subscription_id: str) -> None: From 03c3e4bd4181353f057f9fea85b6f6d44e956639 Mon Sep 17 00:00:00 2001 From: Prad Nelluru Date: Fri, 4 Mar 2022 09:56:54 -0500 Subject: [PATCH 7/7] Fix import of subscriber exceptions. --- samples/snippets/subscriber.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index a85143e1d..ada70a02d 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -587,7 +587,7 @@ def receive_messages_with_exactly_once_delivery_enabled( # [START pubsub_subscriber_exactly_once] from concurrent.futures import TimeoutError from google.cloud import pubsub_v1 - from google.cloud.pubsub_v1.subscriber import sub_exceptions + from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions # TODO(developer) # project_id = "your-project-id"