Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Removed EnableExactlyOnceDelivery from Builders #1052

Merged
merged 4 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 20 additions & 0 deletions google-cloud-pubsub/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!-- These should be removed after the next major release 1.117.x -->
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/pubsub/v1/Subscriber$Builder</className>
<method>com.google.cloud.pubsub.v1.Subscriber$Builder setExactlyOnceDeliveryEnabled(boolean)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/pubsub/v1/StreamingSubscriberConnection$Builder</className>
<method>com.google.cloud.pubsub.v1.StreamingSubscriberConnection$Builder setExactlyOnceDeliveryEnabled(boolean)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/pubsub/v1/MessageDispatcher$Builder</className>
<method>com.google.cloud.pubsub.v1.MessageDispatcher$Builder setEnableExactlyOnceDelivery(boolean)</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,34 @@

import java.util.concurrent.Future;

/**
mmicatka marked this conversation as resolved.
Show resolved Hide resolved
* Acknowledging a message in Pub/Sub means that you are done with it, and it will not be delivered
* to this subscription again. You should avoid acknowledging messages until you have *finished*
* processing them, so that in the event of a failure, you receive the message again.
*
* <p>If exactly-once delivery is enabled on the subscription, the future returned by the ack/nack
* methods track the state of acknowledgement operation by the server. If the future completes
* successfully, the message is guaranteed NOT to be re-delivered. Otherwise, the future will
* contain an exception with more details about the failure and the message may be re-delivered.
*
* <p>If exactly-once delivery is NOT enabled on the subscription, the future returns immediately
* with an AckResponse.SUCCESS. Because re-deliveries are possible, you should ensure that your
* processing code is idempotent, as you may receive any given message more than once.
*/
public interface AckReplyConsumerWithResponse {
/**
* Acknowledges that the message has been successfully processed. The service will not send the
* message again.
*
* <p>A future representing the server response is returned
*/
Future<AckResponse> ack();

/**
* Signals that the message has not been successfully processed. The service should resend the
* message.
*
* <p>A future representing the server response is returned
*/
Future<AckResponse> nack();
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class MessageDispatcher {

private final FlowController flowController;

private AtomicBoolean enableExactlyOnceDelivery;
private AtomicBoolean exactlyOnceDeliveryEnabled = new AtomicBoolean(false);

private final Waiter messagesWaiter;

Expand Down Expand Up @@ -198,7 +198,6 @@ private MessageDispatcher(Builder builder) {

ackProcessor = builder.ackProcessor;
flowController = builder.flowController;
enableExactlyOnceDelivery = new AtomicBoolean(builder.enableExactlyOnceDelivery);
ackLatencyDistribution = builder.ackLatencyDistribution;
clock = builder.clock;
jobLock = new ReentrantLock();
Expand Down Expand Up @@ -296,13 +295,13 @@ int getMessageDeadlineSeconds() {
}

@InternalApi
void setEnableExactlyOnceDelivery(boolean enableExactlyOnceDelivery) {
// Sanity check that we are changing the enableExactlyOnceDelivery state
if (enableExactlyOnceDelivery == this.enableExactlyOnceDelivery.get()) {
void setExactlyOnceDeliveryEnabled(boolean exactlyOnceDeliveryEnabled) {
// Sanity check that we are changing the exactlyOnceDeliveryEnabled state
if (exactlyOnceDeliveryEnabled == this.exactlyOnceDeliveryEnabled.get()) {
return;
}

this.enableExactlyOnceDelivery.set(enableExactlyOnceDelivery);
this.exactlyOnceDeliveryEnabled.set(exactlyOnceDeliveryEnabled);

// If a custom value for minDurationPerAckExtension, we should respect that
if (!minDurationPerAckExtensionDefaultUsed) {
Expand All @@ -313,7 +312,7 @@ void setEnableExactlyOnceDelivery(boolean enableExactlyOnceDelivery) {
// maxDurationPerAckExtensionSeconds does not change
int possibleNewMinAckDeadlineExtensionSeconds;

if (enableExactlyOnceDelivery) {
if (exactlyOnceDeliveryEnabled) {
possibleNewMinAckDeadlineExtensionSeconds =
Math.toIntExact(
Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY.getSeconds());
Expand All @@ -323,7 +322,7 @@ void setEnableExactlyOnceDelivery(boolean enableExactlyOnceDelivery) {
}

// If we are not using the default maxDurationAckExtension, check if the
// minAckDeadlineExtensionExactlyOnce needs to be bounded by the set max
// minAckDeadlineExtensionExactlyOnceDelivery needs to be bounded by the set max
if (!maxDurationPerAckExtensionDefaultUsed
&& (possibleNewMinAckDeadlineExtensionSeconds > maxDurationPerAckExtensionSeconds)) {
minDurationPerAckExtensionSeconds = maxDurationPerAckExtensionSeconds;
Expand Down Expand Up @@ -580,7 +579,6 @@ public static final class Builder {

private Distribution ackLatencyDistribution;
private FlowController flowController;
private boolean enableExactlyOnceDelivery;

private Executor executor;
private ScheduledExecutorService systemExecutor;
Expand Down Expand Up @@ -641,11 +639,6 @@ public Builder setFlowController(FlowController flowController) {
return this;
}

public Builder setEnableExactlyOnceDelivery(boolean enableExactlyOnceDelivery) {
this.enableExactlyOnceDelivery = enableExactlyOnceDelivery;
return this;
}

public Builder setExecutor(Executor executor) {
this.executor = executor;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,7 @@ private StreamingSubscriberConnection(Builder builder) {
// We need to set the default stream ack deadline on the initial request, this will be
// updated by modack requests in the message dispatcher
if (builder.maxDurationPerAckExtensionDefaultUsed) {
// If the default is used, check if exactly once is enabled and set appropriately
if (builder.exactlyOnceDeliveryEnabled) {
inititalStreamAckDeadline = Subscriber.STREAM_ACK_DEADLINE_EXACTLY_ONCE_DELIVERY_DEFAULT;
} else {
inititalStreamAckDeadline = Subscriber.STREAM_ACK_DEADLINE_DEFAULT;
}
inititalStreamAckDeadline = Subscriber.STREAM_ACK_DEADLINE_DEFAULT;
} else if (builder.maxDurationPerAckExtension.compareTo(Subscriber.MIN_STREAM_ACK_DEADLINE)
< 0) {
// We will not be able to extend more than the default minimum
Expand All @@ -123,7 +118,6 @@ private StreamingSubscriberConnection(Builder builder) {

subscriberStub = builder.subscriberStub;
channelAffinity = builder.channelAffinity;
exactlyOnceDeliveryEnabled.set(builder.exactlyOnceDeliveryEnabled);

MessageDispatcher.Builder messageDispatcherBuilder;
if (builder.receiver != null) {
Expand All @@ -143,7 +137,6 @@ private StreamingSubscriberConnection(Builder builder) {
.setMaxDurationPerAckExtensionDefaultUsed(builder.maxDurationPerAckExtensionDefaultUsed)
.setAckLatencyDistribution(builder.ackLatencyDistribution)
.setFlowController(builder.flowController)
.setEnableExactlyOnceDelivery(builder.exactlyOnceDeliveryEnabled)
.setExecutor(builder.executor)
.setSystemExecutor(builder.systemExecutor)
.setApiClock(builder.clock)
Expand All @@ -159,7 +152,7 @@ public StreamingSubscriberConnection setExactlyOnceDeliveryEnabled(
return this;
}

public boolean isExactlyOnceDeliveryEnabled() {
public boolean getExactlyOnceDeliveryEnabled() {
return exactlyOnceDeliveryEnabled.get();
}

Expand Down Expand Up @@ -221,7 +214,7 @@ public void onResponse(StreamingPullResponse response) {
response.getSubscriptionProperties().getExactlyOnceDeliveryEnabled();

setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabledResponse);
messageDispatcher.setEnableExactlyOnceDelivery(exactlyOnceDeliveryEnabledResponse);
messageDispatcher.setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabledResponse);
messageDispatcher.processReceivedMessages(response.getReceivedMessagesList());

// Only request more if we're not shutdown.
Expand Down Expand Up @@ -370,7 +363,7 @@ public void setResponseOutstandingMessages(AckResponse ackResponse) {
private void setFailureFutureOutstandingMessages(Throwable t) {
AckResponse ackResponse;

if (isExactlyOnceDeliveryEnabled()) {
if (getExactlyOnceDeliveryEnabled()) {
if (!(t instanceof ApiException)) {
ackResponse = AckResponse.OTHER;
}
Expand Down Expand Up @@ -518,7 +511,7 @@ public void onFailure(Throwable t) {
// Remove from our pending operations
ackOperationsWaiter.incrementPendingCount(-1);

if (!isExactlyOnceDeliveryEnabled()) {
if (!getExactlyOnceDeliveryEnabled()) {
Level level = isAlive() ? Level.WARNING : Level.FINER;
logger.log(level, "failed to send operations", t);
return;
Expand Down Expand Up @@ -609,7 +602,6 @@ public static final class Builder {
private int channelAffinity;
private FlowController flowController;
private FlowControlSettings flowControlSettings;
private boolean exactlyOnceDeliveryEnabled;
private boolean useLegacyFlowControl;
private ScheduledExecutorService executor;
private ScheduledExecutorService systemExecutor;
Expand Down Expand Up @@ -690,11 +682,6 @@ public Builder setUseLegacyFlowControl(boolean useLegacyFlowControl) {
return this;
}

public Builder setExactlyOnceDeliveryEnabled(boolean exactlyOnceDeliveryEnabled) {
this.exactlyOnceDeliveryEnabled = exactlyOnceDeliveryEnabled;
return this;
}

public Builder setExecutor(ScheduledExecutorService executor) {
this.executor = executor;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
private SubscriberStub subscriberStub;
private final SubscriberStubSettings subStubSettings;
private final FlowController flowController;
private boolean exactlyOnceDeliveryEnabled = false;
private final int numPullers;

private final MessageReceiver receiver;
Expand Down Expand Up @@ -166,8 +165,6 @@ private Subscriber(Builder builder) {
.setLimitExceededBehavior(LimitExceededBehavior.Block)
.build());

exactlyOnceDeliveryEnabled = builder.exactlyOnceDeliveryEnabled;

this.numPullers = builder.parallelPullCount;

executorProvider = builder.executorProvider;
Expand Down Expand Up @@ -385,7 +382,6 @@ private void startStreamingConnections() {
.setExecutor(executor)
.setSystemExecutor(alarmsExecutor)
.setClock(clock)
.setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabled)
.build();

streamingSubscriberConnections.add(streamingSubscriberConnection);
Expand Down Expand Up @@ -479,8 +475,6 @@ public static final class Builder {
private boolean useLegacyFlowControl = false;
private FlowControlSettings flowControlSettings = DEFAULT_FLOW_CONTROL_SETTINGS;

private boolean exactlyOnceDeliveryEnabled = false;

private ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
private ExecutorProvider systemExecutorProvider = null;
private TransportChannelProvider channelProvider =
Expand Down Expand Up @@ -573,22 +567,6 @@ public Builder setUseLegacyFlowControl(boolean value) {
return this;
}

/**
* Enables/Disabled ExactlyOnceDelivery
*
* <p>Will update the minDurationPerAckExtension if a user-provided value is not set
*/
public Builder setExactlyOnceDeliveryEnabled(boolean exactlyOnceDeliveryEnabled) {
// If exactlyOnceDeliveryIsEnabled we want to update the default minAckDeadlineExtension if
// applicable
if (exactlyOnceDeliveryEnabled && this.minDurationPerAckExtensionDefaultUsed) {
this.minDurationPerAckExtension = DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY;
}

this.exactlyOnceDeliveryEnabled = exactlyOnceDeliveryEnabled;
return this;
}

/**
* Set the maximum period a message ack deadline will be extended. Defaults to one hour.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ public void receiveMessage(
receiveQueue.offer(MessageAndConsumer.create(message, consumer));
}
})
.setExactlyOnceDeliveryEnabled(true)
.build();
subscriber.addListener(
new Subscriber.Listener() {
Expand Down Expand Up @@ -282,7 +281,6 @@ public void receiveMessage(
SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
.build())
.setExactlyOnceDeliveryEnabled(false)
.build();
subscriber.addListener(
new Subscriber.Listener() {
Expand Down Expand Up @@ -360,7 +358,6 @@ public void receiveMessage(
SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
.build())
.setExactlyOnceDeliveryEnabled(false)
.build();
subscriber.addListener(
new Subscriber.Listener() {
Expand Down