diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 474c140392..0420e47dcf 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -458,6 +458,7 @@ public Map extract(ReadRowsRequest readRowsRequest) { .setRetryableCodes(readRowsSettings.getRetryableCodes()) .setRetrySettings(readRowsSettings.getRetrySettings()) .setIdleTimeout(readRowsSettings.getIdleTimeout()) + .setWaitTimeout(readRowsSettings.getWaitTimeout()) .build(); ServerStreamingCallable watched = @@ -906,6 +907,8 @@ public Map extract( settings.generateInitialChangeStreamPartitionsSettings().getRetrySettings()) .setIdleTimeout( settings.generateInitialChangeStreamPartitionsSettings().getIdleTimeout()) + .setWaitTimeout( + settings.generateInitialChangeStreamPartitionsSettings().getWaitTimeout()) .build(); ServerStreamingCallable watched = @@ -980,6 +983,7 @@ public Map extract( .setRetryableCodes(settings.readChangeStreamSettings().getRetryableCodes()) .setRetrySettings(settings.readChangeStreamSettings().getRetrySettings()) .setIdleTimeout(settings.readChangeStreamSettings().getIdleTimeout()) + .setWaitTimeout(settings.readChangeStreamSettings().getWaitTimeout()) .build(); ServerStreamingCallable watched = diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index fd54daa0d5..4e6b06f750 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -714,13 +714,15 @@ private Builder() { generateInitialChangeStreamPartitionsSettings .setRetryableCodes(GENERATE_INITIAL_CHANGE_STREAM_PARTITIONS_RETRY_CODES) .setRetrySettings(GENERATE_INITIAL_CHANGE_STREAM_PARTITIONS_RETRY_SETTINGS) - .setIdleTimeout(Duration.ofMinutes(5)); + .setIdleTimeout(Duration.ofMinutes(5)) + .setWaitTimeout(Duration.ofMinutes(1)); readChangeStreamSettings = ServerStreamingCallSettings.newBuilder(); readChangeStreamSettings .setRetryableCodes(READ_CHANGE_STREAM_RETRY_CODES) .setRetrySettings(READ_CHANGE_STREAM_RETRY_SETTINGS) - .setIdleTimeout(Duration.ofMinutes(5)); + .setIdleTimeout(Duration.ofMinutes(5)) + .setWaitTimeout(Duration.ofMinutes(1)); pingAndWarmSettings = UnaryCallSettings.newUnaryCallSettingsBuilder(); pingAndWarmSettings.setRetrySettings( diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java index ed3cec5d95..a0d56f2344 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java @@ -30,7 +30,9 @@ import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.InstantiatingWatchdogProvider; import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.WatchdogTimeoutException; import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials; import com.google.bigtable.v2.BigtableGrpc; import com.google.bigtable.v2.FeatureFlags; @@ -38,6 +40,8 @@ import com.google.bigtable.v2.MutateRowsResponse; import com.google.bigtable.v2.PingAndWarmRequest; import com.google.bigtable.v2.PingAndWarmResponse; +import com.google.bigtable.v2.ReadChangeStreamRequest; +import com.google.bigtable.v2.ReadChangeStreamResponse; import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.ReadRowsResponse; import com.google.bigtable.v2.RowSet; @@ -46,11 +50,8 @@ import com.google.cloud.bigtable.data.v2.BigtableDataSettings; import com.google.cloud.bigtable.data.v2.FakeServiceBuilder; import com.google.cloud.bigtable.data.v2.internal.RequestContext; -import com.google.cloud.bigtable.data.v2.models.BulkMutation; -import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter; -import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.*; import com.google.cloud.bigtable.data.v2.models.Row; -import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Queues; import com.google.common.io.BaseEncoding; @@ -82,11 +83,13 @@ import java.security.NoSuchAlgorithmException; import java.util.Base64; import java.util.Collection; +import java.util.Iterator; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -101,6 +104,8 @@ public class EnhancedBigtableStubTest { private static final String TABLE_NAME = NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, "fake-table"); private static final String APP_PROFILE_ID = "app-profile-id"; + private static final String WAIT_TIME_TABLE_ID = "test-wait-timeout"; + private static final Duration WATCHDOG_CHECK_DURATION = Duration.ofMillis(100); private Server server; private MetadataInterceptor metadataInterceptor; @@ -544,6 +549,46 @@ public void testBulkMutationFlowControlFeatureFlagIsNotSet() throws Exception { assertThat(featureFlags.getMutateRowsRateLimit()).isFalse(); } + @Test + public void testWaitTimeoutIsSet() throws Exception { + EnhancedBigtableStubSettings.Builder settings = defaultSettings.toBuilder(); + // Set a shorter wait timeout and make watchdog checks more frequently + settings.readRowsSettings().setWaitTimeout(WATCHDOG_CHECK_DURATION.dividedBy(2)); + settings.setStreamWatchdogProvider( + InstantiatingWatchdogProvider.create().withCheckInterval(WATCHDOG_CHECK_DURATION)); + + EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings.build()); + Iterator iterator = + stub.readRowsCallable().call(Query.create(WAIT_TIME_TABLE_ID)).iterator(); + try { + iterator.next(); + Assert.fail("Should throw watchdog timeout exception"); + } catch (WatchdogTimeoutException e) { + assertThat(e.getMessage()).contains("Canceled due to timeout waiting for next response"); + } + } + + @Test + public void testReadChangeStreamWaitTimeoutIsSet() throws Exception { + EnhancedBigtableStubSettings.Builder settings = defaultSettings.toBuilder(); + // Set a shorter wait timeout and make watchdog checks more frequently + settings.readChangeStreamSettings().setWaitTimeout(WATCHDOG_CHECK_DURATION.dividedBy(2)); + settings.setStreamWatchdogProvider( + InstantiatingWatchdogProvider.create().withCheckInterval(WATCHDOG_CHECK_DURATION)); + + EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings.build()); + Iterator iterator = + stub.readChangeStreamCallable() + .call(ReadChangeStreamQuery.create(WAIT_TIME_TABLE_ID)) + .iterator(); + try { + iterator.next(); + Assert.fail("Should throw watchdog timeout exception"); + } catch (WatchdogTimeoutException e) { + assertThat(e.getMessage()).contains("Canceled due to timeout waiting for next response"); + } + } + private static class MetadataInterceptor implements ServerInterceptor { final BlockingQueue headers = Queues.newLinkedBlockingDeque(); @@ -572,6 +617,8 @@ public Listener interceptCall( private static class FakeDataService extends BigtableGrpc.BigtableImplBase { final BlockingQueue requests = Queues.newLinkedBlockingDeque(); + final BlockingQueue readChangeReadStreamRequests = + Queues.newLinkedBlockingDeque(); final BlockingQueue pingRequests = Queues.newLinkedBlockingDeque(); @SuppressWarnings("unchecked") @@ -593,6 +640,13 @@ public void mutateRows( @Override public void readRows( ReadRowsRequest request, StreamObserver responseObserver) { + if (request.getTableName().contains(WAIT_TIME_TABLE_ID)) { + try { + Thread.sleep(WATCHDOG_CHECK_DURATION.toMillis() * 2); + } catch (Exception e) { + + } + } requests.add(request); // Dummy row for stream responseObserver.onNext( @@ -608,6 +662,23 @@ public void readRows( responseObserver.onCompleted(); } + @Override + public void readChangeStream( + ReadChangeStreamRequest request, + StreamObserver responseObserver) { + if (request.getTableName().contains(WAIT_TIME_TABLE_ID)) { + try { + Thread.sleep(WATCHDOG_CHECK_DURATION.toMillis() * 2); + } catch (Exception e) { + + } + } + readChangeReadStreamRequests.add(request); + // Dummy row for stream + responseObserver.onNext(ReadChangeStreamResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + @Override public void pingAndWarm( PingAndWarmRequest request, StreamObserver responseObserver) {