Skip to content

Commit

Permalink
Plumb optional labels from LB to ClientStreamTracer
Browse files Browse the repository at this point in the history
As part of gRFC A78:

> To support the locality label in the per-call metrics, we will provide
> a mechanism for LB picker to add optional labels to the call attempt
> tracer.
  • Loading branch information
ejona86 committed Apr 29, 2024
1 parent 06df25b commit 4c78a97
Show file tree
Hide file tree
Showing 18 changed files with 305 additions and 94 deletions.
7 changes: 7 additions & 0 deletions api/src/main/java/io/grpc/ClientStreamTracer.java
Expand Up @@ -79,6 +79,13 @@ public void inboundHeaders() {
public void inboundTrailers(Metadata trailers) {
}

/**
* Information providing context to the call became available.
*/
@Internal
public void addOptionalLabel(String key, String value) {
}

/**
* Factory class for {@link ClientStreamTracer}.
*/
Expand Down
23 changes: 23 additions & 0 deletions api/src/main/java/io/grpc/LoadBalancer.java
Expand Up @@ -490,6 +490,29 @@ public abstract static class PickSubchannelArgs {
* @since 1.2.0
*/
public abstract MethodDescriptor<?, ?> getMethodDescriptor();

/**
* Gets an object that can be informed about what sort of pick was made.
*/
@Internal
public PickDetailsConsumer getPickDetailsConsumer() {
return new PickDetailsConsumer() {};
}
}

/** Receives information about the pick being chosen. */
@Internal
public interface PickDetailsConsumer {
/**
* Optional labels that provide context of how the pick was routed. Particularly helpful for
* per-RPC metrics.
*
* @throws NullPointerException if key or value is {@code null}
*/
default void addOptionalLabel(String key, String value) {
checkNotNull(key, "key");
checkNotNull(value, "value");
}
}

/**
Expand Down
59 changes: 59 additions & 0 deletions api/src/testFixtures/java/io/grpc/PickSubchannelArgsMatcher.java
@@ -0,0 +1,59 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc;

import com.google.common.base.Preconditions;
import io.grpc.CallOptions;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;

/**
* Mockito Matcher for {@link PickSubchannelArgs}.
*/
public final class PickSubchannelArgsMatcher implements ArgumentMatcher<PickSubchannelArgs> {
private final MethodDescriptor<?, ?> method;
private final Metadata headers;
private final CallOptions callOptions;

public PickSubchannelArgsMatcher(
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
this.method = Preconditions.checkNotNull(method, "method");
this.headers = Preconditions.checkNotNull(headers, "headers");
this.callOptions = Preconditions.checkNotNull(callOptions, "callOptions");
}

@Override
public boolean matches(PickSubchannelArgs args) {
return args != null
&& method.equals(args.getMethodDescriptor())
&& headers.equals(args.getHeaders())
&& callOptions.equals(args.getCallOptions());
}

@Override
public final String toString() {
return "[method=" + method + " headers=" + headers + " callOptions=" + callOptions + "]";
}

public static PickSubchannelArgs eqPickSubchannelArgs(
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
return ArgumentMatchers.argThat(new PickSubchannelArgsMatcher(method, headers, callOptions));
}
}
Expand Up @@ -137,7 +137,8 @@ public final ClientStream newStream(
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
ClientStreamTracer[] tracers) {
try {
PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions);
PickSubchannelArgs args = new PickSubchannelArgsImpl(
method, headers, callOptions, new PickDetailsConsumerImpl(tracers));
SubchannelPicker picker = null;
long pickerVersion = -1;
while (true) {
Expand Down
Expand Up @@ -54,6 +54,11 @@ public void inboundTrailers(Metadata trailers) {
delegate().inboundTrailers(trailers);
}

@Override
public void addOptionalLabel(String key, String value) {
delegate().addOptionalLabel(key, value);
}

@Override
public void streamClosed(Status status) {
delegate().streamClosed(status);
Expand Down
15 changes: 9 additions & 6 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Expand Up @@ -158,6 +158,8 @@ public Result selectConfig(PickSubchannelArgs args) {
throw new IllegalStateException("Resolution is pending");
}
};
private static final LoadBalancer.PickDetailsConsumer NOOP_PICK_DETAILS_CONSUMER =
new LoadBalancer.PickDetailsConsumer() {};

private final InternalLogId logId;
private final String target;
Expand Down Expand Up @@ -519,11 +521,11 @@ public ClientStream newStream(
final Metadata headers,
final Context context) {
if (!retryEnabled) {
ClientTransport transport =
getTransport(new PickSubchannelArgsImpl(method, headers, callOptions));
Context origContext = context.attach();
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
callOptions, headers, 0, /* isTransparentRetry= */ false);
ClientTransport transport = getTransport(new PickSubchannelArgsImpl(
method, headers, callOptions, new PickDetailsConsumerImpl(tracers)));
Context origContext = context.attach();
try {
return transport.newStream(method, headers, callOptions, tracers);
} finally {
Expand Down Expand Up @@ -566,8 +568,8 @@ ClientStream newSubstream(
CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
newOptions, newHeaders, previousAttempts, isTransparentRetry);
ClientTransport transport =
getTransport(new PickSubchannelArgsImpl(method, newHeaders, newOptions));
ClientTransport transport = getTransport(new PickSubchannelArgsImpl(
method, newHeaders, newOptions, new PickDetailsConsumerImpl(tracers)));
Context origContext = context.attach();
try {
return transport.newStream(method, newHeaders, newOptions, tracers);
Expand Down Expand Up @@ -1207,7 +1209,8 @@ protected ClientCall<ReqT, RespT> delegate() {
@SuppressWarnings("unchecked")
@Override
public void start(Listener<RespT> observer, Metadata headers) {
PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions);
PickSubchannelArgs args =
new PickSubchannelArgsImpl(method, headers, callOptions, NOOP_PICK_DETAILS_CONSUMER);
InternalConfigSelector.Result result = configSelector.selectConfig(args);
Status status = result.getStatus();
if (!status.isOk()) {
Expand Down
42 changes: 42 additions & 0 deletions core/src/main/java/io/grpc/internal/PickDetailsConsumerImpl.java
@@ -0,0 +1,42 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.internal;

import com.google.common.base.Preconditions;
import io.grpc.ClientStreamTracer;
import io.grpc.LoadBalancer.PickDetailsConsumer;

/**
* Adapter for tracers into details consumers.
*/
final class PickDetailsConsumerImpl implements PickDetailsConsumer {
private final ClientStreamTracer[] tracers;

/** Construct a consumer with unchanging tracers array. */
public PickDetailsConsumerImpl(ClientStreamTracer[] tracers) {
this.tracers = Preconditions.checkNotNull(tracers, "tracers");
}

@Override
public void addOptionalLabel(String key, String value) {
Preconditions.checkNotNull(key, "key");
Preconditions.checkNotNull(value, "value");
for (ClientStreamTracer tracer : tracers) {
tracer.addOptionalLabel(key, value);
}
}
}
16 changes: 13 additions & 3 deletions core/src/main/java/io/grpc/internal/PickSubchannelArgsImpl.java
Expand Up @@ -20,6 +20,7 @@

import com.google.common.base.Objects;
import io.grpc.CallOptions;
import io.grpc.LoadBalancer.PickDetailsConsumer;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
Expand All @@ -29,15 +30,18 @@ public final class PickSubchannelArgsImpl extends PickSubchannelArgs {
private final CallOptions callOptions;
private final Metadata headers;
private final MethodDescriptor<?, ?> method;
private final PickDetailsConsumer pickDetailsConsumer;

/**
* Creates call args object for given method with its call options, metadata.
*/
public PickSubchannelArgsImpl(
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
PickDetailsConsumer pickDetailsConsumer) {
this.method = checkNotNull(method, "method");
this.headers = checkNotNull(headers, "headers");
this.callOptions = checkNotNull(callOptions, "callOptions");
this.pickDetailsConsumer = checkNotNull(pickDetailsConsumer, "pickDetailsConsumer");
}

@Override
Expand All @@ -55,6 +59,11 @@ public CallOptions getCallOptions() {
return method;
}

@Override
public PickDetailsConsumer getPickDetailsConsumer() {
return pickDetailsConsumer;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -66,12 +75,13 @@ public boolean equals(Object o) {
PickSubchannelArgsImpl that = (PickSubchannelArgsImpl) o;
return Objects.equal(callOptions, that.callOptions)
&& Objects.equal(headers, that.headers)
&& Objects.equal(method, that.method);
&& Objects.equal(method, that.method)
&& Objects.equal(pickDetailsConsumer, that.pickDetailsConsumer);
}

@Override
public int hashCode() {
return Objects.hashCode(callOptions, headers, method);
return Objects.hashCode(callOptions, headers, method, pickDetailsConsumer);
}

@Override
Expand Down

0 comments on commit 4c78a97

Please sign in to comment.