Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create the backbone of counting errors per connection each minute. #2094

Merged
merged 23 commits into from Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ac7363d
Create the backbone of counting errors per connection each minute.
rkaregar Feb 2, 2024
1f26796
Clean up creating new interceptors and StatsRecorderWrapper ctor.
rkaregar Feb 5, 2024
a82f332
Rename setting background task and fix imports.
rkaregar Feb 5, 2024
0c0ae00
Temporarily skip exporting per connection metrics to fix test failures.
rkaregar Feb 6, 2024
2a09449
Temporarily share the tests for debugging purposes
rkaregar Feb 6, 2024
b986073
Temporarily add the test for debugging.
rkaregar Feb 6, 2024
c254251
Remove the new ExecutorProvider and fix integration test failures.
rkaregar Feb 7, 2024
7c149ed
Update unit tests to reflect the new setup.
rkaregar Feb 7, 2024
69e693f
Clean up and add tests.
rkaregar Feb 8, 2024
9f6e964
Clean comments and add a TODO.
rkaregar Feb 8, 2024
92fd9ea
Improve tests and comments.
rkaregar Feb 8, 2024
cb29640
Address comments and refactor by defining new classes.
rkaregar Feb 13, 2024
d827cf3
Fix code formatting.
rkaregar Feb 13, 2024
b5389f0
Refactor classes and move to better packages.
rkaregar Feb 15, 2024
9248b14
Clean up classes and address comments.
rkaregar Feb 16, 2024
b5fcd12
Update the scheduler object.
rkaregar Feb 16, 2024
92e6fdf
Apply cleanups.
rkaregar Feb 16, 2024
96730e7
Fix unit tests and avoid hanging when getting error in close().
rkaregar Feb 16, 2024
fc67864
Fix code formatting.
rkaregar Feb 16, 2024
26e5a93
Improve error handling in the close() method.
rkaregar Feb 16, 2024
26c165b
Improve exception logging.
rkaregar Feb 16, 2024
a2e6d18
Fix code formatting.
rkaregar Feb 16, 2024
a1d9c77
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Feb 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -42,3 +42,4 @@ api_key
artman-genfiles

.flattened-pom.xml
dependency-reduced-pom.xml
Expand Up @@ -42,6 +42,7 @@ public class StatsRecorderWrapper {

private MeasureMap attemptMeasureMap;
private MeasureMap operationMeasureMap;
private MeasureMap perConnectionErrorCountMeasureMap;

public StatsRecorderWrapper(
OperationType operationType,
Expand All @@ -57,6 +58,7 @@ public StatsRecorderWrapper(

this.attemptMeasureMap = statsRecorder.newMeasureMap();
this.operationMeasureMap = statsRecorder.newMeasureMap();
this.perConnectionErrorCountMeasureMap = statsRecorder.newMeasureMap();
}

public void recordOperation(String status, String tableId, String zone, String cluster) {
Expand Down Expand Up @@ -87,6 +89,15 @@ public void recordAttempt(String status, String tableId, String zone, String clu
attemptMeasureMap = statsRecorder.newMeasureMap();
}

public void putAndRecordPerConnectionErrorCount(long errorCount) {
perConnectionErrorCountMeasureMap.put(
BuiltinMeasureConstants.PER_CONNECTION_ERROR_COUNT, errorCount);

perConnectionErrorCountMeasureMap.record(
newTagContextBuilderForPerConnectionErrorCount().build());
perConnectionErrorCountMeasureMap = statsRecorder.newMeasureMap();
}

public void putOperationLatencies(long operationLatency) {
operationMeasureMap.put(BuiltinMeasureConstants.OPERATION_LATENCIES, operationLatency);
}
Expand Down Expand Up @@ -132,4 +143,12 @@ private TagContextBuilder newTagContextBuilder(String tableId, String zone, Stri
}
return tagContextBuilder;
}

private TagContextBuilder newTagContextBuilderForPerConnectionErrorCount() {
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
TagContextBuilder tagContextBuilder = tagger.toBuilder(parentContext);
for (Map.Entry<String, String> entry : statsAttributes.entrySet()) {
tagContextBuilder.putLocal(TagKey.create(entry.getKey()), TagValue.create(entry.getValue()));
}
return tagContextBuilder;
}
}
@@ -0,0 +1,71 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.concurrent.atomic.AtomicInteger;

/** An interceptor which counts the number of failed responses for a channel. */
class ConnectionErrorCountInterceptor implements ClientInterceptor {
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
private final AtomicInteger numOfErrors;
private final AtomicInteger numOfSuccesses;

public ConnectionErrorCountInterceptor() {
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
numOfErrors = new AtomicInteger(0);
numOfSuccesses = new AtomicInteger(0);
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
channel.newCall(methodDescriptor, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
responseListener) {
@Override
public void onClose(Status status, Metadata trailers) {
if (status.isOk()) {
numOfSuccesses.getAndIncrement();
} else {
numOfErrors.getAndIncrement();
}
super.onClose(status, trailers);
}
},
headers);
}
};
}

public int getAndResetNumOfErrors() {
return numOfErrors.getAndSet(0);
}

public int getAndResetNumOfSuccesses() {
return numOfSuccesses.getAndSet(0);
}
}
@@ -0,0 +1,48 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.cloud.bigtable.stats.StatsRecorderWrapper;
import com.google.cloud.bigtable.stats.StatsWrapper;
import com.google.common.collect.ImmutableMap;
import java.util.Set;

class CountErrorsPerInterceptorTask implements Runnable {
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
private final Set<ConnectionErrorCountInterceptor> interceptors;
private final StatsRecorderWrapper statsRecorderWrapper;

public CountErrorsPerInterceptorTask(
Set<ConnectionErrorCountInterceptor> interceptors,
ImmutableMap<String, String> builtinAttributes) {
this.interceptors = interceptors;
this.statsRecorderWrapper = StatsWrapper.createRecorder(null, null, builtinAttributes);
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void run() {
for (ConnectionErrorCountInterceptor interceptor : interceptors) {
int errors = interceptor.getAndResetNumOfErrors();
int successes = interceptor.getAndResetNumOfSuccesses();
// We avoid keeping track of inactive connections (i.e., without any failed or successful
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
// requests).
if (errors > 0 || successes > 0) {
// TODO: add a metric to also keep track of the number of successful requests per each
// connection.
this.statsRecorderWrapper.putAndRecordPerConnectionErrorCount(errors);
}
}
}
}
Expand Up @@ -117,6 +117,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import io.grpc.ClientInterceptor;
import io.opencensus.stats.Stats;
import io.opencensus.stats.StatsRecorder;
import io.opencensus.tags.TagKey;
Expand All @@ -129,6 +130,10 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -149,6 +154,7 @@
public class EnhancedBigtableStub implements AutoCloseable {
private static final String CLIENT_NAME = "Bigtable";
private static final long FLOW_CONTROL_ADJUSTING_INTERVAL_MS = TimeUnit.SECONDS.toMillis(20);
private static final Integer PER_CONNECTION_ERROR_COUNT_PERIOD_SECONDS = 60;

private final EnhancedBigtableStubSettings settings;
private final ClientContext clientContext;
Expand Down Expand Up @@ -204,10 +210,13 @@ public static ClientContext createClientContext(EnhancedBigtableStubSettings set
? ((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder()
: null;

if (builder.getEnableRoutingCookie() && transportProvider != null) {
// TODO: this also need to be added to BigtableClientFactory
// patch cookies interceptor
transportProvider.setInterceptorProvider(() -> ImmutableList.of(new CookiesInterceptor()));
Set<ConnectionErrorCountInterceptor> connectionErrorCountInterceptors =
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
Collections.newSetFromMap(new WeakHashMap<>());
setupConnectionErrorCountTask(builder, connectionErrorCountInterceptors);

if (transportProvider != null) {
transportProvider.setInterceptorProvider(
() -> getInterceptors(builder, connectionErrorCountInterceptors));
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
}

// Inject channel priming
Expand Down Expand Up @@ -236,6 +245,35 @@ public static ClientContext createClientContext(EnhancedBigtableStubSettings set
return ClientContext.create(builder.build());
}

private static ImmutableList<ClientInterceptor> getInterceptors(
EnhancedBigtableStubSettings.Builder settings,
Set<ConnectionErrorCountInterceptor> connectionErrorCountInterceptors) {
ConnectionErrorCountInterceptor connectionErrorCountInterceptor =
new ConnectionErrorCountInterceptor();
connectionErrorCountInterceptors.add(connectionErrorCountInterceptor);
ImmutableList.Builder<ClientInterceptor> builder =
ImmutableList.<ClientInterceptor>builder().add(connectionErrorCountInterceptor);

if (settings.getEnableRoutingCookie()) {
// TODO: this also need to be added to BigtableClientFactory
builder.add(new CookiesInterceptor());
}

return builder.build();
}

private static void setupConnectionErrorCountTask(
EnhancedBigtableStubSettings.Builder settings,
Set<ConnectionErrorCountInterceptor> interceptors) {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
rkaregar marked this conversation as resolved.
Show resolved Hide resolved
ImmutableMap<String, String> builtinAttributes = createBuiltinAttributes(settings);
scheduler.scheduleAtFixedRate(
new CountErrorsPerInterceptorTask(interceptors, builtinAttributes),
0,
PER_CONNECTION_ERROR_COUNT_PERIOD_SECONDS,
TimeUnit.SECONDS);
}

public static ApiTracerFactory createBigtableTracerFactory(
EnhancedBigtableStubSettings settings) {
return createBigtableTracerFactory(settings, Tags.getTagger(), Stats.getStatsRecorder());
Expand All @@ -254,13 +292,7 @@ public static ApiTracerFactory createBigtableTracerFactory(
.put(RpcMeasureConstants.BIGTABLE_INSTANCE_ID, TagValue.create(instanceId))
.put(RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID, TagValue.create(appProfileId))
.build();
ImmutableMap<String, String> builtinAttributes =
ImmutableMap.<String, String>builder()
.put("project_id", projectId)
.put("instance", instanceId)
.put("app_profile", appProfileId)
.put("client_name", "bigtable-java/" + Version.VERSION)
.build();
ImmutableMap<String, String> builtinAttributes = createBuiltinAttributes(settings.toBuilder());
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

return new CompositeTracerFactory(
ImmutableList.of(
Expand All @@ -283,6 +315,16 @@ public static ApiTracerFactory createBigtableTracerFactory(
settings.getTracerFactory()));
}

private static ImmutableMap<String, String> createBuiltinAttributes(
EnhancedBigtableStubSettings.Builder settings) {
return ImmutableMap.<String, String>builder()
.put("project", settings.getProjectId())
.put("instance", settings.getInstanceId())
.put("app_profile", settings.getAppProfileId())
.put("client_name", "bigtable-java/" + Version.VERSION)
.build();
}

private static void patchCredentials(EnhancedBigtableStubSettings.Builder settings)
throws IOException {
int i = settings.getEndpoint().lastIndexOf(":");
Expand Down