Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Removed EnableExactlyOnceDelivery from Builders #1052

Merged
merged 4 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,23 @@

import java.util.concurrent.Future;

/**
mmicatka marked this conversation as resolved.
Show resolved Hide resolved
* Accepts a reply, sending it to the service. A future is returned representing the server response
*/
public interface AckReplyConsumerWithResponse {
/**
* Acknowledges that the message has been successfully processed. The service will not send the
* message again.
*
* <p>A future representing the server response is returned
*/
Future<AckResponse> ack();

/**
* Signals that the message has not been successfully processed. The service should resend the
* message.
*
* <p>A future representing the server response is returned
*/
Future<AckResponse> nack();
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class MessageDispatcher {

private final FlowController flowController;

private AtomicBoolean enableExactlyOnceDelivery;
private AtomicBoolean enableExactlyOnceDelivery = new AtomicBoolean(false);

private final Waiter messagesWaiter;

Expand Down Expand Up @@ -198,7 +198,6 @@ private MessageDispatcher(Builder builder) {

ackProcessor = builder.ackProcessor;
flowController = builder.flowController;
enableExactlyOnceDelivery = new AtomicBoolean(builder.enableExactlyOnceDelivery);
ackLatencyDistribution = builder.ackLatencyDistribution;
clock = builder.clock;
jobLock = new ReentrantLock();
Expand Down Expand Up @@ -580,7 +579,6 @@ public static final class Builder {

private Distribution ackLatencyDistribution;
private FlowController flowController;
private boolean enableExactlyOnceDelivery;

private Executor executor;
private ScheduledExecutorService systemExecutor;
Expand Down Expand Up @@ -641,11 +639,6 @@ public Builder setFlowController(FlowController flowController) {
return this;
}

public Builder setEnableExactlyOnceDelivery(boolean enableExactlyOnceDelivery) {
this.enableExactlyOnceDelivery = enableExactlyOnceDelivery;
return this;
}

public Builder setExecutor(Executor executor) {
this.executor = executor;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,7 @@ private StreamingSubscriberConnection(Builder builder) {
// We need to set the default stream ack deadline on the initial request, this will be
// updated by modack requests in the message dispatcher
if (builder.maxDurationPerAckExtensionDefaultUsed) {
// If the default is used, check if exactly once is enabled and set appropriately
if (builder.exactlyOnceDeliveryEnabled) {
inititalStreamAckDeadline = Subscriber.STREAM_ACK_DEADLINE_EXACTLY_ONCE_DELIVERY_DEFAULT;
} else {
inititalStreamAckDeadline = Subscriber.STREAM_ACK_DEADLINE_DEFAULT;
}
inititalStreamAckDeadline = Subscriber.STREAM_ACK_DEADLINE_DEFAULT;
} else if (builder.maxDurationPerAckExtension.compareTo(Subscriber.MIN_STREAM_ACK_DEADLINE)
< 0) {
// We will not be able to extend more than the default minimum
Expand All @@ -123,7 +118,6 @@ private StreamingSubscriberConnection(Builder builder) {

subscriberStub = builder.subscriberStub;
channelAffinity = builder.channelAffinity;
exactlyOnceDeliveryEnabled.set(builder.exactlyOnceDeliveryEnabled);

MessageDispatcher.Builder messageDispatcherBuilder;
if (builder.receiver != null) {
Expand All @@ -143,7 +137,6 @@ private StreamingSubscriberConnection(Builder builder) {
.setMaxDurationPerAckExtensionDefaultUsed(builder.maxDurationPerAckExtensionDefaultUsed)
.setAckLatencyDistribution(builder.ackLatencyDistribution)
.setFlowController(builder.flowController)
.setEnableExactlyOnceDelivery(builder.exactlyOnceDeliveryEnabled)
.setExecutor(builder.executor)
.setSystemExecutor(builder.systemExecutor)
.setApiClock(builder.clock)
Expand Down Expand Up @@ -609,7 +602,6 @@ public static final class Builder {
private int channelAffinity;
private FlowController flowController;
private FlowControlSettings flowControlSettings;
private boolean exactlyOnceDeliveryEnabled;
private boolean useLegacyFlowControl;
private ScheduledExecutorService executor;
private ScheduledExecutorService systemExecutor;
Expand Down Expand Up @@ -690,11 +682,6 @@ public Builder setUseLegacyFlowControl(boolean useLegacyFlowControl) {
return this;
}

public Builder setExactlyOnceDeliveryEnabled(boolean exactlyOnceDeliveryEnabled) {
this.exactlyOnceDeliveryEnabled = exactlyOnceDeliveryEnabled;
return this;
}

public Builder setExecutor(ScheduledExecutorService executor) {
this.executor = executor;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
private SubscriberStub subscriberStub;
private final SubscriberStubSettings subStubSettings;
private final FlowController flowController;
private boolean exactlyOnceDeliveryEnabled = false;
private final int numPullers;

private final MessageReceiver receiver;
Expand Down Expand Up @@ -166,8 +165,6 @@ private Subscriber(Builder builder) {
.setLimitExceededBehavior(LimitExceededBehavior.Block)
.build());

exactlyOnceDeliveryEnabled = builder.exactlyOnceDeliveryEnabled;

this.numPullers = builder.parallelPullCount;

executorProvider = builder.executorProvider;
Expand Down Expand Up @@ -385,7 +382,6 @@ private void startStreamingConnections() {
.setExecutor(executor)
.setSystemExecutor(alarmsExecutor)
.setClock(clock)
.setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabled)
.build();

streamingSubscriberConnections.add(streamingSubscriberConnection);
Expand Down Expand Up @@ -479,8 +475,6 @@ public static final class Builder {
private boolean useLegacyFlowControl = false;
private FlowControlSettings flowControlSettings = DEFAULT_FLOW_CONTROL_SETTINGS;

private boolean exactlyOnceDeliveryEnabled = false;

private ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
private ExecutorProvider systemExecutorProvider = null;
private TransportChannelProvider channelProvider =
Expand Down Expand Up @@ -573,22 +567,6 @@ public Builder setUseLegacyFlowControl(boolean value) {
return this;
}

/**
* Enables/Disabled ExactlyOnceDelivery
*
* <p>Will update the minDurationPerAckExtension if a user-provided value is not set
*/
public Builder setExactlyOnceDeliveryEnabled(boolean exactlyOnceDeliveryEnabled) {
// If exactlyOnceDeliveryIsEnabled we want to update the default minAckDeadlineExtension if
// applicable
if (exactlyOnceDeliveryEnabled && this.minDurationPerAckExtensionDefaultUsed) {
this.minDurationPerAckExtension = DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY;
}

this.exactlyOnceDeliveryEnabled = exactlyOnceDeliveryEnabled;
return this;
}

/**
* Set the maximum period a message ack deadline will be extended. Defaults to one hour.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ public void receiveMessage(
receiveQueue.offer(MessageAndConsumer.create(message, consumer));
}
})
.setExactlyOnceDeliveryEnabled(true)
.build();
subscriber.addListener(
new Subscriber.Listener() {
Expand Down Expand Up @@ -282,7 +281,6 @@ public void receiveMessage(
SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
.build())
.setExactlyOnceDeliveryEnabled(false)
.build();
subscriber.addListener(
new Subscriber.Listener() {
Expand Down Expand Up @@ -360,7 +358,6 @@ public void receiveMessage(
SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
.build())
.setExactlyOnceDeliveryEnabled(false)
.build();
subscriber.addListener(
new Subscriber.Listener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,6 @@ public void testExtension_GiveUp() throws Exception {

@Test
public void testAckExtensionDefaultsExactlyOnceDeliveryOffThenOn() {
// EnableExactlyOnceDelivery is turned off by default
MessageDispatcher messageDispatcher =
MessageDispatcher.newBuilder(mock(MessageReceiver.class))
.setAckLatencyDistribution(mockAckLatencyDistribution)
Expand All @@ -348,12 +347,16 @@ public void testAckExtensionDefaultsExactlyOnceDeliveryOffThenOn() {
.setMaxDurationPerAckExtensionDefaultUsed(true)
.build();

// EnableExactlyOnceDelivery is turned off by default

// We should be using the Subscriber set hard deadlines
assertMinAndMaxAckDeadlines(
messageDispatcher,
Math.toIntExact(Subscriber.MIN_STREAM_ACK_DEADLINE.getSeconds()),
Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds()));

// This would normally be set from the streaming pull response in the
// StreamingSubscriberConnection
messageDispatcher.setEnableExactlyOnceDelivery(true);

// Should only change min deadline
Expand All @@ -369,20 +372,25 @@ public void testAckExtensionDefaultsExactlyOnceDeliveryOnThenOff() {
MessageDispatcher messageDispatcher =
MessageDispatcher.newBuilder(mock(MessageReceiver.class))
.setAckLatencyDistribution(mockAckLatencyDistribution)
.setEnableExactlyOnceDelivery(true)
.setMinDurationPerAckExtension(
Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY)
.setMinDurationPerAckExtensionDefaultUsed(true)
.setMaxDurationPerAckExtension(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION)
.setMaxDurationPerAckExtensionDefaultUsed(true)
.build();

// This would normally be set from the streaming pull response in the
// StreamingSubscriberConnection
messageDispatcher.setEnableExactlyOnceDelivery(true);

assertMinAndMaxAckDeadlines(
messageDispatcher,
Math.toIntExact(
Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY.getSeconds()),
Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds()));

// This would normally be set from the streaming pull response in the
// StreamingSubscriberConnection
messageDispatcher.setEnableExactlyOnceDelivery(false);

// Should change min deadline
Expand All @@ -404,11 +412,14 @@ public void testAckExtensionCustomMinExactlyOnceDeliveryOffThenOn() {
.setMaxDurationPerAckExtensionDefaultUsed(true)
.build();

// EnableExactlyOnceDelivery is turned off by default
assertMinAndMaxAckDeadlines(
messageDispatcher,
customMinSeconds,
Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds()));

// This would normally be set from the streaming pull response in the
// StreamingSubscriberConnection
messageDispatcher.setEnableExactlyOnceDelivery(true);

// no changes should occur
Expand All @@ -430,11 +441,14 @@ public void testAckExtensionCustomMaxExactlyOnceDeliveryOffThenOn() {
.setMaxDurationPerAckExtensionDefaultUsed(false)
.build();

// EnableExactlyOnceDelivery is turned off by default
assertMinAndMaxAckDeadlines(
messageDispatcher,
Math.toIntExact(Subscriber.MIN_STREAM_ACK_DEADLINE.getSeconds()),
customMaxSeconds);

// This would normally be set from the streaming pull response in the
// StreamingSubscriberConnection
messageDispatcher.setEnableExactlyOnceDelivery(true);

// Because the customMaxSeconds is above the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,13 +452,18 @@ public void testSetFailureResponseOutstandingMessages() {

private StreamingSubscriberConnection getStreamingSubscriberConnection(
boolean exactlyOnceDeliveryEnabled) {
return getStreamingSubscriberReceiverFromBuilder(
StreamingSubscriberConnection.newBuilder(mock(MessageReceiverWithAckResponse.class)),
exactlyOnceDeliveryEnabled);
StreamingSubscriberConnection streamingSubscriberConnection =
getStreamingSubscriberConnectionFromBuilder(
StreamingSubscriberConnection.newBuilder(mock(MessageReceiverWithAckResponse.class)));

// This would normally be set from the streaming pull response
streamingSubscriberConnection.setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabled);

return streamingSubscriberConnection;
}

private StreamingSubscriberConnection getStreamingSubscriberReceiverFromBuilder(
StreamingSubscriberConnection.Builder builder, boolean exactlyOnceDeliveryEnabled) {
private StreamingSubscriberConnection getStreamingSubscriberConnectionFromBuilder(
StreamingSubscriberConnection.Builder builder) {
return builder
.setSubscription(MOCK_SUBSCRIPTION_NAME)
.setAckExpirationPadding(ACK_EXPIRATION_PADDING_DEFAULT)
Expand All @@ -474,7 +479,6 @@ private StreamingSubscriberConnection getStreamingSubscriberReceiverFromBuilder(
.setMinDurationPerAckExtensionDefaultUsed(true)
.setMaxDurationPerAckExtension(Subscriber.DEFAULT_MAX_ACK_DEADLINE_EXTENSION)
.setMaxDurationPerAckExtensionDefaultUsed(true)
.setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabled)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,16 +287,6 @@ public void testStreamAckDeadlineIsSetCorrectly() throws Exception {
fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds());

subscriber.stopAsync().awaitTerminated();

// maxDurationPerAckExtension is unset with exactly once enabled
subscriber =
startSubscriber(getTestSubscriberBuilder(testReceiver).setExactlyOnceDeliveryEnabled(true));
assertEquals(
expectedChannelCount, fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount));
assertEquals(
Math.toIntExact(Subscriber.STREAM_ACK_DEADLINE_EXACTLY_ONCE_DELIVERY_DEFAULT.getSeconds()),
fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds());
subscriber.stopAsync().awaitTerminated();
}

@Test
Expand Down Expand Up @@ -358,7 +348,6 @@ private Builder getTestSubscriberBuilder(
.setCredentialsProvider(NoCredentialsProvider.create())
.setClock(fakeExecutor.getClock())
.setParallelPullCount(1)
.setExactlyOnceDeliveryEnabled(true)
.setFlowControlSettings(
FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1000L).build());
}
Expand Down