Skip to content

Commit

Permalink
fix: Too many leases (#1135)
Browse files Browse the repository at this point in the history
* Moved the ackIdsInRequest for modack operations to the correct place.
* Wrote tests around expected batching/partition behavior to verify change works as expected
  • Loading branch information
mmicatka committed May 18, 2022
1 parent 01b3a60 commit c9bcec5
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 13 deletions.
Expand Up @@ -431,9 +431,9 @@ private void sendModackOperations(
// Send modacks
int pendingOperations = 0;
for (ModackRequestData modackRequestData : modackRequestDataList) {
List<String> ackIdsInRequest = new ArrayList<>();
for (List<AckRequestData> ackRequestDataInRequestList :
Lists.partition(modackRequestData.getAckRequestData(), MAX_PER_REQUEST_CHANGES)) {
List<String> ackIdsInRequest = new ArrayList<>();
for (AckRequestData ackRequestData : ackRequestDataInRequestList) {
ackIdsInRequest.add(ackRequestData.getAckId());
if (ackRequestData.hasMessageFuture()) {
Expand Down Expand Up @@ -511,9 +511,10 @@ public void onFailure(Throwable t) {
// Remove from our pending operations
ackOperationsWaiter.incrementPendingCount(-1);

Level level = isAlive() ? Level.WARNING : Level.FINER;
logger.log(level, "failed to send operations", t);

if (!getExactlyOnceDeliveryEnabled()) {
Level level = isAlive() ? Level.WARNING : Level.FINER;
logger.log(level, "failed to send operations", t);
return;
}

Expand Down Expand Up @@ -578,9 +579,6 @@ public void run() {
currentBackoffMillis,
TimeUnit.MILLISECONDS);
}

Level level = isAlive() ? Level.WARNING : Level.FINER;
logger.log(level, "failed to send operations", t);
}
};
}
Expand Down
Expand Up @@ -26,6 +26,7 @@
import com.google.api.gax.core.Distribution;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.common.collect.Lists;
import com.google.protobuf.Any;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
Expand Down Expand Up @@ -71,8 +72,8 @@ public class StreamingSubscriberConnectionTest {
"TRANSIENT_FAILURE_SERVICE_UNAVAILABLE";
private static final String PERMANENT_FAILURE_OTHER = "I_DO_NOT_MATCH_ANY_KNOWN_ERRORS";

private static int MOCK_ACK_EXTENSION_DEFAULT = 10;
private static Duration ACK_EXPIRATION_PADDING_DEFAULT = Duration.ofSeconds(10);
private static int MOCK_ACK_EXTENSION_DEFAULT_SECONDS = 10;
private static Duration ACK_EXPIRATION_PADDING_DEFAULT_DURATION = Duration.ofSeconds(10);
private static int MAX_DURATION_PER_ACK_EXTENSION_DEFAULT_SECONDS = 10;

@Before
Expand Down Expand Up @@ -105,7 +106,8 @@ public void testSendAckOperationsExactlyOnceDisabledNoMessageFutures() {

ModackRequestData modackRequestDataSuccess =
new ModackRequestData(
MOCK_ACK_EXTENSION_DEFAULT, AckRequestData.newBuilder(MOCK_ACK_ID_SUCCESS).build());
MOCK_ACK_EXTENSION_DEFAULT_SECONDS,
AckRequestData.newBuilder(MOCK_ACK_ID_SUCCESS).build());
modackRequestDataList.add(modackRequestDataSuccess);

ModackRequestData modackRequestDataNack =
Expand Down Expand Up @@ -138,7 +140,8 @@ public void testSendAckOperationsExactlyOnceEnabledMessageFuturesModacks() {
Map<String, String> errorInfoMetadataMapInitialRequest = new HashMap<String, String>();
List<ModackRequestData> modackRequestDataList = new ArrayList<ModackRequestData>();

ModackRequestData modackRequestDataDefault = new ModackRequestData(MOCK_ACK_EXTENSION_DEFAULT);
ModackRequestData modackRequestDataDefault =
new ModackRequestData(MOCK_ACK_EXTENSION_DEFAULT_SECONDS);

// Nack SUCCESS
SettableApiFuture<AckResponse> messageFutureSuccessExpected = SettableApiFuture.create();
Expand Down Expand Up @@ -218,14 +221,14 @@ public void testSendAckOperationsExactlyOnceEnabledMessageFuturesModacks() {
ModifyAckDeadlineRequest.newBuilder()
.setSubscription(MOCK_SUBSCRIPTION_NAME)
.addAllAckIds(ackIdsInitialRequest)
.setAckDeadlineSeconds(MOCK_ACK_EXTENSION_DEFAULT)
.setAckDeadlineSeconds(MOCK_ACK_EXTENSION_DEFAULT_SECONDS)
.build();

ModifyAckDeadlineRequest modifyAckDeadlineRequestRetry =
ModifyAckDeadlineRequest.newBuilder()
.setSubscription(MOCK_SUBSCRIPTION_NAME)
.addAllAckIds(ackIdsRetryRequest)
.setAckDeadlineSeconds(MOCK_ACK_EXTENSION_DEFAULT)
.setAckDeadlineSeconds(MOCK_ACK_EXTENSION_DEFAULT_SECONDS)
.build();

// Set mock grpc responses
Expand Down Expand Up @@ -450,6 +453,53 @@ public void testSetFailureResponseOutstandingMessages() {
});
}

@Test
public void testMaxPerRequestChanges() {
// Setup mocks
List<ModackRequestData> modackRequestDataList = new ArrayList<ModackRequestData>();
List<AckRequestData> ackRequestDataList = new ArrayList<AckRequestData>();

int numAckIds = 3000;
int numMaxPerRequestChanges = 1000;

List<String> mockAckIds = new ArrayList<String>();

for (int i = 0; i < numAckIds; i++) {
String mockAckId = "MOCK-ACK-ID-" + i;
mockAckIds.add(mockAckId);
ackRequestDataList.add(AckRequestData.newBuilder(mockAckId).build());
}

modackRequestDataList.add(
new ModackRequestData(MOCK_ACK_EXTENSION_DEFAULT_SECONDS, ackRequestDataList));

// Instantiate class and run operation(s)
StreamingSubscriberConnection streamingSubscriberConnection =
getStreamingSubscriberConnection(false);
streamingSubscriberConnection.sendAckOperations(ackRequestDataList);
streamingSubscriberConnection.sendModackOperations(modackRequestDataList);

// Assert expected behavior
for (List<String> mockAckIdsInRequest : Lists.partition(mockAckIds, numMaxPerRequestChanges)) {
AcknowledgeRequest expectedAcknowledgeRequest =
AcknowledgeRequest.newBuilder()
.setSubscription(MOCK_SUBSCRIPTION_NAME)
.addAllAckIds(mockAckIdsInRequest)
.build();
verify(mockSubscriberStub.acknowledgeCallable(), times(1))
.futureCall(expectedAcknowledgeRequest);

ModifyAckDeadlineRequest expectedModifyAckDeadlineRequest =
ModifyAckDeadlineRequest.newBuilder()
.setSubscription(MOCK_SUBSCRIPTION_NAME)
.addAllAckIds(mockAckIdsInRequest)
.setAckDeadlineSeconds(MOCK_ACK_EXTENSION_DEFAULT_SECONDS)
.build();
verify(mockSubscriberStub.modifyAckDeadlineCallable(), times(1))
.futureCall(expectedModifyAckDeadlineRequest);
}
}

private StreamingSubscriberConnection getStreamingSubscriberConnection(
boolean exactlyOnceDeliveryEnabled) {
StreamingSubscriberConnection streamingSubscriberConnection =
Expand All @@ -466,7 +516,7 @@ private StreamingSubscriberConnection getStreamingSubscriberConnectionFromBuilde
StreamingSubscriberConnection.Builder builder) {
return builder
.setSubscription(MOCK_SUBSCRIPTION_NAME)
.setAckExpirationPadding(ACK_EXPIRATION_PADDING_DEFAULT)
.setAckExpirationPadding(ACK_EXPIRATION_PADDING_DEFAULT_DURATION)
.setAckLatencyDistribution(mock(Distribution.class))
.setSubscriberStub(mockSubscriberStub)
.setChannelAffinity(0)
Expand Down

0 comments on commit c9bcec5

Please sign in to comment.