From 8bf4545d6339c9c07a107af9ef5f7123b63091d2 Mon Sep 17 00:00:00 2001 From: Google APIs Date: Mon, 6 Feb 2023 13:02:44 -0800 Subject: [PATCH] feat: API for publish idempotence PiperOrigin-RevId: 507561467 --- google/cloud/pubsublite/v1/admin.proto | 61 ++++++++++++-------- google/cloud/pubsublite/v1/common.proto | 18 +++--- google/cloud/pubsublite/v1/cursor.proto | 18 +++--- google/cloud/pubsublite/v1/publisher.proto | 56 ++++++++++++++++-- google/cloud/pubsublite/v1/subscriber.proto | 23 ++++---- google/cloud/pubsublite/v1/topic_stats.proto | 16 +++-- 6 files changed, 125 insertions(+), 67 deletions(-) diff --git a/google/cloud/pubsublite/v1/admin.proto b/google/cloud/pubsublite/v1/admin.proto index b13270b96e0c3..484aa19560acb 100644 --- a/google/cloud/pubsublite/v1/admin.proto +++ b/google/cloud/pubsublite/v1/admin.proto @@ -38,7 +38,8 @@ option ruby_package = "Google::Cloud::PubSubLite::V1"; // subscriptions, such creating, listing, and deleting topics and subscriptions. service AdminService { option (google.api.default_host) = "pubsublite.googleapis.com"; - option (google.api.oauth_scopes) = "https://www.googleapis.com/auth/cloud-platform"; + option (google.api.oauth_scopes) = + "https://www.googleapis.com/auth/cloud-platform"; // Creates a new topic. rpc CreateTopic(CreateTopicRequest) returns (Topic) { @@ -91,7 +92,8 @@ service AdminService { } // Lists the subscriptions attached to the specified topic. - rpc ListTopicSubscriptions(ListTopicSubscriptionsRequest) returns (ListTopicSubscriptionsResponse) { + rpc ListTopicSubscriptions(ListTopicSubscriptionsRequest) + returns (ListTopicSubscriptionsResponse) { option (google.api.http) = { get: "/v1/admin/{name=projects/*/locations/*/topics/*}/subscriptions" }; @@ -104,7 +106,8 @@ service AdminService { post: "/v1/admin/{parent=projects/*/locations/*}/subscriptions" body: "subscription" }; - option (google.api.method_signature) = "parent,subscription,subscription_id"; + option (google.api.method_signature) = + "parent,subscription,subscription_id"; } // Returns the subscription configuration. @@ -116,7 +119,8 @@ service AdminService { } // Returns the list of subscriptions for the given project. - rpc ListSubscriptions(ListSubscriptionsRequest) returns (ListSubscriptionsResponse) { + rpc ListSubscriptions(ListSubscriptionsRequest) + returns (ListSubscriptionsResponse) { option (google.api.http) = { get: "/v1/admin/{parent=projects/*/locations/*}/subscriptions" }; @@ -133,7 +137,8 @@ service AdminService { } // Deletes the specified subscription. - rpc DeleteSubscription(DeleteSubscriptionRequest) returns (google.protobuf.Empty) { + rpc DeleteSubscription(DeleteSubscriptionRequest) + returns (google.protobuf.Empty) { option (google.api.http) = { delete: "/v1/admin/{name=projects/*/locations/*/subscriptions/*}" }; @@ -161,7 +166,8 @@ service AdminService { // // If the previous seek operation has not yet completed, it will be aborted // and the new invocation of seek will supersede it. - rpc SeekSubscription(SeekSubscriptionRequest) returns (google.longrunning.Operation) { + rpc SeekSubscription(SeekSubscriptionRequest) + returns (google.longrunning.Operation) { option (google.api.http) = { post: "/v1/admin/{name=projects/*/locations/*/subscriptions/*}:seek" body: "*" @@ -190,7 +196,8 @@ service AdminService { } // Returns the list of reservations for the given project. - rpc ListReservations(ListReservationsRequest) returns (ListReservationsResponse) { + rpc ListReservations(ListReservationsRequest) + returns (ListReservationsResponse) { option (google.api.http) = { get: "/v1/admin/{parent=projects/*/locations/*}/reservations" }; @@ -207,7 +214,8 @@ service AdminService { } // Deletes the specified reservation. - rpc DeleteReservation(DeleteReservationRequest) returns (google.protobuf.Empty) { + rpc DeleteReservation(DeleteReservationRequest) + returns (google.protobuf.Empty) { option (google.api.http) = { delete: "/v1/admin/{name=projects/*/locations/*/reservations/*}" }; @@ -215,7 +223,8 @@ service AdminService { } // Lists the topics attached to the specified reservation. - rpc ListReservationTopics(ListReservationTopicsRequest) returns (ListReservationTopicsResponse) { + rpc ListReservationTopics(ListReservationTopicsRequest) + returns (ListReservationTopicsResponse) { option (google.api.http) = { get: "/v1/admin/{name=projects/*/locations/*/reservations/*}/topics" }; @@ -234,11 +243,12 @@ message CreateTopicRequest { } ]; - // Required. Configuration of the topic to create. Its `name` field is ignored. + // Required. Configuration of the topic to create. Its `name` field is + // ignored. Topic topic = 2 [(google.api.field_behavior) = REQUIRED]; - // Required. The ID to use for the topic, which will become the final component of - // the topic's name. + // Required. The ID to use for the topic, which will become the final + // component of the topic's name. // // This value is structured like: `my-topic-name`. string topic_id = 3 [(google.api.field_behavior) = REQUIRED]; @@ -313,7 +323,8 @@ message UpdateTopicRequest { Topic topic = 1 [(google.api.field_behavior) = REQUIRED]; // Required. A mask specifying the topic fields to change. - google.protobuf.FieldMask update_mask = 2 [(google.api.field_behavior) = REQUIRED]; + google.protobuf.FieldMask update_mask = 2 + [(google.api.field_behavior) = REQUIRED]; } // Request for DeleteTopic. @@ -372,11 +383,12 @@ message CreateSubscriptionRequest { } ]; - // Required. Configuration of the subscription to create. Its `name` field is ignored. + // Required. Configuration of the subscription to create. Its `name` field is + // ignored. Subscription subscription = 2 [(google.api.field_behavior) = REQUIRED]; - // Required. The ID to use for the subscription, which will become the final component - // of the subscription's name. + // Required. The ID to use for the subscription, which will become the final + // component of the subscription's name. // // This value is structured like: `my-sub-name`. string subscription_id = 3 [(google.api.field_behavior) = REQUIRED]; @@ -440,7 +452,8 @@ message UpdateSubscriptionRequest { Subscription subscription = 1 [(google.api.field_behavior) = REQUIRED]; // Required. A mask specifying the subscription fields to change. - google.protobuf.FieldMask update_mask = 2 [(google.api.field_behavior) = REQUIRED]; + google.protobuf.FieldMask update_mask = 2 + [(google.api.field_behavior) = REQUIRED]; } // Request for DeleteSubscription. @@ -490,9 +503,7 @@ message SeekSubscriptionRequest { } // Response for SeekSubscription long running operation. -message SeekSubscriptionResponse { - -} +message SeekSubscriptionResponse {} // Metadata for long running operations. message OperationMetadata { @@ -523,11 +534,12 @@ message CreateReservationRequest { } ]; - // Required. Configuration of the reservation to create. Its `name` field is ignored. + // Required. Configuration of the reservation to create. Its `name` field is + // ignored. Reservation reservation = 2 [(google.api.field_behavior) = REQUIRED]; - // Required. The ID to use for the reservation, which will become the final component of - // the reservation's name. + // Required. The ID to use for the reservation, which will become the final + // component of the reservation's name. // // This value is structured like: `my-reservation-name`. string reservation_id = 3 [(google.api.field_behavior) = REQUIRED]; @@ -587,7 +599,8 @@ message UpdateReservationRequest { Reservation reservation = 1 [(google.api.field_behavior) = REQUIRED]; // Required. A mask specifying the reservation fields to change. - google.protobuf.FieldMask update_mask = 2 [(google.api.field_behavior) = REQUIRED]; + google.protobuf.FieldMask update_mask = 2 + [(google.api.field_behavior) = REQUIRED]; } // Request for DeleteReservation. diff --git a/google/cloud/pubsublite/v1/common.proto b/google/cloud/pubsublite/v1/common.proto index dd6d89a730396..3f08c308601b6 100644 --- a/google/cloud/pubsublite/v1/common.proto +++ b/google/cloud/pubsublite/v1/common.proto @@ -163,8 +163,8 @@ message Topic { // Structured like: // projects/{project_number}/locations/{location}/reservations/{reservation_id} string throughput_reservation = 1 [(google.api.resource_reference) = { - type: "pubsublite.googleapis.com/Reservation" - }]; + type: "pubsublite.googleapis.com/Reservation" + }]; } // The name of the topic. @@ -221,8 +221,8 @@ message Subscription { // Structured like: // projects/{project_number}/locations/{location}/topics/{topic_id} string topic = 2 [(google.api.resource_reference) = { - type: "pubsublite.googleapis.com/Topic" - }]; + type: "pubsublite.googleapis.com/Topic" + }]; // The settings for this subscription's message delivery. DeliveryConfig delivery_config = 3; @@ -265,13 +265,13 @@ message ExportConfig { // `ACTIVE` and `PAUSED` will result in an error. State desired_state = 1; - // Output only. The current state of the export, which may be different to the desired - // state due to errors. + // Output only. The current state of the export, which may be different to the + // desired state due to errors. This field is output only. State current_state = 6 [(google.api.field_behavior) = OUTPUT_ONLY]; - // Optional. The name of an optional Pub/Sub Lite topic to publish messages that can not - // be exported to the destination. For example, the message can not be - // published to the Pub/Sub service because it does not satisfy the + // Optional. The name of an optional Pub/Sub Lite topic to publish messages + // that can not be exported to the destination. For example, the message can + // not be published to the Pub/Sub service because it does not satisfy the // constraints documented at https://cloud.google.com/pubsub/docs/publisher. // // Structured like: diff --git a/google/cloud/pubsublite/v1/cursor.proto b/google/cloud/pubsublite/v1/cursor.proto index 3b8fc7d0eb78a..95a62fc36eb6c 100644 --- a/google/cloud/pubsublite/v1/cursor.proto +++ b/google/cloud/pubsublite/v1/cursor.proto @@ -36,11 +36,12 @@ option ruby_package = "Google::Cloud::PubSubLite::V1"; // progress within a topic partition for a given subscription. service CursorService { option (google.api.default_host) = "pubsublite.googleapis.com"; - option (google.api.oauth_scopes) = "https://www.googleapis.com/auth/cloud-platform"; + option (google.api.oauth_scopes) = + "https://www.googleapis.com/auth/cloud-platform"; // Establishes a stream with the server for managing committed cursors. - rpc StreamingCommitCursor(stream StreamingCommitCursorRequest) returns (stream StreamingCommitCursorResponse) { - } + rpc StreamingCommitCursor(stream StreamingCommitCursorRequest) + returns (stream StreamingCommitCursorResponse) {} // Updates the committed cursor. rpc CommitCursor(CommitCursorRequest) returns (CommitCursorResponse) { @@ -51,7 +52,8 @@ service CursorService { } // Returns all committed cursor information for a subscription. - rpc ListPartitionCursors(ListPartitionCursorsRequest) returns (ListPartitionCursorsResponse) { + rpc ListPartitionCursors(ListPartitionCursorsRequest) + returns (ListPartitionCursorsResponse) { option (google.api.http) = { get: "/v1/cursor/{parent=projects/*/locations/*/subscriptions/*}/cursors" }; @@ -72,9 +74,7 @@ message InitialCommitCursorRequest { } // Response to an InitialCommitCursorRequest. -message InitialCommitCursorResponse { - -} +message InitialCommitCursorResponse {} // Streaming request to update the committed cursor. Subsequent // SequencedCommitCursorRequests override outstanding ones. @@ -129,9 +129,7 @@ message CommitCursorRequest { } // Response for CommitCursor. -message CommitCursorResponse { - -} +message CommitCursorResponse {} // Request for ListPartitionCursors. message ListPartitionCursorsRequest { diff --git a/google/cloud/pubsublite/v1/publisher.proto b/google/cloud/pubsublite/v1/publisher.proto index 748fabcdf923e..20b9243841a00 100644 --- a/google/cloud/pubsublite/v1/publisher.proto +++ b/google/cloud/pubsublite/v1/publisher.proto @@ -35,7 +35,8 @@ option ruby_package = "Google::Cloud::PubSubLite::V1"; // to subscriber clients upon request (via the `SubscriberService`). service PublisherService { option (google.api.default_host) = "pubsublite.googleapis.com"; - option (google.api.oauth_scopes) = "https://www.googleapis.com/auth/cloud-platform"; + option (google.api.oauth_scopes) = + "https://www.googleapis.com/auth/cloud-platform"; // Establishes a stream with the server for publishing messages. Once the // stream is initialized, the client publishes messages by sending publish @@ -44,8 +45,7 @@ service PublisherService { // were sent. Note that multiple PublishRequests can be in flight // simultaneously, but they will be processed by the server in the order that // they are sent by the client on a given stream. - rpc Publish(stream PublishRequest) returns (stream PublishResponse) { - } + rpc Publish(stream PublishRequest) returns (stream PublishResponse) {} } // The first request that must be sent on a newly-opened stream. @@ -57,24 +57,68 @@ message InitialPublishRequest { // Partitions are zero indexed, so `partition` must be in the range [0, // topic.num_partitions). int64 partition = 2; + + // Unique identifier for a publisher client. If set, enables publish + // idempotency within a publisher client session. + // + // The length of this field must be exactly 16 bytes long and should be + // populated with a 128 bit uuid, generated by standard uuid algorithms like + // uuid1 or uuid4. The same identifier should be reused following + // disconnections with retryable stream errors. + bytes client_id = 3; } // Response to an InitialPublishRequest. -message InitialPublishResponse { - -} +message InitialPublishResponse {} // Request to publish messages to the topic. message MessagePublishRequest { // The messages to publish. repeated PubSubMessage messages = 1; + + // The sequence number corresponding to the first message in `messages`. + // Messages within a batch are ordered and the sequence numbers of all + // subsequent messages in the batch are assumed to be incremental. + // + // Sequence numbers are assigned at the message level and the first message + // published in a publisher client session must have a sequence number of 0. + // All messages must have contiguous sequence numbers, which uniquely identify + // the messages accepted by the publisher client. Since messages are ordered, + // the client only needs to specify the sequence number of the first message + // in a published batch. The server deduplicates messages with the same + // sequence number from the same publisher `client_id`. + int64 first_sequence_number = 2; } // Response to a MessagePublishRequest. message MessagePublishResponse { + // Cursors for a subrange of published messages. + message CursorRange { + // The cursor of the message at the start index. The cursors for remaining + // messages up to the end index (exclusive) are sequential. + Cursor start_cursor = 1; + + // Index of the message in the published batch that corresponds to the + // start cursor. Inclusive. + int32 start_index = 2; + + // Index of the last message in this range. Exclusive. + int32 end_index = 3; + } + // The cursor of the first published message in the batch. The cursors for any // remaining messages in the batch are guaranteed to be sequential. Cursor start_cursor = 1; + + // Cursors for messages published in the batch. There will exist multiple + // ranges when cursors are not contiguous within the batch. + // + // The cursor ranges may not account for all messages in the batch when + // publish idempotency is enabled. A missing range indicates that cursors + // could not be determined for messages within the range, as they were + // deduplicated and the necessary data was not available at publish time. + // These messages will have offsets when received by a subscriber. + repeated CursorRange cursor_ranges = 2; } // Request sent from the client to the server on a stream. diff --git a/google/cloud/pubsublite/v1/subscriber.proto b/google/cloud/pubsublite/v1/subscriber.proto index 3a22c77bb03ab..a02b3cf5df132 100644 --- a/google/cloud/pubsublite/v1/subscriber.proto +++ b/google/cloud/pubsublite/v1/subscriber.proto @@ -34,18 +34,19 @@ option ruby_package = "Google::Cloud::PubSubLite::V1"; // from subscriptions. service SubscriberService { option (google.api.default_host) = "pubsublite.googleapis.com"; - option (google.api.oauth_scopes) = "https://www.googleapis.com/auth/cloud-platform"; + option (google.api.oauth_scopes) = + "https://www.googleapis.com/auth/cloud-platform"; // Establishes a stream with the server for receiving messages. - rpc Subscribe(stream SubscribeRequest) returns (stream SubscribeResponse) { - } + rpc Subscribe(stream SubscribeRequest) returns (stream SubscribeResponse) {} } // The service that a subscriber client application uses to determine which // partitions it should connect to. service PartitionAssignmentService { option (google.api.default_host) = "pubsublite.googleapis.com"; - option (google.api.oauth_scopes) = "https://www.googleapis.com/auth/cloud-platform"; + option (google.api.oauth_scopes) = + "https://www.googleapis.com/auth/cloud-platform"; // Assign partitions for this client to handle for the specified subscription. // @@ -54,8 +55,8 @@ service PartitionAssignmentService { // outstanding on the stream at a time. // The client should send a PartitionAssignmentAck after updating the // partitions it is connected to to reflect the new assignment. - rpc AssignPartitions(stream PartitionAssignmentRequest) returns (stream PartitionAssignment) { - } + rpc AssignPartitions(stream PartitionAssignmentRequest) + returns (stream PartitionAssignment) {} } // The first request that must be sent on a newly-opened stream. The client must @@ -68,9 +69,9 @@ message InitialSubscribeRequest { // so `partition` must be in the range [0, topic.num_partitions). int64 partition = 2; - // Optional. Initial target location within the message backlog. If not set, messages - // will be delivered from the commit cursor for the given subscription and - // partition. + // Optional. Initial target location within the message backlog. If not set, + // messages will be delivered from the commit cursor for the given + // subscription and partition. SeekRequest initial_location = 4 [(google.api.field_behavior) = OPTIONAL]; } @@ -202,9 +203,7 @@ message PartitionAssignment { // partitions may remain unassigned for a period of time until the // client is known to be inactive, after which time the server will break the // stream. -message PartitionAssignmentAck { - -} +message PartitionAssignmentAck {} // A request on the PartitionAssignment stream. message PartitionAssignmentRequest { diff --git a/google/cloud/pubsublite/v1/topic_stats.proto b/google/cloud/pubsublite/v1/topic_stats.proto index 23d225acf549a..f6226d3e04f5d 100644 --- a/google/cloud/pubsublite/v1/topic_stats.proto +++ b/google/cloud/pubsublite/v1/topic_stats.proto @@ -34,11 +34,13 @@ option ruby_package = "Google::Cloud::PubSubLite::V1"; // This service allows users to get stats about messages in their topic. service TopicStatsService { option (google.api.default_host) = "pubsublite.googleapis.com"; - option (google.api.oauth_scopes) = "https://www.googleapis.com/auth/cloud-platform"; + option (google.api.oauth_scopes) = + "https://www.googleapis.com/auth/cloud-platform"; // Compute statistics about a range of messages in a given topic and // partition. - rpc ComputeMessageStats(ComputeMessageStatsRequest) returns (ComputeMessageStatsResponse) { + rpc ComputeMessageStats(ComputeMessageStatsRequest) + returns (ComputeMessageStatsResponse) { option (google.api.http) = { post: "/v1/topicStats/{topic=projects/*/locations/*/topics/*}:computeMessageStats" body: "*" @@ -51,7 +53,8 @@ service TopicStatsService { // greater than the offset of any message whose publish has already // been acknowledged. It is zero if there have never been messages in the // partition. - rpc ComputeHeadCursor(ComputeHeadCursorRequest) returns (ComputeHeadCursorResponse) { + rpc ComputeHeadCursor(ComputeHeadCursorRequest) + returns (ComputeHeadCursorResponse) { option (google.api.http) = { post: "/v1/topicStats/{topic=projects/*/locations/*/topics/*}:computeHeadCursor" body: "*" @@ -60,7 +63,8 @@ service TopicStatsService { // Compute the corresponding cursor for a publish or event time in a topic // partition. - rpc ComputeTimeCursor(ComputeTimeCursorRequest) returns (ComputeTimeCursorResponse) { + rpc ComputeTimeCursor(ComputeTimeCursorRequest) + returns (ComputeTimeCursorResponse) { option (google.api.http) = { post: "/v1/topicStats/{topic=projects/*/locations/*/topics/*}:computeTimeCursor" body: "*" @@ -143,8 +147,8 @@ message ComputeTimeCursorRequest { // Required. The partition for which we should compute the cursor. int64 partition = 2 [(google.api.field_behavior) = REQUIRED]; - // Required. The target publish or event time. Specifying a future time will return an - // unset cursor. + // Required. The target publish or event time. Specifying a future time will + // return an unset cursor. TimeTarget target = 3 [(google.api.field_behavior) = REQUIRED]; }