Skip to content

Commit

Permalink
[bes] Attach RequestMetadata to RPC calls
Browse files Browse the repository at this point in the history
This allows servers to trace the requests similarly to other RPCs.

Closes #16359.

PiperOrigin-RevId: 479292105
Change-Id: Ic6598175171577c6fce23a3bfd637b1b12b6a916
  • Loading branch information
Yannic authored and Copybara-Service committed Oct 6, 2022
1 parent 894f471 commit dbcf260
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ protected BuildEventServiceClient getBesClient(
new BuildEventServiceGrpcClient(
newGrpcChannel(config),
credentials != null ? MoreCallCredentials.from(credentials) : null,
makeGrpcInterceptor(config));
makeGrpcInterceptor(config),
env.getBuildRequestId(),
env.getCommandId());
}
return client;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ java_library(
"//third_party:netty_tcnative",
],
deps = [
"//src/main/java/com/google/devtools/build/lib/remote/util",
"//third_party:guava",
"//third_party:jsr305",
"//third_party/grpc-java:grpc-jar",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.v1.PublishBuildEventGrpc;
import com.google.devtools.build.v1.PublishBuildEventGrpc.PublishBuildEventBlockingStub;
import com.google.devtools.build.v1.PublishBuildEventGrpc.PublishBuildEventStub;
Expand All @@ -36,6 +38,7 @@
import io.grpc.stub.AbstractStub;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
import java.util.UUID;
import javax.annotation.Nullable;

/** Implementation of BuildEventServiceClient that uploads data using gRPC. */
Expand All @@ -48,15 +51,22 @@ public class BuildEventServiceGrpcClient implements BuildEventServiceClient {
private final PublishBuildEventStub besAsync;
private final PublishBuildEventBlockingStub besBlocking;

private final String buildRequestId;
private final UUID commandId;

public BuildEventServiceGrpcClient(
ManagedChannel channel,
@Nullable CallCredentials callCredentials,
ClientInterceptor interceptor) {
ClientInterceptor interceptor,
String buildRequestId,
UUID commandId) {
this.besAsync =
configureStub(PublishBuildEventGrpc.newStub(channel), callCredentials, interceptor);
this.besBlocking =
configureStub(PublishBuildEventGrpc.newBlockingStub(channel), callCredentials, interceptor);
this.channel = channel;
this.buildRequestId = Preconditions.checkNotNull(buildRequestId);
this.commandId = Preconditions.checkNotNull(commandId);
}

@VisibleForTesting
Expand All @@ -67,6 +77,8 @@ protected BuildEventServiceGrpcClient(
this.besAsync = besAsync;
this.besBlocking = besBlocking;
this.channel = channel;
this.buildRequestId = "testing/" + UUID.randomUUID();
this.commandId = UUID.randomUUID();
}

private static <T extends AbstractStub<T>> T configureStub(
Expand All @@ -83,6 +95,13 @@ public void publish(PublishLifecycleEventRequest lifecycleEvent)
try {
besBlocking
.withDeadlineAfter(RPC_TIMEOUT.toMillis(), MILLISECONDS)
.withInterceptors(
TracingMetadataUtils.attachMetadataInterceptor(
TracingMetadataUtils.buildMetadata(
buildRequestId,
commandId.toString(),
"publish_lifecycle_event",
/* actionMetadata= */ null)))
.publishLifecycleEvent(lifecycleEvent);
} catch (StatusRuntimeException e) {
Throwables.throwIfInstanceOf(Throwables.getRootCause(e), InterruptedException.class);
Expand All @@ -94,36 +113,49 @@ private static class BESGrpcStreamContext implements StreamContext {
private final StreamObserver<PublishBuildToolEventStreamRequest> stream;
private final SettableFuture<Status> streamStatus;

public BESGrpcStreamContext(PublishBuildEventStub besAsync, AckCallback ackCallback) {
public BESGrpcStreamContext(
PublishBuildEventStub besAsync,
AckCallback ackCallback,
String buildRequestId,
UUID commandId) {
this.streamStatus = SettableFuture.create();
this.stream =
besAsync.publishBuildToolEventStream(
new StreamObserver<PublishBuildToolEventStreamResponse>() {
@Override
public void onNext(PublishBuildToolEventStreamResponse response) {
ackCallback.apply(response);
}

@Override
public void onError(Throwable t) {
Status error = Status.fromThrowable(t);
if (error.getCode() == Status.CANCELLED.getCode()
&& error.getCause() != null
&& Status.fromThrowable(error.getCause()).getCode()
!= Status.UNKNOWN.getCode()) {
// gRPC likes to wrap Status(Runtime)Exceptions in StatusRuntimeExceptions. If
// the status is cancelled and has a Status(Runtime)Exception as a cause it
// means the error was generated client side.
error = Status.fromThrowable(error.getCause());
}
streamStatus.set(error);
}

@Override
public void onCompleted() {
streamStatus.set(Status.OK);
}
});
besAsync
.withInterceptors(
TracingMetadataUtils.attachMetadataInterceptor(
TracingMetadataUtils.buildMetadata(
buildRequestId,
commandId.toString(),
"publish_build_tool_event_stream",
/* actionMetadata= */ null)))
.publishBuildToolEventStream(
new StreamObserver<PublishBuildToolEventStreamResponse>() {
@Override
public void onNext(PublishBuildToolEventStreamResponse response) {
ackCallback.apply(response);
}

@Override
public void onError(Throwable t) {
Status error = Status.fromThrowable(t);
if (error.getCode() == Status.CANCELLED.getCode()
&& error.getCause() != null
&& Status.fromThrowable(error.getCause()).getCode()
!= Status.UNKNOWN.getCode()) {
// gRPC likes to wrap Status(Runtime)Exceptions in StatusRuntimeExceptions.
// If
// the status is cancelled and has a Status(Runtime)Exception as a cause it
// means the error was generated client side.
error = Status.fromThrowable(error.getCause());
}
streamStatus.set(error);
}

@Override
public void onCompleted() {
streamStatus.set(Status.OK);
}
});
}

@Override
Expand Down Expand Up @@ -157,7 +189,7 @@ public ListenableFuture<Status> getStatus() {
@Override
public StreamContext openStream(AckCallback ackCallback) throws InterruptedException {
try {
return new BESGrpcStreamContext(besAsync, ackCallback);
return new BESGrpcStreamContext(besAsync, ackCallback, buildRequestId, commandId);
} catch (StatusRuntimeException e) {
Throwables.throwIfInstanceOf(Throwables.getRootCause(e), InterruptedException.class);
ListenableFuture<Status> status = Futures.immediateFuture(Status.fromThrowable(e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ java_test(
"//src/main/java/com/google/devtools/build/lib/buildeventstream/transports",
"//src/main/java/com/google/devtools/build/lib/network:connectivity_status",
"//src/main/java/com/google/devtools/build/lib/network:noop_connectivity",
"//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/build/lib/util:abrupt_exit_exception",
"//src/main/java/com/google/devtools/build/lib/util:exit_code",
"//src/test/java/com/google/devtools/build/lib/analysis/util",
Expand All @@ -71,5 +72,6 @@ java_test(
"@googleapis//:google_devtools_build_v1_build_events_java_proto",
"@googleapis//:google_devtools_build_v1_publish_build_event_java_grpc",
"@googleapis//:google_devtools_build_v1_publish_build_event_java_proto",
"@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assume.assumeFalse;

import build.bazel.remote.execution.v2.RequestMetadata;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -56,6 +57,7 @@
import com.google.devtools.build.lib.network.ConnectivityStatus;
import com.google.devtools.build.lib.network.ConnectivityStatusProvider;
import com.google.devtools.build.lib.network.NoOpConnectivityModule;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.runtime.BlazeModule;
import com.google.devtools.build.lib.runtime.BlazeRuntime;
import com.google.devtools.build.lib.runtime.CommandEnvironment;
Expand All @@ -74,6 +76,7 @@
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerInterceptors;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessChannelBuilder;
Expand Down Expand Up @@ -180,7 +183,9 @@ protected UncaughtExceptionHandler createUncaughtExceptionHandler() {

@Before
public void setUp() throws Exception {
serviceRegistry.addService(buildEventService);
serviceRegistry.addService(
ServerInterceptors.intercept(
buildEventService, new TracingMetadataUtils.ServerHeadersInterceptor()));
fakeServer =
InProcessServerBuilder.forName(fakeServerName)
.fallbackHandlerRegistry(serviceRegistry)
Expand Down Expand Up @@ -960,6 +965,11 @@ private static final class DelayingPublishBuildEventService extends PublishBuild
@Override
public void publishLifecycleEvent(
PublishLifecycleEventRequest request, StreamObserver<Empty> responseObserver) {
RequestMetadata metadata = TracingMetadataUtils.fromCurrentContext();
assertThat(metadata.getToolInvocationId()).isNotEmpty();
assertThat(metadata.getCorrelatedInvocationsId()).isNotEmpty();
assertThat(metadata.getActionId()).isEqualTo("publish_lifecycle_event");

responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
Expand All @@ -968,6 +978,11 @@ public void publishLifecycleEvent(
public synchronized StreamObserver<PublishBuildToolEventStreamRequest>
publishBuildToolEventStream(
StreamObserver<PublishBuildToolEventStreamResponse> responseObserver) {
RequestMetadata metadata = TracingMetadataUtils.fromCurrentContext();
assertThat(metadata.getToolInvocationId()).isNotEmpty();
assertThat(metadata.getCorrelatedInvocationsId()).isNotEmpty();
assertThat(metadata.getActionId()).isEqualTo("publish_build_tool_event_stream");

if (errorMessage != null) {
return new ErroringPublishBuildStreamObserver(responseObserver, errorMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,12 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
extraHeaders.put(Metadata.Key.of("metadata-foo", Metadata.ASCII_STRING_MARSHALLER), "bar");
ClientInterceptor interceptor = MetadataUtils.newAttachHeadersInterceptor(extraHeaders);
BuildEventServiceGrpcClient grpcClient =
new BuildEventServiceGrpcClient(server.getChannel(), null, interceptor);
new BuildEventServiceGrpcClient(
server.getChannel(),
null,
interceptor,
"testing/" + UUID.randomUUID(),
UUID.randomUUID());
assertThat(grpcClient.openStream(ack -> {}).getStatus().get()).isEqualTo(Status.OK);
assertThat(seenHeaders).hasSize(1);
Metadata headers = seenHeaders.get(0);
Expand All @@ -133,7 +138,12 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
public void immediateSuccess() throws Exception {
try (TestServer server = startTestServer(NOOP_SERVER.bindService())) {
assertThat(
new BuildEventServiceGrpcClient(server.getChannel(), null, null)
new BuildEventServiceGrpcClient(
server.getChannel(),
null,
null,
"testing/" + UUID.randomUUID(),
UUID.randomUUID())
.openStream(ack -> {})
.getStatus()
.get())
Expand All @@ -154,7 +164,12 @@ public StreamObserver<PublishBuildToolEventStreamRequest> publishBuildToolEventS
}
}.bindService())) {
assertThat(
new BuildEventServiceGrpcClient(server.getChannel(), null, null)
new BuildEventServiceGrpcClient(
server.getChannel(),
null,
null,
"testing/" + UUID.randomUUID(),
UUID.randomUUID())
.openStream(ack -> {})
.getStatus()
.get())
Expand Down

0 comments on commit dbcf260

Please sign in to comment.