Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

samples: sample for receiving messages with exactly-once delivery enabled #588

Merged
merged 8 commits into from Mar 4, 2022
70 changes: 70 additions & 0 deletions samples/snippets/subscriber.py
Expand Up @@ -580,6 +580,61 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
# [END pubsub_subscriber_blocking_shutdown]


def receive_messages_with_exactly_once_delivery_enabled(
project_id: str, subscription_id: str, timeout: Optional[float] = None
) -> None:
"""Receives messages from a pull subscription with exactly-once delivery enabled."""
# [START pubsub_subscriber_exactly_once]
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber import sub_exceptions

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
pradn marked this conversation as resolved.
Show resolved Hide resolved
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
print(f"Received {message}.")

# Use `ack_with_response()` instead of `ack()` to get a future that tracks
# the result of the acknowledge call. When exactly-once delivery is enabled
# on the subscription, the message is guaranteed to not be delivered again
# if the ack future succeeds.
ack_future = message.ack_with_response()
pradn marked this conversation as resolved.
Show resolved Hide resolved

try:
# Block on result of acknowledge call.
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
ack_future.result(timeout=timeout)
print(f"Ack for message {message.message_id} successful.")
except sub_exceptions.AcknowledgeError as e:
print(
f"Ack for message {message.message_id} failed with error: {e.error_code}"
)

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
# [END pubsub_subscriber_exactly_once]


def synchronous_pull(project_id: str, subscription_id: str) -> None:
"""Pulling messages synchronously."""
# [START pubsub_subscriber_sync_pull]
Expand Down Expand Up @@ -881,6 +936,17 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
"timeout", default=None, type=float, nargs="?"
)

receive_messages_with_exactly_once_delivery_enabled_parser = subparsers.add_parser(
"receive-messages-with-exactly-once-delivery-enabled",
help=receive_messages_with_exactly_once_delivery_enabled.__doc__,
)
receive_messages_with_exactly_once_delivery_enabled_parser.add_argument(
"subscription_id"
)
receive_messages_with_exactly_once_delivery_enabled_parser.add_argument(
"timeout", default=None, type=float, nargs="?"
)

synchronous_pull_parser = subparsers.add_parser(
"receive-synchronously", help=synchronous_pull.__doc__
)
Expand Down Expand Up @@ -967,6 +1033,10 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
receive_messages_with_blocking_shutdown(
args.project_id, args.subscription_id, args.timeout
)
elif args.command == "receive-messages-with-exactly-once-delivery-enabled":
receive_messages_with_exactly_once_delivery_enabled(
args.project_id, args.subscription_id, args.timeout
)
elif args.command == "receive-synchronously":
synchronous_pull(args.project_id, args.subscription_id)
elif args.command == "receive-synchronously-with-lease":
Expand Down
28 changes: 28 additions & 0 deletions samples/snippets/subscriber_test.py
Expand Up @@ -624,6 +624,34 @@ def eventually_consistent_test() -> None:
eventually_consistent_test()


def test_receive_messages_with_exactly_once_delivery_enabled(
publisher_client: pubsub_v1.PublisherClient,
topic: str,
subscription_async: str,
capsys: CaptureFixture[str],
) -> None:

typed_backoff = cast(
Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=60),
)

@typed_backoff
def eventually_consistent_test() -> None:
_publish_messages(publisher_client, topic)

subscriber.receive_messages_with_exactly_once_delivery_enabled(
PROJECT_ID, SUBSCRIPTION_ASYNC, 10
)

out, _ = capsys.readouterr()
assert "Listening" in out
assert subscription_async in out
assert "Received" in out
assert "Ack" in out

eventually_consistent_test()


def test_listen_for_errors(
publisher_client: pubsub_v1.PublisherClient,
topic: str,
Expand Down