diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java index 278019b07e..6208fce89e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java @@ -127,6 +127,7 @@ protected void onResponseImpl(MutateRowsResponse response) { Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod()))); } } + outerObserver.onResponse(response); } @Override diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingCallableTest.java index 96092b9e6e..92b93cfafe 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingCallableTest.java @@ -27,9 +27,13 @@ import com.google.api.gax.rpc.StreamController; import com.google.bigtable.v2.MutateRowsRequest; import com.google.bigtable.v2.MutateRowsResponse; +import com.google.bigtable.v2.Mutation; import com.google.bigtable.v2.RateLimitInfo; import com.google.cloud.bigtable.gaxx.testing.FakeStatusCode; +import com.google.protobuf.ByteString; import com.google.protobuf.Duration; +import com.google.rpc.Code; +import com.google.rpc.Status; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -138,6 +142,46 @@ public void testErrorInfoLowerQPS() throws Exception { assertThat(newQps).isWithin(0.1).of(oldQps * RateLimitingServerStreamingCallable.MIN_FACTOR); } + @Test + public void testResponseIsPropagated() { + MutateRowsResponse expectedResponse = + MutateRowsResponse.newBuilder() + .addEntries( + MutateRowsResponse.Entry.newBuilder() + .setIndex(0) + .setStatus(Status.newBuilder().setCode(Code.PERMISSION_DENIED_VALUE))) + .build(); + innerCallable = + new MockCallable() { + @Override + public void call( + MutateRowsRequest mutateRowsRequest, + ResponseObserver responseObserver, + ApiCallContext apiCallContext) { + responseObserver.onResponse(expectedResponse); + responseObserver.onComplete(); + } + }; + + callableToTest = new RateLimitingServerStreamingCallable(innerCallable); + + ResponseObserver mockObserver = Mockito.mock(ResponseObserver.class); + + MutateRowsRequest req = + MutateRowsRequest.newBuilder() + .addEntries( + MutateRowsRequest.Entry.newBuilder() + .setRowKey(ByteString.copyFromUtf8("k1")) + .addMutations( + Mutation.newBuilder() + .setDeleteFromRow(Mutation.DeleteFromRow.getDefaultInstance()))) + .build(); + + callableToTest.call(req, mockObserver, context); + + Mockito.verify(mockObserver, Mockito.times(1)).onResponse(Mockito.eq(expectedResponse)); + } + private static class MockResponseObserver implements ResponseObserver { private ResponseObserver observer;