Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api, core: add ClientCallTracer API for accessing Census span/trace ID on ClientCall #6090

31 changes: 31 additions & 0 deletions api/src/main/java/io/grpc/CallOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public final class CallOptions {
// Unmodifiable list
private List<ClientStreamTracer.Factory> streamTracerFactories = Collections.emptyList();

// Unmodifiable list
private List<ClientCallTracer> clientCallTracers = Collections.emptyList();

/**
* Opposite to fail fast.
*/
Expand Down Expand Up @@ -236,6 +239,32 @@ public List<ClientStreamTracer.Factory> getStreamTracerFactories() {
return streamTracerFactories;
}

/**
* Returns a new {@code CallOptions} with a {@link ClientCallTracer} in addition to the existing
* client call tracers.
*
* <p>This method doesn't replace or try to de-duplicate existing client call tracers.
*
* @since 1.24.0
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/6080")
public CallOptions withClientCallTracer(ClientCallTracer tracer) {
CallOptions newOptions = new CallOptions(this);
List<ClientCallTracer> newList = new ArrayList<>(clientCallTracers.size() + 1);
newList.addAll(clientCallTracers);
newList.add(tracer);
newOptions.clientCallTracers = Collections.unmodifiableList(newList);
return newOptions;
}

/**
* Returns an immutable list of {@link ClientCallTracer}s.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/6080")
public List<ClientCallTracer> getClientCallTracers() {
return clientCallTracers;
}

/**
* Key for a key-value pair. Uses reference equality.
*/
Expand Down Expand Up @@ -432,6 +461,7 @@ private CallOptions(CallOptions other) {
maxInboundMessageSize = other.maxInboundMessageSize;
maxOutboundMessageSize = other.maxOutboundMessageSize;
streamTracerFactories = other.streamTracerFactories;
clientCallTracers = other.clientCallTracers;
}

@Override
Expand All @@ -447,6 +477,7 @@ public String toString() {
.add("maxInboundMessageSize", maxInboundMessageSize)
.add("maxOutboundMessageSize", maxOutboundMessageSize)
.add("streamTracerFactories", streamTracerFactories)
.add("clientCallTracers", clientCallTracers)
.toString();
}
}
12 changes: 12 additions & 0 deletions api/src/main/java/io/grpc/ClientCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -279,4 +279,16 @@ public void setMessageCompression(boolean enabled) {
public Attributes getAttributes() {
return Attributes.EMPTY;
}

/**
* Returns tracer specific properties (if any) attached by tracing components via
* {@link ClientCallTracer}. Tracer properties are available as soon as the call is created.
*
* @return non-{@code null} attributes
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/6080")
@Grpc.TracerAttr
public Attributes getTracerAttributes() {
return Attributes.EMPTY;
}
}
36 changes: 36 additions & 0 deletions api/src/main/java/io/grpc/ClientCallTracer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc;

import javax.annotation.concurrent.ThreadSafe;

/**
* Client side RPC tracer, with exposure of tracer properties.
*
* @since 1.24.0
*/
@ThreadSafe
public abstract class ClientCallTracer extends ClientStreamTracer.Factory {

/**
* Puts tracer specific information as attributes to the provided builder. Tracer attributes are
* implementation specific.
*
* @param builder receiver of tracer information.
*/
public void getTracerAttributes(@Grpc.TracerAttr Attributes.Builder builder) {}
}
9 changes: 9 additions & 0 deletions api/src/main/java/io/grpc/Grpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,13 @@ private Grpc() {
@Retention(RetentionPolicy.SOURCE)
@Documented
public @interface TransportAttr {}

/**
* Annotation for tracer attributes. It follows the annotation semantics defined by
* {@link Attributes}.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/4972")
@Retention(RetentionPolicy.SOURCE)
@Documented
public @interface TracerAttr {}
}
5 changes: 5 additions & 0 deletions api/src/main/java/io/grpc/PartialForwardingClientCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public Attributes getAttributes() {
return delegate().getAttributes();
}

@Override
public Attributes getTracerAttributes() {
return delegate().getTracerAttributes();
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
Expand Down
30 changes: 30 additions & 0 deletions api/src/test/java/io/grpc/CallOptionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,36 @@ public void withStreamTracerFactory() {
}
}

@Test
public void withClientCallTracer() {
ClientCallTracer tracer1 = mock(ClientCallTracer.class);
ClientCallTracer tracer2 = mock(ClientCallTracer.class);

CallOptions opts1 = CallOptions.DEFAULT.withClientCallTracer(tracer1);
CallOptions opts2 = opts1.withClientCallTracer(tracer2);
CallOptions opts3 = opts2.withClientCallTracer(tracer2);

assertThat(opts1.getClientCallTracers()).containsExactly(tracer1);
assertThat(opts2.getClientCallTracers()).containsExactly(tracer1, tracer2)
.inOrder();
assertThat(opts3.getClientCallTracers())
.containsExactly(tracer1, tracer2, tracer2).inOrder();

try {
CallOptions.DEFAULT.getClientCallTracers().add(tracer1);
fail("Should have thrown. The list should be unmodifiable.");
} catch (UnsupportedOperationException e) {
// Expected
}

try {
opts2.getClientCallTracers().clear();
fail("Should have thrown. The list should be unmodifiable.");
} catch (UnsupportedOperationException e) {
// Expected
}
}

@Test
public void getWaitForReady() {
assertNull(CallOptions.DEFAULT.getWaitForReady());
Expand Down
27 changes: 15 additions & 12 deletions core/src/main/java/io/grpc/internal/CensusStatsModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,12 @@ public TagContext parseBytes(byte[] serialized) {
}

/**
* Creates a {@link ClientCallTracer} for a new call.
* Creates a {@link ClientCallFullLifecycleTracer} for a new call.
*/
@VisibleForTesting
ClientCallTracer newClientCallTracer(
ClientCallFullLifecycleTracer newClientCallTracer(
TagContext parentCtx, String fullMethodName) {
return new ClientCallTracer(this, parentCtx, fullMethodName);
return new ClientCallFullLifecycleTracer(this, parentCtx, fullMethodName);
}

/**
Expand Down Expand Up @@ -314,27 +314,29 @@ public void outboundMessage(int seqNo) {
}

@VisibleForTesting
static final class ClientCallTracer extends ClientStreamTracer.Factory {
static final class ClientCallFullLifecycleTracer extends ClientStreamTracer.Factory {
@Nullable
private static final AtomicReferenceFieldUpdater<ClientCallTracer, ClientTracer>
private static final AtomicReferenceFieldUpdater<ClientCallFullLifecycleTracer, ClientTracer>
streamTracerUpdater;

@Nullable private static final AtomicIntegerFieldUpdater<ClientCallTracer> callEndedUpdater;
@Nullable
private static final AtomicIntegerFieldUpdater<ClientCallFullLifecycleTracer> callEndedUpdater;

/**
* When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
* JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to
* (potentially racy) direct updates of the volatile variables.
*/
static {
AtomicReferenceFieldUpdater<ClientCallTracer, ClientTracer> tmpStreamTracerUpdater;
AtomicIntegerFieldUpdater<ClientCallTracer> tmpCallEndedUpdater;
AtomicReferenceFieldUpdater<ClientCallFullLifecycleTracer,
ClientTracer> tmpStreamTracerUpdater;
AtomicIntegerFieldUpdater<ClientCallFullLifecycleTracer> tmpCallEndedUpdater;
try {
tmpStreamTracerUpdater =
AtomicReferenceFieldUpdater.newUpdater(
ClientCallTracer.class, ClientTracer.class, "streamTracer");
ClientCallFullLifecycleTracer.class, ClientTracer.class, "streamTracer");
tmpCallEndedUpdater =
AtomicIntegerFieldUpdater.newUpdater(ClientCallTracer.class, "callEnded");
AtomicIntegerFieldUpdater.newUpdater(ClientCallFullLifecycleTracer.class, "callEnded");
} catch (Throwable t) {
logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
tmpStreamTracerUpdater = null;
Expand All @@ -351,7 +353,8 @@ static final class ClientCallTracer extends ClientStreamTracer.Factory {
private final TagContext parentCtx;
private final TagContext startCtx;

ClientCallTracer(CensusStatsModule module, TagContext parentCtx, String fullMethodName) {
ClientCallFullLifecycleTracer(CensusStatsModule module, TagContext parentCtx,
String fullMethodName) {
this.module = checkNotNull(module);
this.parentCtx = checkNotNull(parentCtx);
TagValue methodTag = TagValue.create(fullMethodName);
Expand Down Expand Up @@ -685,7 +688,7 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
// New RPCs on client-side inherit the tag context from the current Context.
TagContext parentCtx = tagger.getCurrentTagContext();
final ClientCallTracer tracerFactory =
final ClientCallFullLifecycleTracer tracerFactory =
newClientCallTracer(parentCtx, method.getFullMethodName());
ClientCall<ReqT, RespT> call =
next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
Expand Down
45 changes: 34 additions & 11 deletions core/src/main/java/io/grpc/internal/CensusTracingModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientCallTracer;
import io.grpc.ClientInterceptor;
import io.grpc.ClientStreamTracer;
import io.grpc.Context;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerStreamTracer;
Expand All @@ -37,7 +40,9 @@
import io.opencensus.trace.MessageEvent.Type;
import io.opencensus.trace.Span;
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.SpanId;
import io.opencensus.trace.Status;
import io.opencensus.trace.TraceId;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.propagation.BinaryFormat;
import io.opencensus.trace.unsafe.ContextUtils;
Expand All @@ -60,7 +65,18 @@
final class CensusTracingModule {
private static final Logger logger = Logger.getLogger(CensusTracingModule.class.getName());

@Nullable private static final AtomicIntegerFieldUpdater<ClientCallTracer> callEndedUpdater;
@VisibleForTesting
@Grpc.TracerAttr
static final Attributes.Key<SpanId> SPAN_ID_ATTRIBUTES_KEY = Attributes.Key
.create("census-span-id");

@VisibleForTesting
@Grpc.TracerAttr
static final Attributes.Key<TraceId> TRACE_ID_ATTRIBUTES_KEY = Attributes.Key
.create("census-trace-id");

@Nullable
private static final AtomicIntegerFieldUpdater<ClientCallFullLifecycleTracer> callEndedUpdater;

@Nullable private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater;

Expand All @@ -70,11 +86,11 @@ final class CensusTracingModule {
* (potentially racy) direct updates of the volatile variables.
*/
static {
AtomicIntegerFieldUpdater<ClientCallTracer> tmpCallEndedUpdater;
AtomicIntegerFieldUpdater<ClientCallFullLifecycleTracer> tmpCallEndedUpdater;
AtomicIntegerFieldUpdater<ServerTracer> tmpStreamClosedUpdater;
try {
tmpCallEndedUpdater =
AtomicIntegerFieldUpdater.newUpdater(ClientCallTracer.class, "callEnded");
AtomicIntegerFieldUpdater.newUpdater(ClientCallFullLifecycleTracer.class, "callEnded");
tmpStreamClosedUpdater =
AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
} catch (Throwable t) {
Expand Down Expand Up @@ -116,11 +132,12 @@ public SpanContext parseBytes(byte[] serialized) {
}

/**
* Creates a {@link ClientCallTracer} for a new call.
* Creates a {@link ClientCallFullLifecycleTracer} for a new call.
*/
@VisibleForTesting
ClientCallTracer newClientCallTracer(@Nullable Span parentSpan, MethodDescriptor<?, ?> method) {
return new ClientCallTracer(parentSpan, method);
ClientCallFullLifecycleTracer newClientCallTracer(@Nullable Span parentSpan,
MethodDescriptor<?, ?> method) {
return new ClientCallFullLifecycleTracer(parentSpan, method);
}

/**
Expand Down Expand Up @@ -223,13 +240,13 @@ private static void recordMessageEvent(
}

@VisibleForTesting
final class ClientCallTracer extends ClientStreamTracer.Factory {
final class ClientCallFullLifecycleTracer extends ClientCallTracer {
volatile int callEnded;

private final boolean isSampledToLocalTracing;
private final Span span;

ClientCallTracer(@Nullable Span parentSpan, MethodDescriptor<?, ?> method) {
ClientCallFullLifecycleTracer(@Nullable Span parentSpan, MethodDescriptor<?, ?> method) {
checkNotNull(method, "method");
this.isSampledToLocalTracing = method.isSampledToLocalTracing();
this.span =
Expand All @@ -251,6 +268,12 @@ public ClientStreamTracer newClientStreamTracer(
return new ClientTracer(span);
}

@Override
public void getTracerAttributes(Attributes.Builder builder) {
builder.set(SPAN_ID_ATTRIBUTES_KEY, span.getContext().getSpanId());
builder.set(TRACE_ID_ATTRIBUTES_KEY, span.getContext().getTraceId());
}

/**
* Record a finished call and mark the current time as the end time.
*
Expand Down Expand Up @@ -382,20 +405,20 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
// Safe usage of the unsafe trace API because CONTEXT_SPAN_KEY.get() returns the same value
// as Tracer.getCurrentSpan() except when no value available when the return value is null
// for the direct access and BlankSpan when Tracer API is used.
final ClientCallTracer tracerFactory =
final ClientCallFullLifecycleTracer clientcallTracer =
newClientCallTracer(ContextUtils.getValue(Context.current()), method);
ClientCall<ReqT, RespT> call =
next.newCall(
method,
callOptions.withStreamTracerFactory(tracerFactory));
callOptions.withClientCallTracer(clientcallTracer));
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
delegate().start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onClose(io.grpc.Status status, Metadata trailers) {
tracerFactory.callEnded(status);
clientcallTracer.callEnded(status);
super.onClose(status, trailers);
}
},
Expand Down