From c9bcec531bf175684306e50eaf7ef96ee60cba78 Mon Sep 17 00:00:00 2001 From: Mike Micatka <31972785+mmicatka@users.noreply.github.com> Date: Wed, 18 May 2022 10:30:55 -0400 Subject: [PATCH] fix: Too many leases (#1135) * Moved the ackIdsInRequest for modack operations to the correct place. * Wrote tests around expected batching/partition behavior to verify change works as expected --- .../v1/StreamingSubscriberConnection.java | 10 ++- .../v1/StreamingSubscriberConnectionTest.java | 64 +++++++++++++++++-- 2 files changed, 61 insertions(+), 13 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 78f35efea..b19a1cbbe 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -431,9 +431,9 @@ private void sendModackOperations( // Send modacks int pendingOperations = 0; for (ModackRequestData modackRequestData : modackRequestDataList) { - List ackIdsInRequest = new ArrayList<>(); for (List ackRequestDataInRequestList : Lists.partition(modackRequestData.getAckRequestData(), MAX_PER_REQUEST_CHANGES)) { + List ackIdsInRequest = new ArrayList<>(); for (AckRequestData ackRequestData : ackRequestDataInRequestList) { ackIdsInRequest.add(ackRequestData.getAckId()); if (ackRequestData.hasMessageFuture()) { @@ -511,9 +511,10 @@ public void onFailure(Throwable t) { // Remove from our pending operations ackOperationsWaiter.incrementPendingCount(-1); + Level level = isAlive() ? Level.WARNING : Level.FINER; + logger.log(level, "failed to send operations", t); + if (!getExactlyOnceDeliveryEnabled()) { - Level level = isAlive() ? Level.WARNING : Level.FINER; - logger.log(level, "failed to send operations", t); return; } @@ -578,9 +579,6 @@ public void run() { currentBackoffMillis, TimeUnit.MILLISECONDS); } - - Level level = isAlive() ? Level.WARNING : Level.FINER; - logger.log(level, "failed to send operations", t); } }; } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java index d8e1878dd..95f8897a4 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java @@ -26,6 +26,7 @@ import com.google.api.gax.core.Distribution; import com.google.api.gax.rpc.StatusCode; import com.google.cloud.pubsub.v1.stub.SubscriberStub; +import com.google.common.collect.Lists; import com.google.protobuf.Any; import com.google.pubsub.v1.AcknowledgeRequest; import com.google.pubsub.v1.ModifyAckDeadlineRequest; @@ -71,8 +72,8 @@ public class StreamingSubscriberConnectionTest { "TRANSIENT_FAILURE_SERVICE_UNAVAILABLE"; private static final String PERMANENT_FAILURE_OTHER = "I_DO_NOT_MATCH_ANY_KNOWN_ERRORS"; - private static int MOCK_ACK_EXTENSION_DEFAULT = 10; - private static Duration ACK_EXPIRATION_PADDING_DEFAULT = Duration.ofSeconds(10); + private static int MOCK_ACK_EXTENSION_DEFAULT_SECONDS = 10; + private static Duration ACK_EXPIRATION_PADDING_DEFAULT_DURATION = Duration.ofSeconds(10); private static int MAX_DURATION_PER_ACK_EXTENSION_DEFAULT_SECONDS = 10; @Before @@ -105,7 +106,8 @@ public void testSendAckOperationsExactlyOnceDisabledNoMessageFutures() { ModackRequestData modackRequestDataSuccess = new ModackRequestData( - MOCK_ACK_EXTENSION_DEFAULT, AckRequestData.newBuilder(MOCK_ACK_ID_SUCCESS).build()); + MOCK_ACK_EXTENSION_DEFAULT_SECONDS, + AckRequestData.newBuilder(MOCK_ACK_ID_SUCCESS).build()); modackRequestDataList.add(modackRequestDataSuccess); ModackRequestData modackRequestDataNack = @@ -138,7 +140,8 @@ public void testSendAckOperationsExactlyOnceEnabledMessageFuturesModacks() { Map errorInfoMetadataMapInitialRequest = new HashMap(); List modackRequestDataList = new ArrayList(); - ModackRequestData modackRequestDataDefault = new ModackRequestData(MOCK_ACK_EXTENSION_DEFAULT); + ModackRequestData modackRequestDataDefault = + new ModackRequestData(MOCK_ACK_EXTENSION_DEFAULT_SECONDS); // Nack SUCCESS SettableApiFuture messageFutureSuccessExpected = SettableApiFuture.create(); @@ -218,14 +221,14 @@ public void testSendAckOperationsExactlyOnceEnabledMessageFuturesModacks() { ModifyAckDeadlineRequest.newBuilder() .setSubscription(MOCK_SUBSCRIPTION_NAME) .addAllAckIds(ackIdsInitialRequest) - .setAckDeadlineSeconds(MOCK_ACK_EXTENSION_DEFAULT) + .setAckDeadlineSeconds(MOCK_ACK_EXTENSION_DEFAULT_SECONDS) .build(); ModifyAckDeadlineRequest modifyAckDeadlineRequestRetry = ModifyAckDeadlineRequest.newBuilder() .setSubscription(MOCK_SUBSCRIPTION_NAME) .addAllAckIds(ackIdsRetryRequest) - .setAckDeadlineSeconds(MOCK_ACK_EXTENSION_DEFAULT) + .setAckDeadlineSeconds(MOCK_ACK_EXTENSION_DEFAULT_SECONDS) .build(); // Set mock grpc responses @@ -450,6 +453,53 @@ public void testSetFailureResponseOutstandingMessages() { }); } + @Test + public void testMaxPerRequestChanges() { + // Setup mocks + List modackRequestDataList = new ArrayList(); + List ackRequestDataList = new ArrayList(); + + int numAckIds = 3000; + int numMaxPerRequestChanges = 1000; + + List mockAckIds = new ArrayList(); + + for (int i = 0; i < numAckIds; i++) { + String mockAckId = "MOCK-ACK-ID-" + i; + mockAckIds.add(mockAckId); + ackRequestDataList.add(AckRequestData.newBuilder(mockAckId).build()); + } + + modackRequestDataList.add( + new ModackRequestData(MOCK_ACK_EXTENSION_DEFAULT_SECONDS, ackRequestDataList)); + + // Instantiate class and run operation(s) + StreamingSubscriberConnection streamingSubscriberConnection = + getStreamingSubscriberConnection(false); + streamingSubscriberConnection.sendAckOperations(ackRequestDataList); + streamingSubscriberConnection.sendModackOperations(modackRequestDataList); + + // Assert expected behavior + for (List mockAckIdsInRequest : Lists.partition(mockAckIds, numMaxPerRequestChanges)) { + AcknowledgeRequest expectedAcknowledgeRequest = + AcknowledgeRequest.newBuilder() + .setSubscription(MOCK_SUBSCRIPTION_NAME) + .addAllAckIds(mockAckIdsInRequest) + .build(); + verify(mockSubscriberStub.acknowledgeCallable(), times(1)) + .futureCall(expectedAcknowledgeRequest); + + ModifyAckDeadlineRequest expectedModifyAckDeadlineRequest = + ModifyAckDeadlineRequest.newBuilder() + .setSubscription(MOCK_SUBSCRIPTION_NAME) + .addAllAckIds(mockAckIdsInRequest) + .setAckDeadlineSeconds(MOCK_ACK_EXTENSION_DEFAULT_SECONDS) + .build(); + verify(mockSubscriberStub.modifyAckDeadlineCallable(), times(1)) + .futureCall(expectedModifyAckDeadlineRequest); + } + } + private StreamingSubscriberConnection getStreamingSubscriberConnection( boolean exactlyOnceDeliveryEnabled) { StreamingSubscriberConnection streamingSubscriberConnection = @@ -466,7 +516,7 @@ private StreamingSubscriberConnection getStreamingSubscriberConnectionFromBuilde StreamingSubscriberConnection.Builder builder) { return builder .setSubscription(MOCK_SUBSCRIPTION_NAME) - .setAckExpirationPadding(ACK_EXPIRATION_PADDING_DEFAULT) + .setAckExpirationPadding(ACK_EXPIRATION_PADDING_DEFAULT_DURATION) .setAckLatencyDistribution(mock(Distribution.class)) .setSubscriberStub(mockSubscriberStub) .setChannelAffinity(0)