Skip to content

Commit

Permalink
all: API refactoring in preparation to support retry stats (#8355)
Browse files Browse the repository at this point in the history
Rebased PR #8343 into the first commit of this PR, then (the 2nd commit) reverted the part for metric recording of retry attempts. The PR as a whole is mechanical refactoring. No behavior change (except that some of the old code path when tracer is created is moved into the new method `streamCreated()`).

The API change is documented in go/grpc-stats-api-change-for-retry-java
  • Loading branch information
dapengzhang0 committed Aug 2, 2021
1 parent 1e85892 commit 7849d3b
Show file tree
Hide file tree
Showing 55 changed files with 1,210 additions and 563 deletions.
82 changes: 55 additions & 27 deletions api/src/main/java/io/grpc/ClientStreamTracer.java
Expand Up @@ -19,7 +19,6 @@
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.base.MoreObjects;
import io.grpc.Grpc;
import javax.annotation.concurrent.ThreadSafe;

/**
Expand All @@ -28,6 +27,18 @@
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2861")
@ThreadSafe
public abstract class ClientStreamTracer extends StreamTracer {

/**
* The stream is being created on a ready transport.
*
* @param headers the mutable initial metadata. Modifications to it will be sent to the socket but
* not be seen by client interceptors and the application.
*
* @since 1.40.0
*/
public void streamCreated(@Grpc.TransportAttr Attributes transportAttrs, Metadata headers) {
}

/**
* Headers has been sent to the socket.
*/
Expand All @@ -54,22 +65,6 @@ public void inboundTrailers(Metadata trailers) {
* Factory class for {@link ClientStreamTracer}.
*/
public abstract static class Factory {
/**
* Creates a {@link ClientStreamTracer} for a new client stream.
*
* @param callOptions the effective CallOptions of the call
* @param headers the mutable headers of the stream. It can be safely mutated within this
* method. It should not be saved because it is not safe for read or write after the
* method returns.
*
* @deprecated use {@link
* #newClientStreamTracer(io.grpc.ClientStreamTracer.StreamInfo, io.grpc.Metadata)} instead.
*/
@Deprecated
public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
throw new UnsupportedOperationException("Not implemented");
}

/**
* Creates a {@link ClientStreamTracer} for a new client stream. This is called inside the
* transport when it's creating the stream.
Expand All @@ -81,12 +76,15 @@ public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadat
*
* @since 1.20.0
*/
@SuppressWarnings("deprecation")
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
return newClientStreamTracer(info.getCallOptions(), headers);
throw new UnsupportedOperationException("Not implemented");
}
}

/** An abstract class for internal use only. */
@Internal
public abstract static class InternalLimitedInfoFactory extends Factory {}

/**
* Information about a stream.
*
Expand All @@ -99,15 +97,21 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata header
public static final class StreamInfo {
private final Attributes transportAttrs;
private final CallOptions callOptions;
private final boolean isTransparentRetry;

StreamInfo(Attributes transportAttrs, CallOptions callOptions) {
StreamInfo(Attributes transportAttrs, CallOptions callOptions, boolean isTransparentRetry) {
this.transportAttrs = checkNotNull(transportAttrs, "transportAttrs");
this.callOptions = checkNotNull(callOptions, "callOptions");
this.isTransparentRetry = isTransparentRetry;
}

/**
* Returns the attributes of the transport that this stream was created on.
*
* @deprecated Use {@link ClientStreamTracer#streamCreated(Attributes, Metadata)} to handle
* the transport Attributes instead.
*/
@Deprecated
@Grpc.TransportAttr
public Attributes getTransportAttrs() {
return transportAttrs;
Expand All @@ -120,16 +124,25 @@ public CallOptions getCallOptions() {
return callOptions;
}

/**
* Whether the stream is a transparent retry.
*
* @since 1.40.0
*/
public boolean isTransparentRetry() {
return isTransparentRetry;
}

/**
* Converts this StreamInfo into a new Builder.
*
* @since 1.21.0
*/
public Builder toBuilder() {
Builder builder = new Builder();
builder.setTransportAttrs(transportAttrs);
builder.setCallOptions(callOptions);
return builder;
return new Builder()
.setCallOptions(callOptions)
.setTransportAttrs(transportAttrs)
.setIsTransparentRetry(isTransparentRetry);
}

/**
Expand All @@ -146,6 +159,7 @@ public String toString() {
return MoreObjects.toStringHelper(this)
.add("transportAttrs", transportAttrs)
.add("callOptions", callOptions)
.add("isTransparentRetry", isTransparentRetry)
.toString();
}

Expand All @@ -157,16 +171,20 @@ public String toString() {
public static final class Builder {
private Attributes transportAttrs = Attributes.EMPTY;
private CallOptions callOptions = CallOptions.DEFAULT;
private boolean isTransparentRetry;

Builder() {
}

/**
* Sets the attributes of the transport that this stream was created on. This field is
* optional.
*
* @deprecated Use {@link ClientStreamTracer#streamCreated(Attributes, Metadata)} to handle
* the transport Attributes instead.
*/
@Grpc.TransportAttr
public Builder setTransportAttrs(Attributes transportAttrs) {
@Deprecated
public Builder setTransportAttrs(@Grpc.TransportAttr Attributes transportAttrs) {
this.transportAttrs = checkNotNull(transportAttrs, "transportAttrs cannot be null");
return this;
}
Expand All @@ -179,11 +197,21 @@ public Builder setCallOptions(CallOptions callOptions) {
return this;
}

/**
* Sets whether the stream is a transparent retry.
*
* @since 1.40.0
*/
public Builder setIsTransparentRetry(boolean isTransparentRetry) {
this.isTransparentRetry = isTransparentRetry;
return this;
}

/**
* Builds a new StreamInfo.
*/
public StreamInfo build() {
return new StreamInfo(transportAttrs, callOptions);
return new StreamInfo(transportAttrs, callOptions, isTransparentRetry);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions api/src/test/java/io/grpc/CallOptionsTest.java
Expand Up @@ -30,6 +30,7 @@
import static org.mockito.Mockito.mock;

import com.google.common.base.Objects;
import io.grpc.ClientStreamTracer.StreamInfo;
import io.grpc.internal.SerializingExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -271,16 +272,15 @@ public void increment(long period, TimeUnit unit) {
}
}

private static class FakeTracerFactory extends ClientStreamTracer.Factory {
private static class FakeTracerFactory extends ClientStreamTracer.InternalLimitedInfoFactory {
final String name;

FakeTracerFactory(String name) {
this.name = name;
}

@Override
public ClientStreamTracer newClientStreamTracer(
ClientStreamTracer.StreamInfo info, Metadata headers) {
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
return new ClientStreamTracer() {};
}

Expand Down
21 changes: 11 additions & 10 deletions binder/src/main/java/io/grpc/binder/internal/BinderTransport.java
Expand Up @@ -32,6 +32,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientStreamTracer;
import io.grpc.Grpc;
import io.grpc.Internal;
import io.grpc.InternalChannelz.SocketStats;
Expand Down Expand Up @@ -632,28 +633,28 @@ public synchronized Runnable start(ManagedClientTransport.Listener clientTranspo
public synchronized ClientStream newStream(
final MethodDescriptor<?, ?> method,
final Metadata headers,
final CallOptions callOptions) {
final CallOptions callOptions,
ClientStreamTracer[] tracers) {
if (isShutdown()) {
return newFailingClientStream(shutdownStatus, callOptions, attributes, headers);
return newFailingClientStream(shutdownStatus, attributes, headers, tracers);
} else {
int callId = latestCallId++;
if (latestCallId == LAST_CALL_ID) {
latestCallId = FIRST_CALL_ID;
}
StatsTraceContext statsTraceContext =
StatsTraceContext.newClientContext(tracers, attributes, headers);
Inbound.ClientInbound inbound =
new Inbound.ClientInbound(
this, attributes, callId, GrpcUtil.shouldBeCountedForInUse(callOptions));
if (ongoingCalls.putIfAbsent(callId, inbound) != null) {
Status failure = Status.INTERNAL.withDescription("Clashing call IDs");
shutdownInternal(failure, true);
return newFailingClientStream(failure, callOptions, attributes, headers);
return newFailingClientStream(failure, attributes, headers, tracers);
} else {
if (inbound.countsForInUse() && numInUseStreams.getAndIncrement() == 0) {
clientTransportListener.transportInUse(true);
}
StatsTraceContext statsTraceContext =
StatsTraceContext.newClientContext(callOptions, attributes, headers);

Outbound.ClientOutbound outbound =
new Outbound.ClientOutbound(this, callId, method, headers, statsTraceContext);
if (method.getType().clientSendsOneMessage()) {
Expand Down Expand Up @@ -763,12 +764,12 @@ protected void handlePingResponse(Parcel parcel) {
}

private static ClientStream newFailingClientStream(
Status failure, CallOptions callOptions, Attributes attributes, Metadata headers) {
Status failure, Attributes attributes, Metadata headers,
ClientStreamTracer[] tracers) {
StatsTraceContext statsTraceContext =
StatsTraceContext.newClientContext(callOptions, attributes, headers);
StatsTraceContext.newClientContext(tracers, attributes, headers);
statsTraceContext.clientOutboundHeaders();
statsTraceContext.streamClosed(failure);
return new FailingClientStream(failure);
return new FailingClientStream(failure, tracers);
}

private static InternalLogId buildLogId(
Expand Down

0 comments on commit 7849d3b

Please sign in to comment.