Skip to content

Commit

Permalink
feat: exactly once support v3 (#1022)
Browse files Browse the repository at this point in the history
* Added new interfaces
  * `AckReplyConsumerWithResponse`
  * `MessageReceiverWithAckResponse`
* Added `AckResponse` enum
* Changed `MessageDispatcher` and `StreamingSubscriberConnection` to use builder pattern
* Keeps track of whether exactly-once delivery is enabled for a subscription by looking at the subscription's SubscriptionProperties
* Set the minimum ack deadline to 60 secs if exactly-once is known to be turned on.
* Add new min-lease-extension parameter. If the user sets this, it overrides the auto-set param
* Changed `AckId` information for Modacks and Acks to use a new `AckRequestData` object that also includes the message future (if applicable)

Minor Updates:
* Added `Mockito` dependency for mocking in unit tests
  • Loading branch information
mmicatka committed Mar 4, 2022
1 parent 7e08361 commit 02ed621
Show file tree
Hide file tree
Showing 17 changed files with 2,490 additions and 464 deletions.
5 changes: 5 additions & 0 deletions google-cloud-pubsub/pom.xml
Expand Up @@ -95,6 +95,11 @@
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
@@ -0,0 +1,25 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub.v1;

import java.util.concurrent.Future;

public interface AckReplyConsumerWithResponse {
Future<AckResponse> ack();

Future<AckResponse> nack();
}
@@ -0,0 +1,85 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub.v1;

import com.google.api.core.SettableApiFuture;
import java.util.Optional;

public class AckRequestData {
private final String ackId;
private final Optional<SettableApiFuture<AckResponse>> messageFuture;

protected AckRequestData(Builder builder) {
this.ackId = builder.ackId;
this.messageFuture = builder.messageFuture;
}

public String getAckId() {
return ackId;
}

public SettableApiFuture<AckResponse> getMessageFutureIfExists() {
return this.messageFuture.orElse(null);
}

public AckRequestData setResponse(AckResponse ackResponse, boolean setResponseOnSuccess) {
if (this.messageFuture.isPresent() && !this.messageFuture.get().isDone()) {
switch (ackResponse) {
case SUCCESSFUL:
if (setResponseOnSuccess) {
this.messageFuture.get().set(ackResponse);
}
break;
case INVALID:
case OTHER:
case PERMISSION_DENIED:
case FAILED_PRECONDITION:
// Non-succesful messages will get set for both acks, nacks, and modacks
this.messageFuture.get().set(ackResponse);
break;
}
}
return this;
}

public boolean hasMessageFuture() {
return this.messageFuture.isPresent();
}

public static Builder newBuilder(String ackId) {
return new Builder(ackId);
}

/** Builder of {@link AckRequestData AckRequestData}. */
protected static final class Builder {
private final String ackId;
private Optional<SettableApiFuture<AckResponse>> messageFuture = Optional.empty();

protected Builder(String ackId) {
this.ackId = ackId;
}

public Builder setMessageFuture(SettableApiFuture<AckResponse> messageFuture) {
this.messageFuture = Optional.of(messageFuture);
return this;
}

public AckRequestData build() {
return new AckRequestData(this);
}
}
}
@@ -0,0 +1,25 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub.v1;

public enum AckResponse {
PERMISSION_DENIED,
FAILED_PRECONDITION,
SUCCESSFUL,
INVALID,
OTHER
}

0 comments on commit 02ed621

Please sign in to comment.