Skip to content

Commit

Permalink
feat: API for publish idempotence
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 507561467
  • Loading branch information
Google APIs authored and Copybara-Service committed Feb 6, 2023
1 parent 5677c20 commit 8bf4545
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 67 deletions.
61 changes: 37 additions & 24 deletions google/cloud/pubsublite/v1/admin.proto
Expand Up @@ -38,7 +38,8 @@ option ruby_package = "Google::Cloud::PubSubLite::V1";
// subscriptions, such creating, listing, and deleting topics and subscriptions.
service AdminService {
option (google.api.default_host) = "pubsublite.googleapis.com";
option (google.api.oauth_scopes) = "https://www.googleapis.com/auth/cloud-platform";
option (google.api.oauth_scopes) =
"https://www.googleapis.com/auth/cloud-platform";

// Creates a new topic.
rpc CreateTopic(CreateTopicRequest) returns (Topic) {
Expand Down Expand Up @@ -91,7 +92,8 @@ service AdminService {
}

// Lists the subscriptions attached to the specified topic.
rpc ListTopicSubscriptions(ListTopicSubscriptionsRequest) returns (ListTopicSubscriptionsResponse) {
rpc ListTopicSubscriptions(ListTopicSubscriptionsRequest)
returns (ListTopicSubscriptionsResponse) {
option (google.api.http) = {
get: "/v1/admin/{name=projects/*/locations/*/topics/*}/subscriptions"
};
Expand All @@ -104,7 +106,8 @@ service AdminService {
post: "/v1/admin/{parent=projects/*/locations/*}/subscriptions"
body: "subscription"
};
option (google.api.method_signature) = "parent,subscription,subscription_id";
option (google.api.method_signature) =
"parent,subscription,subscription_id";
}

// Returns the subscription configuration.
Expand All @@ -116,7 +119,8 @@ service AdminService {
}

// Returns the list of subscriptions for the given project.
rpc ListSubscriptions(ListSubscriptionsRequest) returns (ListSubscriptionsResponse) {
rpc ListSubscriptions(ListSubscriptionsRequest)
returns (ListSubscriptionsResponse) {
option (google.api.http) = {
get: "/v1/admin/{parent=projects/*/locations/*}/subscriptions"
};
Expand All @@ -133,7 +137,8 @@ service AdminService {
}

// Deletes the specified subscription.
rpc DeleteSubscription(DeleteSubscriptionRequest) returns (google.protobuf.Empty) {
rpc DeleteSubscription(DeleteSubscriptionRequest)
returns (google.protobuf.Empty) {
option (google.api.http) = {
delete: "/v1/admin/{name=projects/*/locations/*/subscriptions/*}"
};
Expand Down Expand Up @@ -161,7 +166,8 @@ service AdminService {
//
// If the previous seek operation has not yet completed, it will be aborted
// and the new invocation of seek will supersede it.
rpc SeekSubscription(SeekSubscriptionRequest) returns (google.longrunning.Operation) {
rpc SeekSubscription(SeekSubscriptionRequest)
returns (google.longrunning.Operation) {
option (google.api.http) = {
post: "/v1/admin/{name=projects/*/locations/*/subscriptions/*}:seek"
body: "*"
Expand Down Expand Up @@ -190,7 +196,8 @@ service AdminService {
}

// Returns the list of reservations for the given project.
rpc ListReservations(ListReservationsRequest) returns (ListReservationsResponse) {
rpc ListReservations(ListReservationsRequest)
returns (ListReservationsResponse) {
option (google.api.http) = {
get: "/v1/admin/{parent=projects/*/locations/*}/reservations"
};
Expand All @@ -207,15 +214,17 @@ service AdminService {
}

// Deletes the specified reservation.
rpc DeleteReservation(DeleteReservationRequest) returns (google.protobuf.Empty) {
rpc DeleteReservation(DeleteReservationRequest)
returns (google.protobuf.Empty) {
option (google.api.http) = {
delete: "/v1/admin/{name=projects/*/locations/*/reservations/*}"
};
option (google.api.method_signature) = "name";
}

// Lists the topics attached to the specified reservation.
rpc ListReservationTopics(ListReservationTopicsRequest) returns (ListReservationTopicsResponse) {
rpc ListReservationTopics(ListReservationTopicsRequest)
returns (ListReservationTopicsResponse) {
option (google.api.http) = {
get: "/v1/admin/{name=projects/*/locations/*/reservations/*}/topics"
};
Expand All @@ -234,11 +243,12 @@ message CreateTopicRequest {
}
];

// Required. Configuration of the topic to create. Its `name` field is ignored.
// Required. Configuration of the topic to create. Its `name` field is
// ignored.
Topic topic = 2 [(google.api.field_behavior) = REQUIRED];

// Required. The ID to use for the topic, which will become the final component of
// the topic's name.
// Required. The ID to use for the topic, which will become the final
// component of the topic's name.
//
// This value is structured like: `my-topic-name`.
string topic_id = 3 [(google.api.field_behavior) = REQUIRED];
Expand Down Expand Up @@ -313,7 +323,8 @@ message UpdateTopicRequest {
Topic topic = 1 [(google.api.field_behavior) = REQUIRED];

// Required. A mask specifying the topic fields to change.
google.protobuf.FieldMask update_mask = 2 [(google.api.field_behavior) = REQUIRED];
google.protobuf.FieldMask update_mask = 2
[(google.api.field_behavior) = REQUIRED];
}

// Request for DeleteTopic.
Expand Down Expand Up @@ -372,11 +383,12 @@ message CreateSubscriptionRequest {
}
];

// Required. Configuration of the subscription to create. Its `name` field is ignored.
// Required. Configuration of the subscription to create. Its `name` field is
// ignored.
Subscription subscription = 2 [(google.api.field_behavior) = REQUIRED];

// Required. The ID to use for the subscription, which will become the final component
// of the subscription's name.
// Required. The ID to use for the subscription, which will become the final
// component of the subscription's name.
//
// This value is structured like: `my-sub-name`.
string subscription_id = 3 [(google.api.field_behavior) = REQUIRED];
Expand Down Expand Up @@ -440,7 +452,8 @@ message UpdateSubscriptionRequest {
Subscription subscription = 1 [(google.api.field_behavior) = REQUIRED];

// Required. A mask specifying the subscription fields to change.
google.protobuf.FieldMask update_mask = 2 [(google.api.field_behavior) = REQUIRED];
google.protobuf.FieldMask update_mask = 2
[(google.api.field_behavior) = REQUIRED];
}

// Request for DeleteSubscription.
Expand Down Expand Up @@ -490,9 +503,7 @@ message SeekSubscriptionRequest {
}

// Response for SeekSubscription long running operation.
message SeekSubscriptionResponse {

}
message SeekSubscriptionResponse {}

// Metadata for long running operations.
message OperationMetadata {
Expand Down Expand Up @@ -523,11 +534,12 @@ message CreateReservationRequest {
}
];

// Required. Configuration of the reservation to create. Its `name` field is ignored.
// Required. Configuration of the reservation to create. Its `name` field is
// ignored.
Reservation reservation = 2 [(google.api.field_behavior) = REQUIRED];

// Required. The ID to use for the reservation, which will become the final component of
// the reservation's name.
// Required. The ID to use for the reservation, which will become the final
// component of the reservation's name.
//
// This value is structured like: `my-reservation-name`.
string reservation_id = 3 [(google.api.field_behavior) = REQUIRED];
Expand Down Expand Up @@ -587,7 +599,8 @@ message UpdateReservationRequest {
Reservation reservation = 1 [(google.api.field_behavior) = REQUIRED];

// Required. A mask specifying the reservation fields to change.
google.protobuf.FieldMask update_mask = 2 [(google.api.field_behavior) = REQUIRED];
google.protobuf.FieldMask update_mask = 2
[(google.api.field_behavior) = REQUIRED];
}

// Request for DeleteReservation.
Expand Down
18 changes: 9 additions & 9 deletions google/cloud/pubsublite/v1/common.proto
Expand Up @@ -163,8 +163,8 @@ message Topic {
// Structured like:
// projects/{project_number}/locations/{location}/reservations/{reservation_id}
string throughput_reservation = 1 [(google.api.resource_reference) = {
type: "pubsublite.googleapis.com/Reservation"
}];
type: "pubsublite.googleapis.com/Reservation"
}];
}

// The name of the topic.
Expand Down Expand Up @@ -221,8 +221,8 @@ message Subscription {
// Structured like:
// projects/{project_number}/locations/{location}/topics/{topic_id}
string topic = 2 [(google.api.resource_reference) = {
type: "pubsublite.googleapis.com/Topic"
}];
type: "pubsublite.googleapis.com/Topic"
}];

// The settings for this subscription's message delivery.
DeliveryConfig delivery_config = 3;
Expand Down Expand Up @@ -265,13 +265,13 @@ message ExportConfig {
// `ACTIVE` and `PAUSED` will result in an error.
State desired_state = 1;

// Output only. The current state of the export, which may be different to the desired
// state due to errors.
// Output only. The current state of the export, which may be different to the
// desired state due to errors. This field is output only.
State current_state = 6 [(google.api.field_behavior) = OUTPUT_ONLY];

// Optional. The name of an optional Pub/Sub Lite topic to publish messages that can not
// be exported to the destination. For example, the message can not be
// published to the Pub/Sub service because it does not satisfy the
// Optional. The name of an optional Pub/Sub Lite topic to publish messages
// that can not be exported to the destination. For example, the message can
// not be published to the Pub/Sub service because it does not satisfy the
// constraints documented at https://cloud.google.com/pubsub/docs/publisher.
//
// Structured like:
Expand Down
18 changes: 8 additions & 10 deletions google/cloud/pubsublite/v1/cursor.proto
Expand Up @@ -36,11 +36,12 @@ option ruby_package = "Google::Cloud::PubSubLite::V1";
// progress within a topic partition for a given subscription.
service CursorService {
option (google.api.default_host) = "pubsublite.googleapis.com";
option (google.api.oauth_scopes) = "https://www.googleapis.com/auth/cloud-platform";
option (google.api.oauth_scopes) =
"https://www.googleapis.com/auth/cloud-platform";

// Establishes a stream with the server for managing committed cursors.
rpc StreamingCommitCursor(stream StreamingCommitCursorRequest) returns (stream StreamingCommitCursorResponse) {
}
rpc StreamingCommitCursor(stream StreamingCommitCursorRequest)
returns (stream StreamingCommitCursorResponse) {}

// Updates the committed cursor.
rpc CommitCursor(CommitCursorRequest) returns (CommitCursorResponse) {
Expand All @@ -51,7 +52,8 @@ service CursorService {
}

// Returns all committed cursor information for a subscription.
rpc ListPartitionCursors(ListPartitionCursorsRequest) returns (ListPartitionCursorsResponse) {
rpc ListPartitionCursors(ListPartitionCursorsRequest)
returns (ListPartitionCursorsResponse) {
option (google.api.http) = {
get: "/v1/cursor/{parent=projects/*/locations/*/subscriptions/*}/cursors"
};
Expand All @@ -72,9 +74,7 @@ message InitialCommitCursorRequest {
}

// Response to an InitialCommitCursorRequest.
message InitialCommitCursorResponse {

}
message InitialCommitCursorResponse {}

// Streaming request to update the committed cursor. Subsequent
// SequencedCommitCursorRequests override outstanding ones.
Expand Down Expand Up @@ -129,9 +129,7 @@ message CommitCursorRequest {
}

// Response for CommitCursor.
message CommitCursorResponse {

}
message CommitCursorResponse {}

// Request for ListPartitionCursors.
message ListPartitionCursorsRequest {
Expand Down
56 changes: 50 additions & 6 deletions google/cloud/pubsublite/v1/publisher.proto
Expand Up @@ -35,7 +35,8 @@ option ruby_package = "Google::Cloud::PubSubLite::V1";
// to subscriber clients upon request (via the `SubscriberService`).
service PublisherService {
option (google.api.default_host) = "pubsublite.googleapis.com";
option (google.api.oauth_scopes) = "https://www.googleapis.com/auth/cloud-platform";
option (google.api.oauth_scopes) =
"https://www.googleapis.com/auth/cloud-platform";

// Establishes a stream with the server for publishing messages. Once the
// stream is initialized, the client publishes messages by sending publish
Expand All @@ -44,8 +45,7 @@ service PublisherService {
// were sent. Note that multiple PublishRequests can be in flight
// simultaneously, but they will be processed by the server in the order that
// they are sent by the client on a given stream.
rpc Publish(stream PublishRequest) returns (stream PublishResponse) {
}
rpc Publish(stream PublishRequest) returns (stream PublishResponse) {}
}

// The first request that must be sent on a newly-opened stream.
Expand All @@ -57,24 +57,68 @@ message InitialPublishRequest {
// Partitions are zero indexed, so `partition` must be in the range [0,
// topic.num_partitions).
int64 partition = 2;

// Unique identifier for a publisher client. If set, enables publish
// idempotency within a publisher client session.
//
// The length of this field must be exactly 16 bytes long and should be
// populated with a 128 bit uuid, generated by standard uuid algorithms like
// uuid1 or uuid4. The same identifier should be reused following
// disconnections with retryable stream errors.
bytes client_id = 3;
}

// Response to an InitialPublishRequest.
message InitialPublishResponse {

}
message InitialPublishResponse {}

// Request to publish messages to the topic.
message MessagePublishRequest {
// The messages to publish.
repeated PubSubMessage messages = 1;

// The sequence number corresponding to the first message in `messages`.
// Messages within a batch are ordered and the sequence numbers of all
// subsequent messages in the batch are assumed to be incremental.
//
// Sequence numbers are assigned at the message level and the first message
// published in a publisher client session must have a sequence number of 0.
// All messages must have contiguous sequence numbers, which uniquely identify
// the messages accepted by the publisher client. Since messages are ordered,
// the client only needs to specify the sequence number of the first message
// in a published batch. The server deduplicates messages with the same
// sequence number from the same publisher `client_id`.
int64 first_sequence_number = 2;
}

// Response to a MessagePublishRequest.
message MessagePublishResponse {
// Cursors for a subrange of published messages.
message CursorRange {
// The cursor of the message at the start index. The cursors for remaining
// messages up to the end index (exclusive) are sequential.
Cursor start_cursor = 1;

// Index of the message in the published batch that corresponds to the
// start cursor. Inclusive.
int32 start_index = 2;

// Index of the last message in this range. Exclusive.
int32 end_index = 3;
}

// The cursor of the first published message in the batch. The cursors for any
// remaining messages in the batch are guaranteed to be sequential.
Cursor start_cursor = 1;

// Cursors for messages published in the batch. There will exist multiple
// ranges when cursors are not contiguous within the batch.
//
// The cursor ranges may not account for all messages in the batch when
// publish idempotency is enabled. A missing range indicates that cursors
// could not be determined for messages within the range, as they were
// deduplicated and the necessary data was not available at publish time.
// These messages will have offsets when received by a subscriber.
repeated CursorRange cursor_ranges = 2;
}

// Request sent from the client to the server on a stream.
Expand Down

0 comments on commit 8bf4545

Please sign in to comment.