From 7ec5dd5bc9499b01f5e330c28c9c5ee8cfadad68 Mon Sep 17 00:00:00 2001 From: kolea2 <45548808+kolea2@users.noreply.github.com> Date: Fri, 15 Sep 2023 13:30:38 -0400 Subject: [PATCH] fix: make sure to propagate the response when throttling is enabled (#1908) (#1922) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change-Id: I690c522aebea03a966155d930bff26042d1bb1f1 Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes # ☕️ If you write sample code, please follow the [samples format]( https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md). Co-authored-by: Igor Bernstein --- .../RateLimitingServerStreamingCallable.java | 1 + .../v2/stub/RateLimitingCallableTest.java | 44 +++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java index 278019b07e..6208fce89e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java @@ -127,6 +127,7 @@ protected void onResponseImpl(MutateRowsResponse response) { Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod()))); } } + outerObserver.onResponse(response); } @Override diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingCallableTest.java index 96092b9e6e..92b93cfafe 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingCallableTest.java @@ -27,9 +27,13 @@ import com.google.api.gax.rpc.StreamController; import com.google.bigtable.v2.MutateRowsRequest; import com.google.bigtable.v2.MutateRowsResponse; +import com.google.bigtable.v2.Mutation; import com.google.bigtable.v2.RateLimitInfo; import com.google.cloud.bigtable.gaxx.testing.FakeStatusCode; +import com.google.protobuf.ByteString; import com.google.protobuf.Duration; +import com.google.rpc.Code; +import com.google.rpc.Status; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -138,6 +142,46 @@ public void testErrorInfoLowerQPS() throws Exception { assertThat(newQps).isWithin(0.1).of(oldQps * RateLimitingServerStreamingCallable.MIN_FACTOR); } + @Test + public void testResponseIsPropagated() { + MutateRowsResponse expectedResponse = + MutateRowsResponse.newBuilder() + .addEntries( + MutateRowsResponse.Entry.newBuilder() + .setIndex(0) + .setStatus(Status.newBuilder().setCode(Code.PERMISSION_DENIED_VALUE))) + .build(); + innerCallable = + new MockCallable() { + @Override + public void call( + MutateRowsRequest mutateRowsRequest, + ResponseObserver responseObserver, + ApiCallContext apiCallContext) { + responseObserver.onResponse(expectedResponse); + responseObserver.onComplete(); + } + }; + + callableToTest = new RateLimitingServerStreamingCallable(innerCallable); + + ResponseObserver mockObserver = Mockito.mock(ResponseObserver.class); + + MutateRowsRequest req = + MutateRowsRequest.newBuilder() + .addEntries( + MutateRowsRequest.Entry.newBuilder() + .setRowKey(ByteString.copyFromUtf8("k1")) + .addMutations( + Mutation.newBuilder() + .setDeleteFromRow(Mutation.DeleteFromRow.getDefaultInstance()))) + .build(); + + callableToTest.call(req, mockObserver, context); + + Mockito.verify(mockObserver, Mockito.times(1)).onResponse(Mockito.eq(expectedResponse)); + } + private static class MockResponseObserver implements ResponseObserver { private ResponseObserver observer;