Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Too many leases #1135

Merged
merged 3 commits into from
May 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -431,9 +431,9 @@ private void sendModackOperations(
// Send modacks
int pendingOperations = 0;
for (ModackRequestData modackRequestData : modackRequestDataList) {
List<String> ackIdsInRequest = new ArrayList<>();
for (List<AckRequestData> ackRequestDataInRequestList :
Lists.partition(modackRequestData.getAckRequestData(), MAX_PER_REQUEST_CHANGES)) {
List<String> ackIdsInRequest = new ArrayList<>();
for (AckRequestData ackRequestData : ackRequestDataInRequestList) {
ackIdsInRequest.add(ackRequestData.getAckId());
if (ackRequestData.hasMessageFuture()) {
Expand Down Expand Up @@ -511,9 +511,10 @@ public void onFailure(Throwable t) {
// Remove from our pending operations
ackOperationsWaiter.incrementPendingCount(-1);

Level level = isAlive() ? Level.WARNING : Level.FINER;
logger.log(level, "failed to send operations", t);

if (!getExactlyOnceDeliveryEnabled()) {
Level level = isAlive() ? Level.WARNING : Level.FINER;
logger.log(level, "failed to send operations", t);
return;
}

Expand Down Expand Up @@ -578,9 +579,6 @@ public void run() {
currentBackoffMillis,
TimeUnit.MILLISECONDS);
}

Level level = isAlive() ? Level.WARNING : Level.FINER;
logger.log(level, "failed to send operations", t);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.api.gax.core.Distribution;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.common.collect.Lists;
import com.google.protobuf.Any;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
Expand Down Expand Up @@ -71,8 +72,8 @@ public class StreamingSubscriberConnectionTest {
"TRANSIENT_FAILURE_SERVICE_UNAVAILABLE";
private static final String PERMANENT_FAILURE_OTHER = "I_DO_NOT_MATCH_ANY_KNOWN_ERRORS";

private static int MOCK_ACK_EXTENSION_DEFAULT = 10;
private static Duration ACK_EXPIRATION_PADDING_DEFAULT = Duration.ofSeconds(10);
private static int MOCK_ACK_EXTENSION_DEFAULT_SECONDS = 10;
private static Duration ACK_EXPIRATION_PADDING_DEFAULT_DURATION = Duration.ofSeconds(10);
private static int MAX_DURATION_PER_ACK_EXTENSION_DEFAULT_SECONDS = 10;

@Before
Expand Down Expand Up @@ -105,7 +106,8 @@ public void testSendAckOperationsExactlyOnceDisabledNoMessageFutures() {

ModackRequestData modackRequestDataSuccess =
new ModackRequestData(
MOCK_ACK_EXTENSION_DEFAULT, AckRequestData.newBuilder(MOCK_ACK_ID_SUCCESS).build());
MOCK_ACK_EXTENSION_DEFAULT_SECONDS,
AckRequestData.newBuilder(MOCK_ACK_ID_SUCCESS).build());
modackRequestDataList.add(modackRequestDataSuccess);

ModackRequestData modackRequestDataNack =
Expand Down Expand Up @@ -138,7 +140,8 @@ public void testSendAckOperationsExactlyOnceEnabledMessageFuturesModacks() {
Map<String, String> errorInfoMetadataMapInitialRequest = new HashMap<String, String>();
List<ModackRequestData> modackRequestDataList = new ArrayList<ModackRequestData>();

ModackRequestData modackRequestDataDefault = new ModackRequestData(MOCK_ACK_EXTENSION_DEFAULT);
ModackRequestData modackRequestDataDefault =
new ModackRequestData(MOCK_ACK_EXTENSION_DEFAULT_SECONDS);

// Nack SUCCESS
SettableApiFuture<AckResponse> messageFutureSuccessExpected = SettableApiFuture.create();
Expand Down Expand Up @@ -218,14 +221,14 @@ public void testSendAckOperationsExactlyOnceEnabledMessageFuturesModacks() {
ModifyAckDeadlineRequest.newBuilder()
.setSubscription(MOCK_SUBSCRIPTION_NAME)
.addAllAckIds(ackIdsInitialRequest)
.setAckDeadlineSeconds(MOCK_ACK_EXTENSION_DEFAULT)
.setAckDeadlineSeconds(MOCK_ACK_EXTENSION_DEFAULT_SECONDS)
.build();

ModifyAckDeadlineRequest modifyAckDeadlineRequestRetry =
ModifyAckDeadlineRequest.newBuilder()
.setSubscription(MOCK_SUBSCRIPTION_NAME)
.addAllAckIds(ackIdsRetryRequest)
.setAckDeadlineSeconds(MOCK_ACK_EXTENSION_DEFAULT)
.setAckDeadlineSeconds(MOCK_ACK_EXTENSION_DEFAULT_SECONDS)
.build();

// Set mock grpc responses
Expand Down Expand Up @@ -450,6 +453,53 @@ public void testSetFailureResponseOutstandingMessages() {
});
}

@Test
public void testMaxPerRequestChanges() {
// Setup mocks
List<ModackRequestData> modackRequestDataList = new ArrayList<ModackRequestData>();
List<AckRequestData> ackRequestDataList = new ArrayList<AckRequestData>();

int numAckIds = 3000;
int numMaxPerRequestChanges = 1000;

List<String> mockAckIds = new ArrayList<String>();

for (int i = 0; i < numAckIds; i++) {
String mockAckId = "MOCK-ACK-ID-" + i;
mockAckIds.add(mockAckId);
ackRequestDataList.add(AckRequestData.newBuilder(mockAckId).build());
}

modackRequestDataList.add(
new ModackRequestData(MOCK_ACK_EXTENSION_DEFAULT_SECONDS, ackRequestDataList));

// Instantiate class and run operation(s)
StreamingSubscriberConnection streamingSubscriberConnection =
getStreamingSubscriberConnection(false);
streamingSubscriberConnection.sendAckOperations(ackRequestDataList);
streamingSubscriberConnection.sendModackOperations(modackRequestDataList);

// Assert expected behavior
for (List<String> mockAckIdsInRequest : Lists.partition(mockAckIds, numMaxPerRequestChanges)) {
AcknowledgeRequest expectedAcknowledgeRequest =
AcknowledgeRequest.newBuilder()
.setSubscription(MOCK_SUBSCRIPTION_NAME)
.addAllAckIds(mockAckIdsInRequest)
.build();
verify(mockSubscriberStub.acknowledgeCallable(), times(1))
.futureCall(expectedAcknowledgeRequest);

ModifyAckDeadlineRequest expectedModifyAckDeadlineRequest =
ModifyAckDeadlineRequest.newBuilder()
.setSubscription(MOCK_SUBSCRIPTION_NAME)
.addAllAckIds(mockAckIdsInRequest)
.setAckDeadlineSeconds(MOCK_ACK_EXTENSION_DEFAULT_SECONDS)
.build();
verify(mockSubscriberStub.modifyAckDeadlineCallable(), times(1))
.futureCall(expectedModifyAckDeadlineRequest);
}
}

private StreamingSubscriberConnection getStreamingSubscriberConnection(
boolean exactlyOnceDeliveryEnabled) {
StreamingSubscriberConnection streamingSubscriberConnection =
Expand All @@ -466,7 +516,7 @@ private StreamingSubscriberConnection getStreamingSubscriberConnectionFromBuilde
StreamingSubscriberConnection.Builder builder) {
return builder
.setSubscription(MOCK_SUBSCRIPTION_NAME)
.setAckExpirationPadding(ACK_EXPIRATION_PADDING_DEFAULT)
.setAckExpirationPadding(ACK_EXPIRATION_PADDING_DEFAULT_DURATION)
.setAckLatencyDistribution(mock(Distribution.class))
.setSubscriberStub(mockSubscriberStub)
.setChannelAffinity(0)
Expand Down