Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add exactly once delivery flag #577

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 17 additions & 0 deletions google/pubsub_v1/types/pubsub.py
Expand Up @@ -558,6 +558,22 @@ class Subscription(proto.Message):
``StreamingPull`` requests will return FAILED_PRECONDITION.
If the subscription is a push subscription, pushes to the
endpoint will not be made.
enable_exactly_once_delivery (bool):
If true, Pub/Sub provides the following guarantees for the
delivery of a message with a given value of ``message_id``
on this subscription:

- The message sent to a subscriber is guaranteed not to be
resent before the message's acknowledgement deadline
expires.
- An acknowledged message will not be resent to a
subscriber.

Note that subscribers may still receive multiple copies of a
message when ``enable_exactly_once_delivery`` is true if the
message was published multiple times by a publisher client.
These copies are considered distinct by Pub/Sub and have
distinct ``message_id`` values.
topic_message_retention_duration (google.protobuf.duration_pb2.Duration):
Output only. Indicates the minimum duration for which a
message is retained after it is published to the
Expand Down Expand Up @@ -588,6 +604,7 @@ class Subscription(proto.Message):
)
retry_policy = proto.Field(proto.MESSAGE, number=14, message="RetryPolicy",)
detached = proto.Field(proto.BOOL, number=15,)
enable_exactly_once_delivery = proto.Field(proto.BOOL, number=16,)
topic_message_retention_duration = proto.Field(
proto.MESSAGE, number=17, message=duration_pb2.Duration,
)
Expand Down
2 changes: 1 addition & 1 deletion scripts/fixup_pubsub_v1_keywords.py
Expand Up @@ -42,7 +42,7 @@ class pubsubCallTransformer(cst.CSTTransformer):
'acknowledge': ('subscription', 'ack_ids', ),
'create_schema': ('parent', 'schema', 'schema_id', ),
'create_snapshot': ('name', 'subscription', 'labels', ),
'create_subscription': ('name', 'topic', 'push_config', 'ack_deadline_seconds', 'retain_acked_messages', 'message_retention_duration', 'labels', 'enable_message_ordering', 'expiration_policy', 'filter', 'dead_letter_policy', 'retry_policy', 'detached', 'topic_message_retention_duration', ),
'create_subscription': ('name', 'topic', 'push_config', 'ack_deadline_seconds', 'retain_acked_messages', 'message_retention_duration', 'labels', 'enable_message_ordering', 'expiration_policy', 'filter', 'dead_letter_policy', 'retry_policy', 'detached', 'enable_exactly_once_delivery', 'topic_message_retention_duration', ),
'create_topic': ('name', 'labels', 'message_storage_policy', 'kms_key_name', 'schema_settings', 'satisfies_pzs', 'message_retention_duration', ),
'delete_schema': ('name', ),
'delete_snapshot': ('snapshot', ),
Expand Down
12 changes: 12 additions & 0 deletions tests/unit/gapic/pubsub_v1/test_subscriber.py
Expand Up @@ -638,6 +638,7 @@ def test_create_subscription(request_type, transport: str = "grpc"):
enable_message_ordering=True,
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
)
response = client.create_subscription(request)

Expand All @@ -655,6 +656,7 @@ def test_create_subscription(request_type, transport: str = "grpc"):
assert response.enable_message_ordering is True
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True


def test_create_subscription_empty_call():
Expand Down Expand Up @@ -700,6 +702,7 @@ async def test_create_subscription_async(
enable_message_ordering=True,
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
)
)
response = await client.create_subscription(request)
Expand All @@ -718,6 +721,7 @@ async def test_create_subscription_async(
assert response.enable_message_ordering is True
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True


@pytest.mark.asyncio
Expand Down Expand Up @@ -905,6 +909,7 @@ def test_get_subscription(request_type, transport: str = "grpc"):
enable_message_ordering=True,
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
)
response = client.get_subscription(request)

Expand All @@ -922,6 +927,7 @@ def test_get_subscription(request_type, transport: str = "grpc"):
assert response.enable_message_ordering is True
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True


def test_get_subscription_empty_call():
Expand Down Expand Up @@ -963,6 +969,7 @@ async def test_get_subscription_async(
enable_message_ordering=True,
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
)
)
response = await client.get_subscription(request)
Expand All @@ -981,6 +988,7 @@ async def test_get_subscription_async(
assert response.enable_message_ordering is True
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True


@pytest.mark.asyncio
Expand Down Expand Up @@ -1130,6 +1138,7 @@ def test_update_subscription(request_type, transport: str = "grpc"):
enable_message_ordering=True,
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
)
response = client.update_subscription(request)

Expand All @@ -1147,6 +1156,7 @@ def test_update_subscription(request_type, transport: str = "grpc"):
assert response.enable_message_ordering is True
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True


def test_update_subscription_empty_call():
Expand Down Expand Up @@ -1192,6 +1202,7 @@ async def test_update_subscription_async(
enable_message_ordering=True,
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
)
)
response = await client.update_subscription(request)
Expand All @@ -1210,6 +1221,7 @@ async def test_update_subscription_async(
assert response.enable_message_ordering is True
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True


@pytest.mark.asyncio
Expand Down