Skip to content

Commit

Permalink
feat: add exactly once delivery flag (#577)
Browse files Browse the repository at this point in the history
- [x] Regenerate this pull request now.

PiperOrigin-RevId: 426401315

Source-Link: googleapis/googleapis@f02f439

Source-Link: googleapis/googleapis-gen@a6d5846
Copy-Tag: eyJwIjoiLmdpdGh1Yi8uT3dsQm90LnlhbWwiLCJoIjoiYTZkNTg0NmVlYjc0NTAyMDU3YmExOTk2ODMyODM2NWJmYmVlZGZiOSJ9
  • Loading branch information
gcf-owl-bot[bot] committed Feb 5, 2022
1 parent aa3754c commit d6614e2
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 1 deletion.
17 changes: 17 additions & 0 deletions google/pubsub_v1/types/pubsub.py
Expand Up @@ -558,6 +558,22 @@ class Subscription(proto.Message):
``StreamingPull`` requests will return FAILED_PRECONDITION.
If the subscription is a push subscription, pushes to the
endpoint will not be made.
enable_exactly_once_delivery (bool):
If true, Pub/Sub provides the following guarantees for the
delivery of a message with a given value of ``message_id``
on this subscription:
- The message sent to a subscriber is guaranteed not to be
resent before the message's acknowledgement deadline
expires.
- An acknowledged message will not be resent to a
subscriber.
Note that subscribers may still receive multiple copies of a
message when ``enable_exactly_once_delivery`` is true if the
message was published multiple times by a publisher client.
These copies are considered distinct by Pub/Sub and have
distinct ``message_id`` values.
topic_message_retention_duration (google.protobuf.duration_pb2.Duration):
Output only. Indicates the minimum duration for which a
message is retained after it is published to the
Expand Down Expand Up @@ -588,6 +604,7 @@ class Subscription(proto.Message):
)
retry_policy = proto.Field(proto.MESSAGE, number=14, message="RetryPolicy",)
detached = proto.Field(proto.BOOL, number=15,)
enable_exactly_once_delivery = proto.Field(proto.BOOL, number=16,)
topic_message_retention_duration = proto.Field(
proto.MESSAGE, number=17, message=duration_pb2.Duration,
)
Expand Down
2 changes: 1 addition & 1 deletion scripts/fixup_pubsub_v1_keywords.py
Expand Up @@ -42,7 +42,7 @@ class pubsubCallTransformer(cst.CSTTransformer):
'acknowledge': ('subscription', 'ack_ids', ),
'create_schema': ('parent', 'schema', 'schema_id', ),
'create_snapshot': ('name', 'subscription', 'labels', ),
'create_subscription': ('name', 'topic', 'push_config', 'ack_deadline_seconds', 'retain_acked_messages', 'message_retention_duration', 'labels', 'enable_message_ordering', 'expiration_policy', 'filter', 'dead_letter_policy', 'retry_policy', 'detached', 'topic_message_retention_duration', ),
'create_subscription': ('name', 'topic', 'push_config', 'ack_deadline_seconds', 'retain_acked_messages', 'message_retention_duration', 'labels', 'enable_message_ordering', 'expiration_policy', 'filter', 'dead_letter_policy', 'retry_policy', 'detached', 'enable_exactly_once_delivery', 'topic_message_retention_duration', ),
'create_topic': ('name', 'labels', 'message_storage_policy', 'kms_key_name', 'schema_settings', 'satisfies_pzs', 'message_retention_duration', ),
'delete_schema': ('name', ),
'delete_snapshot': ('snapshot', ),
Expand Down
12 changes: 12 additions & 0 deletions tests/unit/gapic/pubsub_v1/test_subscriber.py
Expand Up @@ -638,6 +638,7 @@ def test_create_subscription(request_type, transport: str = "grpc"):
enable_message_ordering=True,
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
)
response = client.create_subscription(request)

Expand All @@ -655,6 +656,7 @@ def test_create_subscription(request_type, transport: str = "grpc"):
assert response.enable_message_ordering is True
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True


def test_create_subscription_empty_call():
Expand Down Expand Up @@ -700,6 +702,7 @@ async def test_create_subscription_async(
enable_message_ordering=True,
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
)
)
response = await client.create_subscription(request)
Expand All @@ -718,6 +721,7 @@ async def test_create_subscription_async(
assert response.enable_message_ordering is True
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True


@pytest.mark.asyncio
Expand Down Expand Up @@ -905,6 +909,7 @@ def test_get_subscription(request_type, transport: str = "grpc"):
enable_message_ordering=True,
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
)
response = client.get_subscription(request)

Expand All @@ -922,6 +927,7 @@ def test_get_subscription(request_type, transport: str = "grpc"):
assert response.enable_message_ordering is True
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True


def test_get_subscription_empty_call():
Expand Down Expand Up @@ -963,6 +969,7 @@ async def test_get_subscription_async(
enable_message_ordering=True,
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
)
)
response = await client.get_subscription(request)
Expand All @@ -981,6 +988,7 @@ async def test_get_subscription_async(
assert response.enable_message_ordering is True
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True


@pytest.mark.asyncio
Expand Down Expand Up @@ -1130,6 +1138,7 @@ def test_update_subscription(request_type, transport: str = "grpc"):
enable_message_ordering=True,
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
)
response = client.update_subscription(request)

Expand All @@ -1147,6 +1156,7 @@ def test_update_subscription(request_type, transport: str = "grpc"):
assert response.enable_message_ordering is True
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True


def test_update_subscription_empty_call():
Expand Down Expand Up @@ -1192,6 +1202,7 @@ async def test_update_subscription_async(
enable_message_ordering=True,
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
)
)
response = await client.update_subscription(request)
Expand All @@ -1210,6 +1221,7 @@ async def test_update_subscription_async(
assert response.enable_message_ordering is True
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True


@pytest.mark.asyncio
Expand Down

0 comments on commit d6614e2

Please sign in to comment.