Skip to content

Commit

Permalink
samples: create subscription with filtering enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
anguillanneuf committed Feb 8, 2022
1 parent 096a425 commit c156e3d
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 3 deletions.
47 changes: 44 additions & 3 deletions samples/snippets/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def create_push_subscription(
def create_subscription_with_ordering(
project_id: str, topic_id: str, subscription_id: str
) -> None:
"""Create a subscription with dead letter policy."""
"""Create a subscription with ordering enabled."""
# [START pubsub_enable_subscription_ordering]
from google.cloud import pubsub_v1

Expand All @@ -216,6 +216,36 @@ def create_subscription_with_ordering(
# [END pubsub_enable_subscription_ordering]


def create_subscription_with_filtering(
project_id: str, topic_id: str, subscription_id: str, filter: str,
) -> None:
"""Create a subscription with filtering enabled."""
# [START pubsub_create_subscription_with_filter]
from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# filter = "attributes.author=\"unknown\""

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
subscription = subscriber.create_subscription(
request={
"name": subscription_path,
"topic": topic_path,
"filter": filter,
}
)
print(f"Created subscription with filtering enabled: {subscription}")
# [END pubsub_create_subscription_with_filter]


def delete_subscription(project_id: str, subscription_id: str) -> None:
"""Deletes an existing Pub/Sub topic."""
# [START pubsub_delete_subscription]
Expand Down Expand Up @@ -785,6 +815,13 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
create_subscription_with_ordering_parser.add_argument("topic_id")
create_subscription_with_ordering_parser.add_argument("subscription_id")

create_subscription_with_filtering_parser = subparsers.add_parser(
"create-with-filtering", help=create_subscription_with_filtering.__doc__
)
create_subscription_with_filtering_parser.add_argument("topic_id")
create_subscription_with_filtering_parser.add_argument("subscription_id")
create_subscription_with_filtering_parser.add_argument("filter")

delete_parser = subparsers.add_parser("delete", help=delete_subscription.__doc__)
delete_parser.add_argument("subscription_id")

Expand Down Expand Up @@ -888,17 +925,21 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
)
elif args.command == "create-push":
create_push_subscription(
args.project_id, args.topic_id, args.subscription_id, args.endpoint,
args.project_id, args.topic_id, args.subscription_id, args.endpoint
)
elif args.command == "create-with-ordering":
create_subscription_with_ordering(
args.project_id, args.topic_id, args.subscription_id
)
elif args.command == "create-with-filtering":
create_subscription_with_filtering(
args.project_id, args.topic_id, args.subscription_id, args.filter
)
elif args.command == "delete":
delete_subscription(args.project_id, args.subscription_id)
elif args.command == "update-push":
update_push_subscription(
args.project_id, args.topic_id, args.subscription_id, args.endpoint,
args.project_id, args.topic_id, args.subscription_id, args.endpoint
)
elif args.command == "update-dead-letter-policy":
update_subscription_with_dead_letter_policy(
Expand Down
24 changes: 24 additions & 0 deletions samples/snippets/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
NEW_ENDPOINT = f"https://{PROJECT_ID}.appspot.com/push2"
DEFAULT_MAX_DELIVERY_ATTEMPTS = 5
UPDATED_MAX_DELIVERY_ATTEMPTS = 20
FILTER = "attributes.author=\"unknown\""

C = TypeVar("C", bound=Callable[..., Any])

Expand Down Expand Up @@ -385,6 +386,29 @@ def test_create_subscription_with_ordering(
assert "enable_message_ordering: true" in out


def test_create_subscription_with_filtering(
subscriber_client: pubsub_v1.SubscriberClient,
subscription_admin: str,
capsys: CaptureFixture[str],
) -> None:
subscription_path = subscriber_client.subscription_path(
PROJECT_ID, SUBSCRIPTION_ADMIN
)
try:
subscriber_client.delete_subscription(
request={"subscription": subscription_path}
)
except NotFound:
pass

subscriber.create_subscription_with_filtering(PROJECT_ID, TOPIC, SUBSCRIPTION_ADMIN, FILTER)

out, _ = capsys.readouterr()
assert "Created subscription with filtering enabled" in out
assert f"{subscription_admin}" in out
assert f"filter: {FILTER}" in out


def test_create_push_subscription(
subscriber_client: pubsub_v1.SubscriberClient,
subscription_admin: str,
Expand Down

0 comments on commit c156e3d

Please sign in to comment.