Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create the backbone of counting errors per connection each minute. #2094

Merged
merged 23 commits into from Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ac7363d
Create the backbone of counting errors per connection each minute.
rkaregar Feb 2, 2024
1f26796
Clean up creating new interceptors and StatsRecorderWrapper ctor.
rkaregar Feb 5, 2024
a82f332
Rename setting background task and fix imports.
rkaregar Feb 5, 2024
0c0ae00
Temporarily skip exporting per connection metrics to fix test failures.
rkaregar Feb 6, 2024
2a09449
Temporarily share the tests for debugging purposes
rkaregar Feb 6, 2024
b986073
Temporarily add the test for debugging.
rkaregar Feb 6, 2024
c254251
Remove the new ExecutorProvider and fix integration test failures.
rkaregar Feb 7, 2024
7c149ed
Update unit tests to reflect the new setup.
rkaregar Feb 7, 2024
69e693f
Clean up and add tests.
rkaregar Feb 8, 2024
9f6e964
Clean comments and add a TODO.
rkaregar Feb 8, 2024
92fd9ea
Improve tests and comments.
rkaregar Feb 8, 2024
cb29640
Address comments and refactor by defining new classes.
rkaregar Feb 13, 2024
d827cf3
Fix code formatting.
rkaregar Feb 13, 2024
b5389f0
Refactor classes and move to better packages.
rkaregar Feb 15, 2024
9248b14
Clean up classes and address comments.
rkaregar Feb 16, 2024
b5fcd12
Update the scheduler object.
rkaregar Feb 16, 2024
92e6fdf
Apply cleanups.
rkaregar Feb 16, 2024
96730e7
Fix unit tests and avoid hanging when getting error in close().
rkaregar Feb 16, 2024
fc67864
Fix code formatting.
rkaregar Feb 16, 2024
26e5a93
Improve error handling in the close() method.
rkaregar Feb 16, 2024
26c165b
Improve exception logging.
rkaregar Feb 16, 2024
a2e6d18
Fix code formatting.
rkaregar Feb 16, 2024
a1d9c77
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Feb 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -42,3 +42,4 @@ api_key
artman-genfiles

.flattened-pom.xml
dependency-reduced-pom.xml
Expand Up @@ -32,16 +32,17 @@
@InternalApi("For internal use only")
public class StatsRecorderWrapper {

private final OperationType operationType;
private OperationType operationType;
mutianf marked this conversation as resolved.
Show resolved Hide resolved

private final Tagger tagger;
private final StatsRecorder statsRecorder;
private final TagContext parentContext;
private final SpanName spanName;
private SpanName spanName;
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
private final Map<String, String> statsAttributes;

private MeasureMap attemptMeasureMap;
private MeasureMap operationMeasureMap;
private MeasureMap perConnectionErrorCountMeasureMap;

public StatsRecorderWrapper(
OperationType operationType,
Expand All @@ -59,6 +60,14 @@ public StatsRecorderWrapper(
this.operationMeasureMap = statsRecorder.newMeasureMap();
}

/**
 * Creates a wrapper that is set up for recording the per-connection error count measure.
 *
 * <p>This constructor does not assign the operation type, the span name, or the attempt and
 * operation measure maps; it only prepares the per-connection error count measure map.
 *
 * @param statsAttributes key/value pairs later added as local tags when the error count is
 *     recorded
 * @param statsRecorder recorder used to create the measure maps
 */
public StatsRecorderWrapper(Map<String, String> statsAttributes, StatsRecorder statsRecorder) {
  this.statsAttributes = statsAttributes;
  this.statsRecorder = statsRecorder;
  this.perConnectionErrorCountMeasureMap = this.statsRecorder.newMeasureMap();

  // The parent context is captured once, at construction time, from the current tag context.
  this.tagger = Tags.getTagger();
  this.parentContext = this.tagger.getCurrentTagContext();
}

public void recordOperation(String status, String tableId, String zone, String cluster) {
TagContextBuilder tagCtx =
newTagContextBuilder(tableId, zone, cluster)
Expand Down Expand Up @@ -87,6 +96,15 @@ public void recordAttempt(String status, String tableId, String zone, String clu
attemptMeasureMap = statsRecorder.newMeasureMap();
}

/**
 * Stores {@code errorCount} under the per-connection error count measure, records it with the
 * tags built from the stats attributes, and then swaps in a fresh measure map for the next call.
 *
 * @param errorCount the number of errors to record
 */
public void putAndRecordPerConnectionErrorCount(long errorCount) {
  MeasureMap pending = perConnectionErrorCountMeasureMap;
  pending.put(BuiltinMeasureConstants.PER_CONNECTION_ERROR_COUNT, errorCount);

  TagContext tags = newTagContextBuilderForPerConnectionErrorCount().build();
  pending.record(tags);

  // Start from an empty measure map so the next recording does not reuse this one.
  perConnectionErrorCountMeasureMap = statsRecorder.newMeasureMap();
}

/**
 * Stages {@code operationLatency} in the operation measure map under the operation latencies
 * measure. This method only puts the value; it does not call {@code record}.
 *
 * @param operationLatency the latency value to stage
 */
public void putOperationLatencies(long operationLatency) {
  this.operationMeasureMap.put(
      BuiltinMeasureConstants.OPERATION_LATENCIES,
      operationLatency);
}
Expand Down Expand Up @@ -132,4 +150,12 @@ private TagContextBuilder newTagContextBuilder(String tableId, String zone, Stri
}
return tagContextBuilder;
}

private TagContextBuilder newTagContextBuilderForPerConnectionErrorCount() {
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
TagContextBuilder tagContextBuilder = tagger.toBuilder(parentContext);
for (Map.Entry<String, String> entry : statsAttributes.entrySet()) {
tagContextBuilder.putLocal(TagKey.create(entry.getKey()), TagValue.create(entry.getValue()));
}
return tagContextBuilder;
}
}
Expand Up @@ -40,6 +40,10 @@ public static StatsRecorderWrapper createRecorder(
operationType, spanName, statsAttributes, Stats.getStatsRecorder());
}

public static StatsRecorderWrapper createRecorder(Map<String, String> statsAttributes) {
return new StatsRecorderWrapper(statsAttributes, Stats.getStatsRecorder());
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
}

// This is used in integration tests to get the tag value strings from view manager because Stats
// is relocated to com.google.bigtable.veneer.repackaged.io.opencensus.
@InternalApi("Visible for testing")
Expand Down
@@ -0,0 +1,73 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/** An interceptor which counts the number of failed responses for a channel. */
class ConnectionErrorCountInterceptor implements ClientInterceptor {
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
private final AtomicInteger numOfErrors;
private final AtomicInteger numOfSuccesses;

public ConnectionErrorCountInterceptor(Set<ConnectionErrorCountInterceptor> interceptors) {
numOfErrors = new AtomicInteger(0);
numOfSuccesses = new AtomicInteger(0);
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
interceptors.add(this);
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
channel.newCall(methodDescriptor, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
responseListener) {
@Override
public void onClose(Status status, Metadata trailers) {
if (status.isOk()) {
numOfSuccesses.getAndIncrement();
} else {
numOfErrors.getAndIncrement();
}
super.onClose(status, trailers);
}
},
headers);
}
};
}

public int getAndResetNumOfErrors() {
return numOfErrors.getAndSet(0);
}

public int getAndResetNumOfSuccesses() {
return numOfSuccesses.getAndSet(0);
}
}
@@ -0,0 +1,48 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.cloud.bigtable.stats.StatsRecorderWrapper;
import com.google.cloud.bigtable.stats.StatsWrapper;
import com.google.common.collect.ImmutableMap;
import java.util.Set;

class CountErrorsPerInterceptorTask implements Runnable {
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
private final Set<ConnectionErrorCountInterceptor> interceptors;
private final StatsRecorderWrapper statsRecorderWrapper;

public CountErrorsPerInterceptorTask(
Set<ConnectionErrorCountInterceptor> interceptors,
ImmutableMap<String, String> builtinAttributes) {
this.interceptors = interceptors;
this.statsRecorderWrapper = StatsWrapper.createRecorder(builtinAttributes);
}

@Override
public void run() {
for (ConnectionErrorCountInterceptor interceptor : interceptors) {
int errors = interceptor.getAndResetNumOfErrors();
int successes = interceptor.getAndResetNumOfSuccesses();
// We avoid keeping track of inactive connections (i.e., without any failed or successful
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
// requests).
if (errors > 0 || successes > 0) {
// TODO: add a metric to also keep track of the number of successful requests per each
// connection.
this.statsRecorderWrapper.putAndRecordPerConnectionErrorCount(errors);
}
}
}
}
Expand Up @@ -126,9 +126,9 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.*;
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -149,6 +149,7 @@
public class EnhancedBigtableStub implements AutoCloseable {
private static final String CLIENT_NAME = "Bigtable";
private static final long FLOW_CONTROL_ADJUSTING_INTERVAL_MS = TimeUnit.SECONDS.toMillis(20);
// Period, in seconds, between two runs of the per-connection error count task. Kept as a
// primitive long (like the interval above) because it is passed as a scheduling period.
private static final long PER_CONNECTION_ERROR_COUNT_PERIOD_SECONDS = 60;

private final EnhancedBigtableStubSettings settings;
private final ClientContext clientContext;
Expand Down Expand Up @@ -204,11 +205,7 @@ public static ClientContext createClientContext(EnhancedBigtableStubSettings set
? ((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder()
: null;

if (builder.getEnableRoutingCookie() && transportProvider != null) {
// TODO: this also need to be added to BigtableClientFactory
// patch cookies interceptor
transportProvider.setInterceptorProvider(() -> ImmutableList.of(new CookiesInterceptor()));
}
setInterceptors(transportProvider, builder);

// Inject channel priming
if (settings.isRefreshingChannel()) {
Expand Down Expand Up @@ -236,6 +233,39 @@ public static ClientContext createClientContext(EnhancedBigtableStubSettings set
return ClientContext.create(builder.build());
}

private static void setInterceptors(
InstantiatingGrpcChannelProvider.Builder transportProvider,
EnhancedBigtableStubSettings.Builder settings) {
Set<ConnectionErrorCountInterceptor> interceptors =
setupConnectionErrorCountInterceptors(settings);
if (transportProvider != null) {
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
if (settings.getEnableRoutingCookie()) {
// TODO: this also need to be added to BigtableClientFactory
transportProvider.setInterceptorProvider(
() ->
ImmutableList.of(
new CookiesInterceptor(), new ConnectionErrorCountInterceptor(interceptors)));
} else {
transportProvider.setInterceptorProvider(
() -> ImmutableList.of(new ConnectionErrorCountInterceptor(interceptors)));
}
}
}

private static Set<ConnectionErrorCountInterceptor> setupConnectionErrorCountInterceptors(
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
EnhancedBigtableStubSettings.Builder settings) {
Set<ConnectionErrorCountInterceptor> interceptors =
Collections.newSetFromMap(new WeakHashMap<>());
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
ImmutableMap<String, String> builtinAttributes = createBuiltinAttributes(settings);
scheduler.scheduleAtFixedRate(
new CountErrorsPerInterceptorTask(interceptors, builtinAttributes),
0,
PER_CONNECTION_ERROR_COUNT_PERIOD_SECONDS,
TimeUnit.SECONDS);
return interceptors;
}

public static ApiTracerFactory createBigtableTracerFactory(
EnhancedBigtableStubSettings settings) {
return createBigtableTracerFactory(settings, Tags.getTagger(), Stats.getStatsRecorder());
Expand All @@ -254,13 +284,7 @@ public static ApiTracerFactory createBigtableTracerFactory(
.put(RpcMeasureConstants.BIGTABLE_INSTANCE_ID, TagValue.create(instanceId))
.put(RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID, TagValue.create(appProfileId))
.build();
ImmutableMap<String, String> builtinAttributes =
ImmutableMap.<String, String>builder()
.put("project_id", projectId)
.put("instance", instanceId)
.put("app_profile", appProfileId)
.put("client_name", "bigtable-java/" + Version.VERSION)
.build();
ImmutableMap<String, String> builtinAttributes = createBuiltinAttributes(settings.toBuilder());
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

return new CompositeTracerFactory(
ImmutableList.of(
Expand All @@ -283,6 +307,16 @@ public static ApiTracerFactory createBigtableTracerFactory(
settings.getTracerFactory()));
}

/**
 * Builds the attribute map attached to the built-in metrics: project id, instance id, app profile
 * id and the client name/version.
 *
 * <p>The keys must stay exactly {@code project_id}, {@code instance}, {@code app_profile} and
 * {@code client_name}: these are the keys the tracer factory used before this map was extracted
 * into a helper, and they are turned into tag keys downstream.
 *
 * @param settings the stub settings builder to read the ids from
 * @return an immutable map of attribute name to value
 */
private static ImmutableMap<String, String> createBuiltinAttributes(
    EnhancedBigtableStubSettings.Builder settings) {
  return ImmutableMap.<String, String>builder()
      .put("project_id", settings.getProjectId())
      .put("instance", settings.getInstanceId())
      .put("app_profile", settings.getAppProfileId())
      .put("client_name", "bigtable-java/" + Version.VERSION)
      .build();
}

private static void patchCredentials(EnhancedBigtableStubSettings.Builder settings)
throws IOException {
int i = settings.getEndpoint().lastIndexOf(":");
Expand Down