Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create the backbone of counting errors per connection each minute. #2094

Merged
merged 23 commits into from Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ac7363d
Create the backbone of counting errors per connection each minute.
rkaregar Feb 2, 2024
1f26796
Clean up creating new interceptors and StatsRecorderWrapper ctor.
rkaregar Feb 5, 2024
a82f332
Rename setting background task and fix imports.
rkaregar Feb 5, 2024
0c0ae00
Temporarily skip exporting per connection metrics to fix test failures.
rkaregar Feb 6, 2024
2a09449
Temporarily share the tests for debugging purposes
rkaregar Feb 6, 2024
b986073
Temporarily add the test for debugging.
rkaregar Feb 6, 2024
c254251
Remove the new ExecutorProvider and fix integration test failures.
rkaregar Feb 7, 2024
7c149ed
Update unit tests to reflect the new setup.
rkaregar Feb 7, 2024
69e693f
Clean up and add tests.
rkaregar Feb 8, 2024
9f6e964
Clean comments and add a TODO.
rkaregar Feb 8, 2024
92fd9ea
Improve tests and comments.
rkaregar Feb 8, 2024
cb29640
Address comments and refactor by defining new classes.
rkaregar Feb 13, 2024
d827cf3
Fix code formatting.
rkaregar Feb 13, 2024
b5389f0
Refactor classes and move to better packages.
rkaregar Feb 15, 2024
9248b14
Clean up classes and address comments.
rkaregar Feb 16, 2024
b5fcd12
Update the scheduler object.
rkaregar Feb 16, 2024
92e6fdf
Apply cleanups.
rkaregar Feb 16, 2024
96730e7
Fix unit tests and avoid hanging when getting error in close().
rkaregar Feb 16, 2024
fc67864
Fix code formatting.
rkaregar Feb 16, 2024
26e5a93
Improve error handling in the close() method.
rkaregar Feb 16, 2024
26c165b
Improve exception logging.
rkaregar Feb 16, 2024
a2e6d18
Fix code formatting.
rkaregar Feb 16, 2024
a1d9c77
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Feb 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -42,3 +42,4 @@ api_key
artman-genfiles

.flattened-pom.xml
dependency-reduced-pom.xml
Expand Up @@ -51,6 +51,10 @@ public void export(Collection<Metric> metrics) {
if (!metric.getMetricDescriptor().getName().contains("bigtable")) {
continue;
}
// TODO: temporarily skip exporting per connection metrics.
if (metric.getMetricDescriptor().getName().contains("per_connection_error_count")) {
continue;
}

projectToTimeSeries =
metric.getTimeSeriesList().stream()
Expand Down
@@ -0,0 +1,57 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.stats;

import com.google.api.core.InternalApi;
import io.opencensus.stats.MeasureMap;
import io.opencensus.stats.StatsRecorder;
import io.opencensus.tags.TagContext;
import io.opencensus.tags.TagContextBuilder;
import io.opencensus.tags.TagKey;
import io.opencensus.tags.TagValue;
import io.opencensus.tags.Tagger;
import io.opencensus.tags.Tags;
import java.util.Map;

/**
 * Records built-in connection-level metrics, i.e. metrics that are not tied to a particular
 * operation or RPC.
 */
@InternalApi("For internal use only")
public class StatsRecorderWrapperForConnection {
  private final StatsRecorder statsRecorder;
  private final TagContext tagContext;
  // Swapped for a fresh map after every recording, hence not final.
  private MeasureMap perConnectionErrorCountMeasureMap;

  /**
   * Creates a wrapper whose measurements are tagged with the given attributes.
   *
   * @param statsAttributes tag key/value pairs put on top of the tagger's current tag context
   * @param statsRecorder recorder that supplies the measure maps used for recording
   */
  public StatsRecorderWrapperForConnection(
      Map<String, String> statsAttributes, StatsRecorder statsRecorder) {
    this.statsRecorder = statsRecorder;
    this.perConnectionErrorCountMeasureMap = statsRecorder.newMeasureMap();

    Tagger tagger = Tags.getTagger();
    TagContextBuilder tags = tagger.toBuilder(tagger.getCurrentTagContext());
    statsAttributes.forEach(
        (key, value) -> tags.putLocal(TagKey.create(key), TagValue.create(value)));
    this.tagContext = tags.build();
  }

  /**
   * Records {@code errorCount} as one per-connection error count measurement, then prepares a new
   * measure map for the next call.
   *
   * @param errorCount number of errors to record
   */
  public void putAndRecordPerConnectionErrorCount(long errorCount) {
    MeasureMap pending = perConnectionErrorCountMeasureMap;
    pending.put(BuiltinMeasureConstants.PER_CONNECTION_ERROR_COUNT, errorCount);
    pending.record(tagContext);

    perConnectionErrorCountMeasureMap = statsRecorder.newMeasureMap();
  }
}
Expand Up @@ -40,6 +40,11 @@ public static StatsRecorderWrapper createRecorder(
operationType, spanName, statsAttributes, Stats.getStatsRecorder());
}

/**
 * Creates a {@link StatsRecorderWrapperForConnection} for the given attributes, backed by the
 * recorder returned from {@code Stats.getStatsRecorder()}.
 *
 * @param statsAttributes attribute key/value pairs handed to the new wrapper
 * @return a new wrapper instance
 */
public static StatsRecorderWrapperForConnection createRecorderForConnection(
Map<String, String> statsAttributes) {
return new StatsRecorderWrapperForConnection(statsAttributes, Stats.getStatsRecorder());
}

// This is used in integration tests to get the tag value strings from view manager because Stats
// is relocated to com.google.bigtable.veneer.repackaged.io.opencensus.
@InternalApi("Visible for testing")
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.ApiFunction;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.Batcher;
Expand Down Expand Up @@ -94,6 +95,7 @@
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.ErrorCountPerConnectionMetricTracker;
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersServerStreamingCallable;
Expand All @@ -117,6 +119,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannelBuilder;
import io.opencensus.stats.Stats;
import io.opencensus.stats.StatsRecorder;
import io.opencensus.tags.TagKey;
Expand Down Expand Up @@ -149,7 +152,6 @@
public class EnhancedBigtableStub implements AutoCloseable {
private static final String CLIENT_NAME = "Bigtable";
private static final long FLOW_CONTROL_ADJUSTING_INTERVAL_MS = TimeUnit.SECONDS.toMillis(20);

private final EnhancedBigtableStubSettings settings;
private final ClientContext clientContext;

Expand All @@ -176,7 +178,6 @@ public class EnhancedBigtableStub implements AutoCloseable {

public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
throws IOException {

settings = settings.toBuilder().setTracerFactory(createBigtableTracerFactory(settings)).build();
ClientContext clientContext = createClientContext(settings);

Expand Down Expand Up @@ -204,10 +205,27 @@ public static ClientContext createClientContext(EnhancedBigtableStubSettings set
? ((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder()
: null;

if (builder.getEnableRoutingCookie() && transportProvider != null) {
// TODO: this also needs to be added to BigtableClientFactory
// patch cookies interceptor
transportProvider.setInterceptorProvider(() -> ImmutableList.of(new CookiesInterceptor()));
ErrorCountPerConnectionMetricTracker errorCountPerConnectionMetricTracker;
if (transportProvider != null) {
errorCountPerConnectionMetricTracker =
new ErrorCountPerConnectionMetricTracker(createBuiltinAttributes(builder));
ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> oldChannelConfigurator =
transportProvider.getChannelConfigurator();
transportProvider.setChannelConfigurator(
managedChannelBuilder -> {
if (settings.getEnableRoutingCookie()) {
managedChannelBuilder.intercept(new CookiesInterceptor());
}

managedChannelBuilder.intercept(errorCountPerConnectionMetricTracker.getInterceptor());

if (oldChannelConfigurator != null) {
managedChannelBuilder = oldChannelConfigurator.apply(managedChannelBuilder);
}
return managedChannelBuilder;
});
} else {
errorCountPerConnectionMetricTracker = null;
}

// Inject channel priming
Expand All @@ -233,7 +251,12 @@ public static ClientContext createClientContext(EnhancedBigtableStubSettings set
builder.setTransportChannelProvider(transportProvider.build());
}

return ClientContext.create(builder.build());
ClientContext clientContext = ClientContext.create(builder.build());
if (errorCountPerConnectionMetricTracker != null) {
errorCountPerConnectionMetricTracker.startConnectionErrorCountTracker(
clientContext.getExecutor());
}
return clientContext;
}

public static ApiTracerFactory createBigtableTracerFactory(
Expand All @@ -254,13 +277,7 @@ public static ApiTracerFactory createBigtableTracerFactory(
.put(RpcMeasureConstants.BIGTABLE_INSTANCE_ID, TagValue.create(instanceId))
.put(RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID, TagValue.create(appProfileId))
.build();
ImmutableMap<String, String> builtinAttributes =
ImmutableMap.<String, String>builder()
.put("project_id", projectId)
.put("instance", instanceId)
.put("app_profile", appProfileId)
.put("client_name", "bigtable-java/" + Version.VERSION)
.build();
ImmutableMap<String, String> builtinAttributes = createBuiltinAttributes(settings.toBuilder());
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

return new CompositeTracerFactory(
ImmutableList.of(
Expand All @@ -283,6 +300,16 @@ public static ApiTracerFactory createBigtableTracerFactory(
settings.getTracerFactory()));
}

/**
 * Builds the attribute map used for built-in metrics: the project id, instance id and app profile
 * id read from {@code builder}, plus the client name with its version.
 */
private static ImmutableMap<String, String> createBuiltinAttributes(
    EnhancedBigtableStubSettings.Builder builder) {
  String clientName = "bigtable-java/" + Version.VERSION;
  return ImmutableMap.of(
      "project_id", builder.getProjectId(),
      "instance", builder.getInstanceId(),
      "app_profile", builder.getAppProfileId(),
      "client_name", clientName);
}

private static void patchCredentials(EnhancedBigtableStubSettings.Builder settings)
throws IOException {
int i = settings.getEndpoint().lastIndexOf(":");
Expand Down
Expand Up @@ -652,7 +652,6 @@ private Builder() {
setCredentialsProvider(defaultCredentialsProviderBuilder().build());
this.enableRoutingCookie = true;
this.enableRetryInfo = true;

// Defaults provider
BigtableStubSettings.Builder baseDefaults = BigtableStubSettings.newBuilder();

Expand Down Expand Up @@ -772,7 +771,6 @@ private Builder(EnhancedBigtableStubSettings settings) {
jwtAudienceMapping = settings.jwtAudienceMapping;
enableRoutingCookie = settings.enableRoutingCookie;
enableRetryInfo = settings.enableRetryInfo;

// Per method settings.
readRowsSettings = settings.readRowsSettings.toBuilder();
readRowSettings = settings.readRowSettings.toBuilder();
Expand Down
@@ -0,0 +1,89 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.concurrent.atomic.LongAdder;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * An interceptor which counts the number of successful and failed responses for a channel.
 *
 * <p>Every call that passes through this interceptor is counted once, when its listener's {@code
 * onClose} fires: as a success if the final status is OK, as an error otherwise. The counters are
 * read and cleared through {@link #getAndResetNumOfErrors()} and {@link
 * #getAndResetNumOfSuccesses()}.
 */
class ConnectionErrorCountInterceptor implements ClientInterceptor {
  // Use Class#getName() rather than Class#toString(): the latter prepends "class " to the name,
  // which would place this logger outside the package-based logger hierarchy.
  private static final Logger LOG =
      Logger.getLogger(ConnectionErrorCountInterceptor.class.getName());
  private final LongAdder numOfErrors;
  private final LongAdder numOfSuccesses;

  ConnectionErrorCountInterceptor() {
    numOfErrors = new LongAdder();
    numOfSuccesses = new LongAdder();
  }

  /**
   * Wraps the call created by {@code channel} so that its final status is counted before the
   * original listener is notified.
   */
  @Override
  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
      MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
    return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
        channel.newCall(methodDescriptor, callOptions)) {
      @Override
      public void start(Listener<RespT> responseListener, Metadata headers) {
        super.start(
            new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
                responseListener) {
              @Override
              public void onClose(Status status, Metadata trailers) {
                // Connection accounting is non-critical, so we log the exception, but let normal
                // processing proceed.
                try {
                  handleOnCloseUnsafe(status);
                } catch (Throwable t) {
                  if (t instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                  }
                  LOG.log(
                      Level.WARNING, "Unexpected error while updating connection error stats", t);
                }
                // Always forward the close event, even if the accounting above failed.
                super.onClose(status, trailers);
              }

              private void handleOnCloseUnsafe(Status status) {
                if (status.isOk()) {
                  numOfSuccesses.increment();
                } else {
                  numOfErrors.increment();
                }
              }
            },
            headers);
      }
    };
  }

  /** Returns the number of failed calls counted since the last reset and clears the counter. */
  long getAndResetNumOfErrors() {
    return numOfErrors.sumThenReset();
  }

  /** Returns the number of successful calls counted since the last reset and clears the counter. */
  long getAndResetNumOfSuccesses() {
    return numOfSuccesses.sumThenReset();
  }
}
@@ -0,0 +1,83 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.InternalApi;
import com.google.cloud.bigtable.stats.StatsRecorderWrapperForConnection;
import com.google.cloud.bigtable.stats.StatsWrapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.grpc.ClientInterceptor;
import java.util.Collections;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/* Background task that goes through all connections and updates the errors_per_connection metric. */
@InternalApi("For internal use only")
public class ErrorCountPerConnectionMetricTracker implements Runnable {
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
private static final Integer PER_CONNECTION_ERROR_COUNT_PERIOD_SECONDS = 60;
private final Set<ConnectionErrorCountInterceptor> connectionErrorCountInterceptors;
private final Object interceptorsLock = new Object();
// This is not final so that it can be updated and mocked during testing.
private StatsRecorderWrapperForConnection statsRecorderWrapperForConnection;

@VisibleForTesting
void setStatsRecorderWrapperForConnection(
StatsRecorderWrapperForConnection statsRecorderWrapperForConnection) {
this.statsRecorderWrapperForConnection = statsRecorderWrapperForConnection;
}

public ErrorCountPerConnectionMetricTracker(ImmutableMap<String, String> builtinAttributes) {
connectionErrorCountInterceptors =
Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>()));

this.statsRecorderWrapperForConnection =
StatsWrapper.createRecorderForConnection(builtinAttributes);
}

public void startConnectionErrorCountTracker(ScheduledExecutorService scheduler) {
scheduler.scheduleAtFixedRate(
this, 0, PER_CONNECTION_ERROR_COUNT_PERIOD_SECONDS, TimeUnit.SECONDS);
}

public ClientInterceptor getInterceptor() {
ConnectionErrorCountInterceptor connectionErrorCountInterceptor =
new ConnectionErrorCountInterceptor();
synchronized (interceptorsLock) {
connectionErrorCountInterceptors.add(connectionErrorCountInterceptor);
}
return connectionErrorCountInterceptor;
}

@Override
public void run() {
synchronized (interceptorsLock) {
for (ConnectionErrorCountInterceptor interceptor : connectionErrorCountInterceptors) {
long errors = interceptor.getAndResetNumOfErrors();
long successes = interceptor.getAndResetNumOfSuccesses();
// We avoid keeping track of inactive connections (i.e., without any failed or successful
// requests).
if (errors > 0 || successes > 0) {
// TODO: add a metric to also keep track of the number of successful requests per each
// connection.
statsRecorderWrapperForConnection.putAndRecordPerConnectionErrorCount(errors);
}
}
}
}
}