From 7d27816b959a06412f59f49023611f0bca8e3536 Mon Sep 17 00:00:00 2001 From: Reza Karegar Date: Fri, 16 Feb 2024 15:47:36 -0500 Subject: [PATCH] feat: create the backbone of counting errors per connection each minute. (#2094) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Create the backbone of counting errors per connection each minute. * Clean up creating new interceptors and StatsRecorderWrapper ctor. * Rename setting background task and fix imports. * Temporarily skip exporting per connection metrics to fix test failures. * Temporarily share the tests for debugging purposes * Temporarily add the test for debugging. * Remove the new ExecutorProvider and fix integration test failures. * Update unit tests to reflect the new setup. * Clean up and add tests. * Clean comments and add a TODO. * Improve tests and comments. * Address comments and refactor by defining new classes. * Fix code formatting. * Refactor classes and move to better packages. * Clean up classes and address comments. * Update the scheduler object. * Apply cleanups. * Fix unit tests and avoid hanging when getting error in close(). * Fix code formatting. * Improve error handling in the close() method. * Improve exception logging. * Fix code formatting. * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot --- .gitignore | 1 + .../BigtableCreateTimeSeriesExporter.java | 4 + .../StatsRecorderWrapperForConnection.java | 57 ++++ .../cloud/bigtable/stats/StatsWrapper.java | 5 + .../data/v2/stub/EnhancedBigtableStub.java | 55 +++- .../v2/stub/EnhancedBigtableStubSettings.java | 2 - .../ConnectionErrorCountInterceptor.java | 89 ++++++ .../ErrorCountPerConnectionMetricTracker.java | 83 ++++++ .../v2/BigtableDataClientFactoryTest.java | 2 +- .../metrics/ErrorCountPerConnectionTest.java | 262 ++++++++++++++++++ 10 files changed, 543 insertions(+), 17 deletions(-) create mode 100644 google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapperForConnection.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/ConnectionErrorCountInterceptor.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/ErrorCountPerConnectionMetricTracker.java create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/ErrorCountPerConnectionTest.java diff --git a/.gitignore b/.gitignore index dbde6a740b..4a670f8c53 100644 --- a/.gitignore +++ b/.gitignore @@ -42,3 +42,4 @@ api_key artman-genfiles .flattened-pom.xml +dependency-reduced-pom.xml diff --git a/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/BigtableCreateTimeSeriesExporter.java b/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/BigtableCreateTimeSeriesExporter.java index 325a07a0c5..dca52f1c81 100644 --- a/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/BigtableCreateTimeSeriesExporter.java +++ b/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/BigtableCreateTimeSeriesExporter.java @@ -51,6 +51,10 @@ public void export(Collection metrics) { if (!metric.getMetricDescriptor().getName().contains("bigtable")) { continue; } + // TODO: temporarily skip exporting per connection metrics. + if (metric.getMetricDescriptor().getName().contains("per_connection_error_count")) { + continue; + } projectToTimeSeries = metric.getTimeSeriesList().stream() diff --git a/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapperForConnection.java b/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapperForConnection.java new file mode 100644 index 0000000000..3c335d28bc --- /dev/null +++ b/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapperForConnection.java @@ -0,0 +1,57 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.stats; + +import com.google.api.core.InternalApi; +import io.opencensus.stats.MeasureMap; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagContextBuilder; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.Tags; +import java.util.Map; + +/** A wrapper to record built-in metrics for connection metrics not tied to operations/RPCs. */ +@InternalApi("For internal use only") +public class StatsRecorderWrapperForConnection { + private final StatsRecorder statsRecorder; + private final TagContext tagContext; + private MeasureMap perConnectionErrorCountMeasureMap; + + public StatsRecorderWrapperForConnection( + Map statsAttributes, StatsRecorder statsRecorder) { + this.statsRecorder = statsRecorder; + + this.perConnectionErrorCountMeasureMap = statsRecorder.newMeasureMap(); + + Tagger tagger = Tags.getTagger(); + TagContextBuilder tagContextBuilder = tagger.toBuilder(tagger.getCurrentTagContext()); + for (Map.Entry entry : statsAttributes.entrySet()) { + tagContextBuilder.putLocal(TagKey.create(entry.getKey()), TagValue.create(entry.getValue())); + } + this.tagContext = tagContextBuilder.build(); + } + + public void putAndRecordPerConnectionErrorCount(long errorCount) { + perConnectionErrorCountMeasureMap.put( + BuiltinMeasureConstants.PER_CONNECTION_ERROR_COUNT, errorCount); + + perConnectionErrorCountMeasureMap.record(tagContext); + perConnectionErrorCountMeasureMap = statsRecorder.newMeasureMap(); + } +} diff --git a/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsWrapper.java b/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsWrapper.java index 401a1cf975..0a97b21d67 100644 --- a/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsWrapper.java +++ b/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsWrapper.java @@ -40,6 +40,11 @@ public static StatsRecorderWrapper createRecorder( operationType, spanName, statsAttributes, Stats.getStatsRecorder()); } + public static StatsRecorderWrapperForConnection createRecorderForConnection( + Map statsAttributes) { + return new StatsRecorderWrapperForConnection(statsAttributes, Stats.getStatsRecorder()); + } + // This is used in integration tests to get the tag value strings from view manager because Stats // is relocated to com.google.bigtable.veneer.repackaged.io.opencensus. @InternalApi("Visible for testing") diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 9fb906c2d1..ef37dd4e48 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -15,6 +15,7 @@ */ package com.google.cloud.bigtable.data.v2.stub; +import com.google.api.core.ApiFunction; import com.google.api.core.BetaApi; import com.google.api.core.InternalApi; import com.google.api.gax.batching.Batcher; @@ -94,6 +95,7 @@ import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory; import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory; +import com.google.cloud.bigtable.data.v2.stub.metrics.ErrorCountPerConnectionMetricTracker; import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory; import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants; import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersServerStreamingCallable; @@ -117,6 +119,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.protobuf.ByteString; +import io.grpc.ManagedChannelBuilder; import io.opencensus.stats.Stats; import io.opencensus.stats.StatsRecorder; import io.opencensus.tags.TagKey; @@ -149,7 +152,6 @@ public class EnhancedBigtableStub implements AutoCloseable { private static final String CLIENT_NAME = "Bigtable"; private static final long FLOW_CONTROL_ADJUSTING_INTERVAL_MS = TimeUnit.SECONDS.toMillis(20); - private final EnhancedBigtableStubSettings settings; private final ClientContext clientContext; @@ -176,7 +178,6 @@ public class EnhancedBigtableStub implements AutoCloseable { public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings) throws IOException { - settings = settings.toBuilder().setTracerFactory(createBigtableTracerFactory(settings)).build(); ClientContext clientContext = createClientContext(settings); @@ -204,10 +205,27 @@ public static ClientContext createClientContext(EnhancedBigtableStubSettings set ? ((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder() : null; - if (builder.getEnableRoutingCookie() && transportProvider != null) { - // TODO: this also need to be added to BigtableClientFactory - // patch cookies interceptor - transportProvider.setInterceptorProvider(() -> ImmutableList.of(new CookiesInterceptor())); + ErrorCountPerConnectionMetricTracker errorCountPerConnectionMetricTracker; + if (transportProvider != null) { + errorCountPerConnectionMetricTracker = + new ErrorCountPerConnectionMetricTracker(createBuiltinAttributes(builder)); + ApiFunction oldChannelConfigurator = + transportProvider.getChannelConfigurator(); + transportProvider.setChannelConfigurator( + managedChannelBuilder -> { + if (settings.getEnableRoutingCookie()) { + managedChannelBuilder.intercept(new CookiesInterceptor()); + } + + managedChannelBuilder.intercept(errorCountPerConnectionMetricTracker.getInterceptor()); + + if (oldChannelConfigurator != null) { + managedChannelBuilder = oldChannelConfigurator.apply(managedChannelBuilder); + } + return managedChannelBuilder; + }); + } else { + errorCountPerConnectionMetricTracker = null; } // Inject channel priming @@ -233,7 +251,12 @@ public static ClientContext createClientContext(EnhancedBigtableStubSettings set builder.setTransportChannelProvider(transportProvider.build()); } - return ClientContext.create(builder.build()); + ClientContext clientContext = ClientContext.create(builder.build()); + if (errorCountPerConnectionMetricTracker != null) { + errorCountPerConnectionMetricTracker.startConnectionErrorCountTracker( + clientContext.getExecutor()); + } + return clientContext; } public static ApiTracerFactory createBigtableTracerFactory( @@ -254,13 +277,7 @@ public static ApiTracerFactory createBigtableTracerFactory( .put(RpcMeasureConstants.BIGTABLE_INSTANCE_ID, TagValue.create(instanceId)) .put(RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID, TagValue.create(appProfileId)) .build(); - ImmutableMap builtinAttributes = - ImmutableMap.builder() - .put("project_id", projectId) - .put("instance", instanceId) - .put("app_profile", appProfileId) - .put("client_name", "bigtable-java/" + Version.VERSION) - .build(); + ImmutableMap builtinAttributes = createBuiltinAttributes(settings.toBuilder()); return new CompositeTracerFactory( ImmutableList.of( @@ -283,6 +300,16 @@ public static ApiTracerFactory createBigtableTracerFactory( settings.getTracerFactory())); } + private static ImmutableMap createBuiltinAttributes( + EnhancedBigtableStubSettings.Builder builder) { + return ImmutableMap.builder() + .put("project_id", builder.getProjectId()) + .put("instance", builder.getInstanceId()) + .put("app_profile", builder.getAppProfileId()) + .put("client_name", "bigtable-java/" + Version.VERSION) + .build(); + } + private static void patchCredentials(EnhancedBigtableStubSettings.Builder settings) throws IOException { int i = settings.getEndpoint().lastIndexOf(":"); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index 44e4752cd5..9a5027c740 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -652,7 +652,6 @@ private Builder() { setCredentialsProvider(defaultCredentialsProviderBuilder().build()); this.enableRoutingCookie = true; this.enableRetryInfo = true; - // Defaults provider BigtableStubSettings.Builder baseDefaults = BigtableStubSettings.newBuilder(); @@ -772,7 +771,6 @@ private Builder(EnhancedBigtableStubSettings settings) { jwtAudienceMapping = settings.jwtAudienceMapping; enableRoutingCookie = settings.enableRoutingCookie; enableRetryInfo = settings.enableRetryInfo; - // Per method settings. readRowsSettings = settings.readRowsSettings.toBuilder(); readRowSettings = settings.readRowSettings.toBuilder(); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/ConnectionErrorCountInterceptor.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/ConnectionErrorCountInterceptor.java new file mode 100644 index 0000000000..17fcf9018e --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/ConnectionErrorCountInterceptor.java @@ -0,0 +1,89 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall; +import io.grpc.ForwardingClientCallListener; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import java.util.concurrent.atomic.LongAdder; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** An interceptor which counts the number of failed responses for a channel. */ +class ConnectionErrorCountInterceptor implements ClientInterceptor { + private static final Logger LOG = + Logger.getLogger(ConnectionErrorCountInterceptor.class.toString()); + private final LongAdder numOfErrors; + private final LongAdder numOfSuccesses; + + ConnectionErrorCountInterceptor() { + numOfErrors = new LongAdder(); + numOfSuccesses = new LongAdder(); + } + + @Override + public ClientCall interceptCall( + MethodDescriptor methodDescriptor, CallOptions callOptions, Channel channel) { + return new ForwardingClientCall.SimpleForwardingClientCall( + channel.newCall(methodDescriptor, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + super.start( + new ForwardingClientCallListener.SimpleForwardingClientCallListener( + responseListener) { + @Override + public void onClose(Status status, Metadata trailers) { + // Connection accounting is non-critical, so we log the exception, but let normal + // processing proceed. + try { + handleOnCloseUnsafe(status); + } catch (Throwable t) { + if (t instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + LOG.log( + Level.WARNING, "Unexpected error while updating connection error stats", t); + } + super.onClose(status, trailers); + } + + private void handleOnCloseUnsafe(Status status) { + if (status.isOk()) { + numOfSuccesses.increment(); + } else { + numOfErrors.increment(); + } + } + }, + headers); + } + }; + } + + long getAndResetNumOfErrors() { + return numOfErrors.sumThenReset(); + } + + long getAndResetNumOfSuccesses() { + return numOfSuccesses.sumThenReset(); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/ErrorCountPerConnectionMetricTracker.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/ErrorCountPerConnectionMetricTracker.java new file mode 100644 index 0000000000..cab3b0bbd0 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/ErrorCountPerConnectionMetricTracker.java @@ -0,0 +1,83 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import com.google.api.core.InternalApi; +import com.google.cloud.bigtable.stats.StatsRecorderWrapperForConnection; +import com.google.cloud.bigtable.stats.StatsWrapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import io.grpc.ClientInterceptor; +import java.util.Collections; +import java.util.Set; +import java.util.WeakHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/* Background task that goes through all connections and updates the errors_per_connection metric. */ +@InternalApi("For internal use only") +public class ErrorCountPerConnectionMetricTracker implements Runnable { + private static final Integer PER_CONNECTION_ERROR_COUNT_PERIOD_SECONDS = 60; + private final Set connectionErrorCountInterceptors; + private final Object interceptorsLock = new Object(); + // This is not final so that it can be updated and mocked during testing. + private StatsRecorderWrapperForConnection statsRecorderWrapperForConnection; + + @VisibleForTesting + void setStatsRecorderWrapperForConnection( + StatsRecorderWrapperForConnection statsRecorderWrapperForConnection) { + this.statsRecorderWrapperForConnection = statsRecorderWrapperForConnection; + } + + public ErrorCountPerConnectionMetricTracker(ImmutableMap builtinAttributes) { + connectionErrorCountInterceptors = + Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>())); + + this.statsRecorderWrapperForConnection = + StatsWrapper.createRecorderForConnection(builtinAttributes); + } + + public void startConnectionErrorCountTracker(ScheduledExecutorService scheduler) { + scheduler.scheduleAtFixedRate( + this, 0, PER_CONNECTION_ERROR_COUNT_PERIOD_SECONDS, TimeUnit.SECONDS); + } + + public ClientInterceptor getInterceptor() { + ConnectionErrorCountInterceptor connectionErrorCountInterceptor = + new ConnectionErrorCountInterceptor(); + synchronized (interceptorsLock) { + connectionErrorCountInterceptors.add(connectionErrorCountInterceptor); + } + return connectionErrorCountInterceptor; + } + + @Override + public void run() { + synchronized (interceptorsLock) { + for (ConnectionErrorCountInterceptor interceptor : connectionErrorCountInterceptors) { + long errors = interceptor.getAndResetNumOfErrors(); + long successes = interceptor.getAndResetNumOfSuccesses(); + // We avoid keeping track of inactive connections (i.e., without any failed or successful + // requests). + if (errors > 0 || successes > 0) { + // TODO: add a metric to also keep track of the number of successful requests per each + // connection. + statsRecorderWrapperForConnection.putAndRecordPerConnectionErrorCount(errors); + } + } + } + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java index edcda45938..a35112b380 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java @@ -266,7 +266,7 @@ public void testCreateWithRefreshingChannel() throws Exception { factory.createForAppProfile("other-appprofile"); factory.createForInstance("other-project", "other-instance"); - // Make sure that only 1 instance is created for all clients + // Make sure that only 1 instance is created by each provider Mockito.verify(credentialsProvider, Mockito.times(1)).getCredentials(); Mockito.verify(executorProvider, Mockito.times(1)).getExecutor(); Mockito.verify(watchdogProvider, Mockito.times(1)).getWatchdog(); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/ErrorCountPerConnectionTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/ErrorCountPerConnectionTest.java new file mode 100644 index 0000000000..a6670182b8 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/ErrorCountPerConnectionTest.java @@ -0,0 +1,262 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; + +import com.google.api.gax.core.FixedExecutorProvider; +import com.google.api.gax.grpc.ChannelPoolSettings; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.bigtable.v2.*; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.FakeServiceBuilder; +import com.google.cloud.bigtable.data.v2.models.*; +import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub; +import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; +import com.google.cloud.bigtable.stats.StatsRecorderWrapperForConnection; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +@RunWith(JUnit4.class) +public class ErrorCountPerConnectionTest { + private static final String SUCCESS_TABLE_NAME = "some-table"; + private static final String ERROR_TABLE_NAME = "nonexistent-table"; + private Server server; + private final FakeService fakeService = new FakeService(); + private EnhancedBigtableStubSettings.Builder builder; + private ArgumentCaptor runnableCaptor; + private StatsRecorderWrapperForConnection statsRecorderWrapperForConnection; + + @Before + public void setup() throws Exception { + server = FakeServiceBuilder.create(fakeService).start(); + + ScheduledExecutorService executors = Mockito.mock(ScheduledExecutorService.class); + builder = + BigtableDataSettings.newBuilderForEmulator(server.getPort()) + .stubSettings() + .setBackgroundExecutorProvider(FixedExecutorProvider.create(executors)) + .setProjectId("fake-project") + .setInstanceId("fake-instance"); + runnableCaptor = ArgumentCaptor.forClass(Runnable.class); + Mockito.when( + executors.scheduleAtFixedRate(runnableCaptor.capture(), anyLong(), anyLong(), any())) + .thenReturn(null); + + statsRecorderWrapperForConnection = Mockito.mock(StatsRecorderWrapperForConnection.class); + } + + @After + public void tearDown() throws Exception { + if (server != null) { + server.shutdown(); + } + } + + @Test + public void readWithOneChannel() throws Exception { + EnhancedBigtableStub stub = EnhancedBigtableStub.create(builder.build()); + long errorCount = 0; + + for (int i = 0; i < 20; i++) { + Query query; + if (i % 3 == 0) { + query = Query.create(ERROR_TABLE_NAME); + errorCount += 1; + } else { + query = Query.create(SUCCESS_TABLE_NAME); + } + try { + stub.readRowsCallable().call(query).iterator().hasNext(); + } catch (Exception e) { + // noop + } + } + ArgumentCaptor errorCountCaptor = ArgumentCaptor.forClass(long.class); + Mockito.doNothing() + .when(statsRecorderWrapperForConnection) + .putAndRecordPerConnectionErrorCount(errorCountCaptor.capture()); + runInterceptorTasksAndAssertCount(); + List allErrorCounts = errorCountCaptor.getAllValues(); + assertThat(allErrorCounts.size()).isEqualTo(1); + assertThat(allErrorCounts.get(0)).isEqualTo(errorCount); + } + + @Test + public void readWithTwoChannels() throws Exception { + EnhancedBigtableStubSettings.Builder builderWithTwoChannels = + builder.setTransportChannelProvider( + ((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()) + .toBuilder() + .setChannelPoolSettings(ChannelPoolSettings.staticallySized(2)) + .build()); + EnhancedBigtableStub stub = EnhancedBigtableStub.create(builderWithTwoChannels.build()); + long totalErrorCount = 0; + + for (int i = 0; i < 20; i++) { + try { + if (i < 10) { + totalErrorCount += 1; + stub.readRowsCallable().call(Query.create(ERROR_TABLE_NAME)).iterator().hasNext(); + } else { + stub.readRowsCallable().call(Query.create(SUCCESS_TABLE_NAME)).iterator().hasNext(); + } + } catch (Exception e) { + // noop + } + } + ArgumentCaptor errorCountCaptor = ArgumentCaptor.forClass(long.class); + Mockito.doNothing() + .when(statsRecorderWrapperForConnection) + .putAndRecordPerConnectionErrorCount(errorCountCaptor.capture()); + runInterceptorTasksAndAssertCount(); + + List allErrorCounts = errorCountCaptor.getAllValues(); + assertThat(allErrorCounts.size()).isEqualTo(2); + // Requests get assigned to channels using a Round Robin algorithm, so half to each. + assertThat(allErrorCounts).containsExactly(totalErrorCount / 2, totalErrorCount / 2); + } + + @Test + public void readOverTwoPeriods() throws Exception { + EnhancedBigtableStub stub = EnhancedBigtableStub.create(builder.build()); + long errorCount = 0; + + for (int i = 0; i < 20; i++) { + Query query; + if (i % 3 == 0) { + query = Query.create(ERROR_TABLE_NAME); + errorCount += 1; + } else { + query = Query.create(SUCCESS_TABLE_NAME); + } + try { + stub.readRowsCallable().call(query).iterator().hasNext(); + } catch (Exception e) { + // noop + } + } + ArgumentCaptor errorCountCaptor = ArgumentCaptor.forClass(long.class); + Mockito.doNothing() + .when(statsRecorderWrapperForConnection) + .putAndRecordPerConnectionErrorCount(errorCountCaptor.capture()); + runInterceptorTasksAndAssertCount(); + List allErrorCounts = errorCountCaptor.getAllValues(); + assertThat(allErrorCounts.size()).isEqualTo(1); + assertThat(allErrorCounts.get(0)).isEqualTo(errorCount); + + errorCount = 0; + + for (int i = 0; i < 20; i++) { + Query query; + if (i % 3 == 0) { + query = Query.create(SUCCESS_TABLE_NAME); + } else { + query = Query.create(ERROR_TABLE_NAME); + errorCount += 1; + } + try { + stub.readRowsCallable().call(query).iterator().hasNext(); + } catch (Exception e) { + // noop + } + } + errorCountCaptor = ArgumentCaptor.forClass(long.class); + Mockito.doNothing() + .when(statsRecorderWrapperForConnection) + .putAndRecordPerConnectionErrorCount(errorCountCaptor.capture()); + runInterceptorTasksAndAssertCount(); + allErrorCounts = errorCountCaptor.getAllValues(); + assertThat(allErrorCounts.size()).isEqualTo(1); + assertThat(allErrorCounts.get(0)).isEqualTo(errorCount); + } + + @Test + public void ignoreInactiveConnection() throws Exception { + EnhancedBigtableStub stub = EnhancedBigtableStub.create(builder.build()); + + ArgumentCaptor errorCountCaptor = ArgumentCaptor.forClass(long.class); + Mockito.doNothing() + .when(statsRecorderWrapperForConnection) + .putAndRecordPerConnectionErrorCount(errorCountCaptor.capture()); + runInterceptorTasksAndAssertCount(); + List allErrorCounts = errorCountCaptor.getAllValues(); + assertThat(allErrorCounts).isEmpty(); + } + + @Test + public void noFailedRequests() throws Exception { + EnhancedBigtableStub stub = EnhancedBigtableStub.create(builder.build()); + + for (int i = 0; i < 20; i++) { + try { + stub.readRowsCallable().call(Query.create(SUCCESS_TABLE_NAME)).iterator().hasNext(); + } catch (Exception e) { + // noop + } + } + ArgumentCaptor errorCountCaptor = ArgumentCaptor.forClass(long.class); + Mockito.doNothing() + .when(statsRecorderWrapperForConnection) + .putAndRecordPerConnectionErrorCount(errorCountCaptor.capture()); + runInterceptorTasksAndAssertCount(); + List allErrorCounts = errorCountCaptor.getAllValues(); + assertThat(allErrorCounts.size()).isEqualTo(1); + assertThat(allErrorCounts.get(0)).isEqualTo(0); + } + + private void runInterceptorTasksAndAssertCount() { + int actualNumOfTasks = 0; + for (Runnable runnable : runnableCaptor.getAllValues()) { + if (runnable instanceof ErrorCountPerConnectionMetricTracker) { + ((ErrorCountPerConnectionMetricTracker) runnable) + .setStatsRecorderWrapperForConnection(statsRecorderWrapperForConnection); + runnable.run(); + actualNumOfTasks++; + } + } + assertThat(actualNumOfTasks).isEqualTo(1); + } + + static class FakeService extends BigtableGrpc.BigtableImplBase { + @Override + public void readRows( + ReadRowsRequest request, StreamObserver responseObserver) { + if (request.getTableName().contains(SUCCESS_TABLE_NAME)) { + responseObserver.onNext(ReadRowsResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } else { + // Send a non-retriable error, since otherwise the client tries to use the mocked + // ScheduledExecutorService object for retyring, resulting in a hang. + StatusRuntimeException exception = new StatusRuntimeException(Status.INTERNAL); + responseObserver.onError(exception); + } + } + } +}