diff --git a/api/src/main/java/io/grpc/ClientStreamTracer.java b/api/src/main/java/io/grpc/ClientStreamTracer.java
index 6a5d3cc3397..bb836ac82e1 100644
--- a/api/src/main/java/io/grpc/ClientStreamTracer.java
+++ b/api/src/main/java/io/grpc/ClientStreamTracer.java
@@ -97,11 +97,15 @@ public abstract static class InternalLimitedInfoFactory extends Factory {}
public static final class StreamInfo {
private final Attributes transportAttrs;
private final CallOptions callOptions;
+ private final int previousAttempts;
private final boolean isTransparentRetry;
- StreamInfo(Attributes transportAttrs, CallOptions callOptions, boolean isTransparentRetry) {
+ StreamInfo(
+ Attributes transportAttrs, CallOptions callOptions, int previousAttempts,
+ boolean isTransparentRetry) {
this.transportAttrs = checkNotNull(transportAttrs, "transportAttrs");
this.callOptions = checkNotNull(callOptions, "callOptions");
+ this.previousAttempts = previousAttempts;
this.isTransparentRetry = isTransparentRetry;
}
@@ -124,6 +128,15 @@ public CallOptions getCallOptions() {
return callOptions;
}
+ /**
+ * Returns the number of preceding attempts for the RPC.
+ *
+ * @since 1.40.0
+ */
+ public int getPreviousAttempts() {
+ return previousAttempts;
+ }
+
/**
* Whether the stream is a transparent retry.
*
@@ -142,6 +155,7 @@ public Builder toBuilder() {
return new Builder()
.setCallOptions(callOptions)
.setTransportAttrs(transportAttrs)
+ .setPreviousAttempts(previousAttempts)
.setIsTransparentRetry(isTransparentRetry);
}
@@ -159,6 +173,7 @@ public String toString() {
return MoreObjects.toStringHelper(this)
.add("transportAttrs", transportAttrs)
.add("callOptions", callOptions)
+ .add("previousAttempts", previousAttempts)
.add("isTransparentRetry", isTransparentRetry)
.toString();
}
@@ -171,6 +186,7 @@ public String toString() {
public static final class Builder {
private Attributes transportAttrs = Attributes.EMPTY;
private CallOptions callOptions = CallOptions.DEFAULT;
+ private int previousAttempts;
private boolean isTransparentRetry;
Builder() {
@@ -197,6 +213,16 @@ public Builder setCallOptions(CallOptions callOptions) {
return this;
}
+ /**
+ * Set the number of preceding attempts of the RPC.
+ *
+ * @since 1.40.0
+ */
+ public Builder setPreviousAttempts(int previousAttempts) {
+ this.previousAttempts = previousAttempts;
+ return this;
+ }
+
/**
* Sets whether the stream is a transparent retry.
*
@@ -211,7 +237,7 @@ public Builder setIsTransparentRetry(boolean isTransparentRetry) {
* Builds a new StreamInfo.
*/
public StreamInfo build() {
- return new StreamInfo(transportAttrs, callOptions, isTransparentRetry);
+ return new StreamInfo(transportAttrs, callOptions, previousAttempts, isTransparentRetry);
}
}
}
diff --git a/api/src/main/java/io/grpc/ManagedChannelBuilder.java b/api/src/main/java/io/grpc/ManagedChannelBuilder.java
index e4a4611541d..98b22807ccc 100644
--- a/api/src/main/java/io/grpc/ManagedChannelBuilder.java
+++ b/api/src/main/java/io/grpc/ManagedChannelBuilder.java
@@ -467,7 +467,6 @@ public T perRpcBufferLimit(long bytes) {
* @return this
* @since 1.11.0
*/
- @ExperimentalApi("https://github.com/grpc/grpc-java/issues/3982")
public T disableRetry() {
throw new UnsupportedOperationException();
}
@@ -479,13 +478,9 @@ public T disableRetry() {
* transparent retries, which are safe for non-idempotent RPCs. Service config is ideally provided
* by the name resolver, but may also be specified via {@link #defaultServiceConfig}.
*
- *
For the current release, this method may have a side effect that disables Census stats and
- * tracing.
- *
* @return this
* @since 1.11.0
*/
- @ExperimentalApi("https://github.com/grpc/grpc-java/issues/3982")
public T enableRetry() {
throw new UnsupportedOperationException();
}
diff --git a/census/src/main/java/io/grpc/census/CensusStatsModule.java b/census/src/main/java/io/grpc/census/CensusStatsModule.java
index ac5f4e705e3..6faeb575ccc 100644
--- a/census/src/main/java/io/grpc/census/CensusStatsModule.java
+++ b/census/src/main/java/io/grpc/census/CensusStatsModule.java
@@ -17,7 +17,6 @@
package io.grpc.census;
import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
@@ -28,16 +27,20 @@
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientStreamTracer;
+import io.grpc.ClientStreamTracer.StreamInfo;
import io.grpc.Context;
+import io.grpc.Deadline;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
+import io.grpc.Status.Code;
import io.grpc.StreamTracer;
import io.grpc.census.internal.DeprecatedCensusConstants;
import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants;
+import io.opencensus.stats.Measure;
import io.opencensus.stats.Measure.MeasureDouble;
import io.opencensus.stats.Measure.MeasureLong;
import io.opencensus.stats.MeasureMap;
@@ -51,9 +54,11 @@
import io.opencensus.tags.propagation.TagContextSerializationException;
import io.opencensus.tags.unsafe.ContextUtils;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -61,9 +66,10 @@
/**
* Provides factories for {@link StreamTracer} that records stats to Census.
*
- *
On the client-side, a factory is created for each call, because ClientCall starts earlier than
- * the ClientStream, and in some cases may even not create a ClientStream at all. Therefore, it's
- * the factory that reports the summary to Census.
+ *
On the client-side, a factory is created for each call, and the factory creates a stream
+ * tracer for each attempt. If there is no stream created when the call is ended, we still create a
+ * tracer. It's the tracer that reports per-attempt stats, and the factory that reports the stats
+ * of the overall RPC, such as RETRIES_PER_CALL, to Census.
*
*
On the server-side, there is only one ServerStream per each ServerCall, and ServerStream
* starts earlier than the ServerCall. Therefore, only one tracer is created per stream/call and
@@ -168,7 +174,6 @@ private void recordRealTimeMetric(TagContext ctx, MeasureLong measure, long valu
}
private static final class ClientTracer extends ClientStreamTracer {
-
@Nullable private static final AtomicLongFieldUpdater outboundMessageCountUpdater;
@Nullable private static final AtomicLongFieldUpdater inboundMessageCountUpdater;
@Nullable private static final AtomicLongFieldUpdater outboundWireSizeUpdater;
@@ -222,21 +227,31 @@ private static final class ClientTracer extends ClientStreamTracer {
inboundUncompressedSizeUpdater = tmpInboundUncompressedSizeUpdater;
}
- private final CensusStatsModule module;
+ final Stopwatch stopwatch;
+ final CallAttemptsTracerFactory attemptsState;
+ final AtomicBoolean inboundReceivedOrClosed = new AtomicBoolean();
+ final CensusStatsModule module;
final TagContext parentCtx;
- private final TagContext startCtx;
-
+ final TagContext startCtx;
+ final StreamInfo info;
volatile long outboundMessageCount;
volatile long inboundMessageCount;
volatile long outboundWireSize;
volatile long inboundWireSize;
volatile long outboundUncompressedSize;
volatile long inboundUncompressedSize;
-
- ClientTracer(CensusStatsModule module, TagContext parentCtx, TagContext startCtx) {
- this.module = checkNotNull(module, "module");
+ long roundtripNanos;
+ Code statusCode;
+
+ ClientTracer(
+ CallAttemptsTracerFactory attemptsState, CensusStatsModule module, TagContext parentCtx,
+ TagContext startCtx, StreamInfo info) {
+ this.attemptsState = attemptsState;
+ this.module = module;
this.parentCtx = parentCtx;
- this.startCtx = checkNotNull(startCtx, "startCtx");
+ this.startCtx = startCtx;
+ this.info = info;
+ this.stopwatch = module.stopwatchSupplier.get().start();
}
@Override
@@ -296,6 +311,11 @@ public void inboundUncompressedSize(long bytes) {
@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void inboundMessage(int seqNo) {
+ if (inboundReceivedOrClosed.compareAndSet(false, true)) {
+ // Because inboundUncompressedSize() might be called after streamClosed(),
+ // we will report stats in callEnded(). Note that this attempt is already committed.
+ attemptsState.inboundMetricTracer = this;
+ }
if (inboundMessageCountUpdater != null) {
inboundMessageCountUpdater.getAndIncrement(this);
} else {
@@ -316,14 +336,74 @@ public void outboundMessage(int seqNo) {
module.recordRealTimeMetric(
startCtx, RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1);
}
+
+ @Override
+ public void streamClosed(Status status) {
+ attemptsState.attemptEnded();
+ stopwatch.stop();
+ roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
+ Deadline deadline = info.getCallOptions().getDeadline();
+ statusCode = status.getCode();
+ if (statusCode == Status.Code.CANCELLED && deadline != null) {
+ // When the server's deadline expires, it can only reset the stream with CANCEL and no
+ // description. Since our timer may be delayed in firing, we double-check the deadline and
+ // turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
+ if (deadline.isExpired()) {
+ statusCode = Code.DEADLINE_EXCEEDED;
+ }
+ }
+ if (inboundReceivedOrClosed.compareAndSet(false, true)) {
+ if (module.recordFinishedRpcs) {
+ // Stream is closed early. So no need to record metrics for any inbound events after this
+ // point.
+ recordFinishedRpc();
+ }
+ } // Otherwise will report stats in callEnded() to guarantee all inbound metrics are recorded.
+ }
+
+ void recordFinishedRpc() {
+ MeasureMap measureMap = module.statsRecorder.newMeasureMap()
+ // TODO(songya): remove the deprecated measure constants once they are completed removed.
+ .put(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT, 1)
+ // The latency is double value
+ .put(
+ DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY,
+ roundtripNanos / NANOS_PER_MILLI)
+ .put(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT, outboundMessageCount)
+ .put(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_COUNT, inboundMessageCount)
+ .put(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES, outboundWireSize)
+ .put(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_BYTES, inboundWireSize)
+ .put(
+ DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES,
+ outboundUncompressedSize)
+ .put(
+ DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES,
+ inboundUncompressedSize);
+ if (statusCode != Code.OK) {
+ measureMap.put(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT, 1);
+ }
+ TagValue statusTag = TagValue.create(statusCode.toString());
+ measureMap.record(
+ module
+ .tagger
+ .toBuilder(startCtx)
+ .putLocal(RpcMeasureConstants.GRPC_CLIENT_STATUS, statusTag)
+ .build());
+ }
}
@VisibleForTesting
static final class CallAttemptsTracerFactory extends
ClientStreamTracer.InternalLimitedInfoFactory {
- @Nullable
- private static final AtomicReferenceFieldUpdater
- streamTracerUpdater;
+ static final MeasureLong RETRIES_PER_CALL =
+ Measure.MeasureLong.create(
+ "grpc.io/client/retries_per_call", "Number of retries per call", "1");
+ static final MeasureLong TRANSPARENT_RETRIES_PER_CALL =
+ Measure.MeasureLong.create(
+ "grpc.io/client/transparent_retries_per_call", "Transparent retries per call", "1");
+ static final MeasureDouble RETRY_DELAY_PER_CALL =
+ Measure.MeasureDouble.create(
+ "grpc.io/client/retry_delay_per_call", "Retry delay per call", "ms");
@Nullable
private static final AtomicIntegerFieldUpdater callEndedUpdater;
@@ -334,40 +414,45 @@ static final class CallAttemptsTracerFactory extends
* (potentially racy) direct updates of the volatile variables.
*/
static {
- AtomicReferenceFieldUpdater tmpStreamTracerUpdater;
AtomicIntegerFieldUpdater tmpCallEndedUpdater;
try {
- tmpStreamTracerUpdater =
- AtomicReferenceFieldUpdater.newUpdater(
- CallAttemptsTracerFactory.class, ClientTracer.class, "streamTracer");
tmpCallEndedUpdater =
AtomicIntegerFieldUpdater.newUpdater(CallAttemptsTracerFactory.class, "callEnded");
} catch (Throwable t) {
logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
- tmpStreamTracerUpdater = null;
tmpCallEndedUpdater = null;
}
- streamTracerUpdater = tmpStreamTracerUpdater;
callEndedUpdater = tmpCallEndedUpdater;
}
+ ClientTracer inboundMetricTracer;
private final CensusStatsModule module;
private final Stopwatch stopwatch;
- private volatile ClientTracer streamTracer;
private volatile int callEnded;
private final TagContext parentCtx;
private final TagContext startCtx;
+ private final String fullMethodName;
+
+ // TODO(zdapeng): optimize memory allocation using AtomicFieldUpdater.
+ private final AtomicLong attemptsPerCall = new AtomicLong();
+ private final AtomicLong transparentRetriesPerCall = new AtomicLong();
+ private final AtomicLong retryDelayNanos = new AtomicLong();
+ private final AtomicLong lastInactiveTimeStamp = new AtomicLong();
+ private final AtomicInteger activeStreams = new AtomicInteger();
+ private final AtomicBoolean activated = new AtomicBoolean();
CallAttemptsTracerFactory(
CensusStatsModule module, TagContext parentCtx, String fullMethodName) {
- this.module = checkNotNull(module);
- this.parentCtx = checkNotNull(parentCtx);
+ this.module = checkNotNull(module, "module");
+ this.parentCtx = checkNotNull(parentCtx, "parentCtx");
+ this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
+ this.stopwatch = module.stopwatchSupplier.get().start();
TagValue methodTag = TagValue.create(fullMethodName);
- this.startCtx = module.tagger.toBuilder(parentCtx)
+ startCtx = module.tagger.toBuilder(parentCtx)
.putLocal(RpcMeasureConstants.GRPC_CLIENT_METHOD, methodTag)
.build();
- this.stopwatch = module.stopwatchSupplier.get().start();
if (module.recordStartedRpcs) {
+ // Record here in case newClientStreamTracer() would never be called.
module.statsRecorder.newMeasureMap()
.put(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT, 1)
.record(startCtx);
@@ -375,30 +460,37 @@ static final class CallAttemptsTracerFactory extends
}
@Override
- public ClientStreamTracer newClientStreamTracer(
- ClientStreamTracer.StreamInfo info, Metadata headers) {
- ClientTracer tracer = new ClientTracer(module, parentCtx, startCtx);
- // TODO(zhangkun83): Once retry or hedging is implemented, a ClientCall may start more than
- // one streams. We will need to update this file to support them.
- if (streamTracerUpdater != null) {
- checkState(
- streamTracerUpdater.compareAndSet(this, null, tracer),
- "Are you creating multiple streams per call? This class doesn't yet support this case");
+ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metadata) {
+ ClientTracer tracer = new ClientTracer(this, module, parentCtx, startCtx, info);
+ if (activeStreams.incrementAndGet() == 1) {
+ if (!activated.compareAndSet(false, true)) {
+ retryDelayNanos.addAndGet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
+ }
+ }
+ if (module.recordStartedRpcs && attemptsPerCall.get() > 0) {
+ module.statsRecorder.newMeasureMap()
+ .put(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT, 1)
+ .record(startCtx);
+ }
+ if (info.isTransparentRetry()) {
+ transparentRetriesPerCall.incrementAndGet();
} else {
- checkState(
- streamTracer == null,
- "Are you creating multiple streams per call? This class doesn't yet support this case");
- streamTracer = tracer;
+ attemptsPerCall.incrementAndGet();
}
return tracer;
}
- /**
- * Record a finished call and mark the current time as the end time.
- *
- * Can be called from any thread without synchronization. Calling it the second time or more
- * is a no-op.
- */
+ // Called whenever each attempt is ended.
+ void attemptEnded() {
+ if (activeStreams.decrementAndGet() == 0) {
+ // Race condition between two extremely close events does not matter because the difference
+ // in the result would be very small.
+ long lastInactiveTimeStamp =
+ this.lastInactiveTimeStamp.getAndSet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
+ retryDelayNanos.addAndGet(-lastInactiveTimeStamp);
+ }
+ }
+
void callEnded(Status status) {
if (callEndedUpdater != null) {
if (callEndedUpdater.getAndSet(this, 1) != 0) {
@@ -414,36 +506,30 @@ void callEnded(Status status) {
return;
}
stopwatch.stop();
- long roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
- ClientTracer tracer = streamTracer;
- if (tracer == null) {
- tracer = new ClientTracer(module, parentCtx, startCtx);
+ if (attemptsPerCall.get() == 0) {
+ ClientTracer tracer = new ClientTracer(this, module, parentCtx, startCtx, null);
+ tracer.roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
+ tracer.statusCode = status.getCode();
+ tracer.recordFinishedRpc();
+ } else if (inboundMetricTracer != null) {
+ inboundMetricTracer.recordFinishedRpc();
}
- MeasureMap measureMap = module.statsRecorder.newMeasureMap()
- // TODO(songya): remove the deprecated measure constants once they are completed removed.
- .put(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT, 1)
- // The latency is double value
- .put(
- DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY,
- roundtripNanos / NANOS_PER_MILLI)
- .put(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount)
- .put(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_COUNT, tracer.inboundMessageCount)
- .put(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES, tracer.outboundWireSize)
- .put(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_BYTES, tracer.inboundWireSize)
- .put(
- DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES,
- tracer.outboundUncompressedSize)
- .put(
- DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES,
- tracer.inboundUncompressedSize);
- if (!status.isOk()) {
- measureMap.put(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT, 1);
+
+ long retriesPerCall = 0;
+ long attempts = attemptsPerCall.get();
+ if (attempts > 0) {
+ retriesPerCall = attempts - 1;
}
+ MeasureMap measureMap = module.statsRecorder.newMeasureMap()
+ .put(RETRIES_PER_CALL, retriesPerCall)
+ .put(TRANSPARENT_RETRIES_PER_CALL, transparentRetriesPerCall.get())
+ .put(RETRY_DELAY_PER_CALL, retryDelayNanos.get() / NANOS_PER_MILLI);
+ TagValue methodTag = TagValue.create(fullMethodName);
TagValue statusTag = TagValue.create(status.getCode().toString());
measureMap.record(
- module
- .tagger
- .toBuilder(startCtx)
+ module.tagger
+ .toBuilder(parentCtx)
+ .putLocal(RpcMeasureConstants.GRPC_CLIENT_METHOD, methodTag)
.putLocal(RpcMeasureConstants.GRPC_CLIENT_STATUS, statusTag)
.build());
}
diff --git a/census/src/main/java/io/grpc/census/CensusTracingModule.java b/census/src/main/java/io/grpc/census/CensusTracingModule.java
index dac62206fd2..08d5fe3ca97 100644
--- a/census/src/main/java/io/grpc/census/CensusTracingModule.java
+++ b/census/src/main/java/io/grpc/census/CensusTracingModule.java
@@ -32,6 +32,7 @@
import io.grpc.MethodDescriptor;
import io.grpc.ServerStreamTracer;
import io.grpc.StreamTracer;
+import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.BlankSpan;
import io.opencensus.trace.EndSpanOptions;
import io.opencensus.trace.MessageEvent;
@@ -60,7 +61,8 @@
final class CensusTracingModule {
private static final Logger logger = Logger.getLogger(CensusTracingModule.class.getName());
- @Nullable private static final AtomicIntegerFieldUpdater callEndedUpdater;
+ @Nullable
+ private static final AtomicIntegerFieldUpdater callEndedUpdater;
@Nullable private static final AtomicIntegerFieldUpdater streamClosedUpdater;
@@ -70,11 +72,11 @@ final class CensusTracingModule {
* (potentially racy) direct updates of the volatile variables.
*/
static {
- AtomicIntegerFieldUpdater tmpCallEndedUpdater;
+ AtomicIntegerFieldUpdater tmpCallEndedUpdater;
AtomicIntegerFieldUpdater tmpStreamClosedUpdater;
try {
tmpCallEndedUpdater =
- AtomicIntegerFieldUpdater.newUpdater(ClientCallTracer.class, "callEnded");
+ AtomicIntegerFieldUpdater.newUpdater(CallAttemptsTracerFactory.class, "callEnded");
tmpStreamClosedUpdater =
AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
} catch (Throwable t) {
@@ -116,11 +118,12 @@ public SpanContext parseBytes(byte[] serialized) {
}
/**
- * Creates a {@link ClientCallTracer} for a new call.
+ * Creates a {@link CallAttemptsTracerFactory} for a new call.
*/
@VisibleForTesting
- ClientCallTracer newClientCallTracer(@Nullable Span parentSpan, MethodDescriptor, ?> method) {
- return new ClientCallTracer(parentSpan, method);
+ CallAttemptsTracerFactory newClientCallTracer(
+ @Nullable Span parentSpan, MethodDescriptor, ?> method) {
+ return new CallAttemptsTracerFactory(parentSpan, method);
}
/**
@@ -223,19 +226,21 @@ private static void recordMessageEvent(
}
@VisibleForTesting
- final class ClientCallTracer extends ClientStreamTracer.InternalLimitedInfoFactory {
+ final class CallAttemptsTracerFactory extends ClientStreamTracer.InternalLimitedInfoFactory {
volatile int callEnded;
private final boolean isSampledToLocalTracing;
private final Span span;
+ private final String fullMethodName;
- ClientCallTracer(@Nullable Span parentSpan, MethodDescriptor, ?> method) {
+ CallAttemptsTracerFactory(@Nullable Span parentSpan, MethodDescriptor, ?> method) {
checkNotNull(method, "method");
this.isSampledToLocalTracing = method.isSampledToLocalTracing();
+ this.fullMethodName = method.getFullMethodName();
this.span =
censusTracer
.spanBuilderWithExplicitParent(
- generateTraceSpanName(false, method.getFullMethodName()),
+ generateTraceSpanName(false, fullMethodName),
parentSpan)
.setRecordEvents(true)
.startSpan();
@@ -244,7 +249,17 @@ final class ClientCallTracer extends ClientStreamTracer.InternalLimitedInfoFacto
@Override
public ClientStreamTracer newClientStreamTracer(
ClientStreamTracer.StreamInfo info, Metadata headers) {
- return new ClientTracer(span, tracingHeader);
+ Span attemptSpan = censusTracer
+ .spanBuilderWithExplicitParent(
+ "Attempt." + fullMethodName.replace('/', '.'),
+ span)
+ .setRecordEvents(true)
+ .startSpan();
+ attemptSpan.putAttribute(
+ "previous-rpc-attempts", AttributeValue.longAttributeValue(info.getPreviousAttempts()));
+ attemptSpan.putAttribute(
+ "transparent-retry", AttributeValue.booleanAttributeValue(info.isTransparentRetry()));
+ return new ClientTracer(attemptSpan, tracingHeader, isSampledToLocalTracing);
}
/**
@@ -271,10 +286,13 @@ void callEnded(io.grpc.Status status) {
private static final class ClientTracer extends ClientStreamTracer {
private final Span span;
final Metadata.Key tracingHeader;
+ final boolean isSampledToLocalTracing;
- ClientTracer(Span span, Metadata.Key tracingHeader) {
+ ClientTracer(
+ Span span, Metadata.Key tracingHeader, boolean isSampledToLocalTracing) {
this.span = checkNotNull(span, "span");
this.tracingHeader = tracingHeader;
+ this.isSampledToLocalTracing = isSampledToLocalTracing;
}
@Override
@@ -298,6 +316,11 @@ public void inboundMessageRead(
recordMessageEvent(
span, MessageEvent.Type.RECEIVED, seqNo, optionalWireSize, optionalUncompressedSize);
}
+
+ @Override
+ public void streamClosed(io.grpc.Status status) {
+ span.end(createEndSpanOptions(status, isSampledToLocalTracing));
+ }
}
@@ -388,7 +411,7 @@ public ClientCall interceptCall(
// Safe usage of the unsafe trace API because CONTEXT_SPAN_KEY.get() returns the same value
// as Tracer.getCurrentSpan() except when no value available when the return value is null
// for the direct access and BlankSpan when Tracer API is used.
- final ClientCallTracer tracerFactory =
+ final CallAttemptsTracerFactory tracerFactory =
newClientCallTracer(ContextUtils.getValue(Context.current()), method);
ClientCall call =
next.newCall(
diff --git a/census/src/test/java/io/grpc/census/CensusModulesTest.java b/census/src/test/java/io/grpc/census/CensusModulesTest.java
index fd3a049f7a4..d285c8fe8c2 100644
--- a/census/src/test/java/io/grpc/census/CensusModulesTest.java
+++ b/census/src/test/java/io/grpc/census/CensusModulesTest.java
@@ -18,6 +18,9 @@
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
+import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRIES_PER_CALL;
+import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRY_DELAY_PER_CALL;
+import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.TRANSPARENT_RETRIES_PER_CALL;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -58,6 +61,7 @@
import io.grpc.ServerStreamTracer;
import io.grpc.ServerStreamTracer.ServerCallInfo;
import io.grpc.Status;
+import io.grpc.census.CensusTracingModule.CallAttemptsTracerFactory;
import io.grpc.census.internal.DeprecatedCensusConstants;
import io.grpc.internal.FakeClock;
import io.grpc.internal.testing.StatsTestUtils;
@@ -81,6 +85,7 @@
import io.opencensus.stats.View;
import io.opencensus.tags.TagContext;
import io.opencensus.tags.TagValue;
+import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.BlankSpan;
import io.opencensus.trace.EndSpanOptions;
import io.opencensus.trace.MessageEvent;
@@ -173,10 +178,12 @@ public String parse(InputStream stream) {
private final Random random = new Random(1234);
private final Span fakeClientParentSpan = MockableSpan.generateRandomSpan(random);
private final Span spyClientSpan = spy(MockableSpan.generateRandomSpan(random));
- private final SpanContext fakeClientSpanContext = spyClientSpan.getContext();
+ private final Span spyAttemptSpan = spy(MockableSpan.generateRandomSpan(random));
+ private final SpanContext fakeAttemptSpanContext = spyAttemptSpan.getContext();
private final Span spyServerSpan = spy(MockableSpan.generateRandomSpan(random));
private final byte[] binarySpanContext = new byte[]{3, 1, 5};
private final SpanBuilder spyClientSpanBuilder = spy(new MockableSpan.Builder());
+ private final SpanBuilder spyAttemptSpanBuilder = spy(new MockableSpan.Builder());
private final SpanBuilder spyServerSpanBuilder = spy(new MockableSpan.Builder());
@Rule
@@ -201,15 +208,20 @@ public String parse(InputStream stream) {
@Before
public void setUp() throws Exception {
when(spyClientSpanBuilder.startSpan()).thenReturn(spyClientSpan);
- when(tracer.spanBuilderWithExplicitParent(anyString(), ArgumentMatchers.any()))
+ when(spyAttemptSpanBuilder.startSpan()).thenReturn(spyAttemptSpan);
+ when(tracer.spanBuilderWithExplicitParent(
+ eq("Sent.package1.service2.method3"), ArgumentMatchers.any()))
.thenReturn(spyClientSpanBuilder);
+ when(tracer.spanBuilderWithExplicitParent(
+ eq("Attempt.package1.service2.method3"), ArgumentMatchers.any()))
+ .thenReturn(spyAttemptSpanBuilder);
when(spyServerSpanBuilder.startSpan()).thenReturn(spyServerSpan);
when(tracer.spanBuilderWithRemoteParent(anyString(), ArgumentMatchers.any()))
.thenReturn(spyServerSpanBuilder);
when(mockTracingPropagationHandler.toByteArray(any(SpanContext.class)))
.thenReturn(binarySpanContext);
when(mockTracingPropagationHandler.fromByteArray(any(byte[].class)))
- .thenReturn(fakeClientSpanContext);
+ .thenReturn(fakeAttemptSpanContext);
censusStats =
new CensusStatsModule(
tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(),
@@ -292,7 +304,7 @@ public ClientCall interceptCall(
assertEquals(2, capturedCallOptions.get().getStreamTracerFactories().size());
assertTrue(
capturedCallOptions.get().getStreamTracerFactories().get(0)
- instanceof CensusTracingModule.ClientCallTracer);
+ instanceof CallAttemptsTracerFactory);
assertTrue(
capturedCallOptions.get().getStreamTracerFactories().get(1)
instanceof CensusStatsModule.CallAttemptsTracerFactory);
@@ -355,6 +367,7 @@ record = statsRecorder.pollRecord();
.setSampleToLocalSpanStore(false)
.build());
verify(spyClientSpan, never()).end();
+ assertZeroRetryRecorded();
}
@Test
@@ -489,11 +502,200 @@ private void subtestClientBasicStatsDefaultContext(
DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
assertEquals(30 + 100 + 16 + 24,
record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
+ assertZeroRetryRecorded();
} else {
assertNull(statsRecorder.pollRecord());
}
}
+ // This test is only unit-testing the stat recording logic. The retry behavior is faked.
+ @Test
+ public void recordRetryStats() {
+ CensusStatsModule localCensusStats =
+ new CensusStatsModule(
+ tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(),
+ true, true, true, true);
+ CensusStatsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
+ new CensusStatsModule.CallAttemptsTracerFactory(
+ localCensusStats, tagger.empty(), method.getFullMethodName());
+ ClientStreamTracer tracer =
+ callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
+
+ StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
+ assertEquals(1, record.tags.size());
+ TagValue methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD);
+ assertEquals(method.getFullMethodName(), methodTag.asString());
+ assertEquals(
+ 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT));
+
+ fakeClock.forwardTime(30, MILLISECONDS);
+ tracer.outboundHeaders();
+ fakeClock.forwardTime(100, MILLISECONDS);
+ tracer.outboundMessage(0);
+ assertRealTimeMetric(
+ RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true);
+ tracer.outboundMessage(1);
+ assertRealTimeMetric(
+ RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true);
+ tracer.outboundWireSize(1028);
+ assertRealTimeMetric(
+ RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_METHOD, 1028, true, true);
+ tracer.outboundUncompressedSize(1128);
+ fakeClock.forwardTime(24, MILLISECONDS);
+ tracer.streamClosed(Status.UNAVAILABLE);
+ record = statsRecorder.pollRecord();
+ methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD);
+ assertEquals(method.getFullMethodName(), methodTag.asString());
+ TagValue statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS);
+ assertEquals(Status.Code.UNAVAILABLE.toString(), statusTag.asString());
+ assertEquals(
+ 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT));
+ assertEquals(1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT));
+ assertEquals(
+ 2, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT));
+ assertEquals(
+ 1028, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES));
+ assertEquals(
+ 1128,
+ record.getMetricAsLongOrFail(
+ DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
+ assertEquals(
+ 30 + 100 + 24,
+ record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
+
+ // faking retry
+ fakeClock.forwardTime(1000, MILLISECONDS);
+ tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
+ record = statsRecorder.pollRecord();
+ assertEquals(1, record.tags.size());
+ methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD);
+ assertEquals(method.getFullMethodName(), methodTag.asString());
+ assertEquals(
+ 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT));
+ tracer.outboundHeaders();
+ tracer.outboundMessage(0);
+ assertRealTimeMetric(
+ RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true);
+ tracer.outboundMessage(1);
+ assertRealTimeMetric(
+ RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true);
+ tracer.outboundWireSize(1028);
+ assertRealTimeMetric(
+ RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_METHOD, 1028, true, true);
+ tracer.outboundUncompressedSize(1128);
+ fakeClock.forwardTime(100, MILLISECONDS);
+ tracer.streamClosed(Status.NOT_FOUND);
+ record = statsRecorder.pollRecord();
+ methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD);
+ assertEquals(method.getFullMethodName(), methodTag.asString());
+ statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS);
+ assertEquals(Status.Code.NOT_FOUND.toString(), statusTag.asString());
+ assertEquals(
+ 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT));
+ assertEquals(1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT));
+ assertEquals(
+ 2, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT));
+ assertEquals(
+ 1028, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES));
+ assertEquals(
+ 1128,
+ record.getMetricAsLongOrFail(
+ DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
+ assertEquals(
+ 100 ,
+ record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
+
+ // fake transparent retry
+ fakeClock.forwardTime(10, MILLISECONDS);
+ tracer = callAttemptsTracerFactory.newClientStreamTracer(
+ STREAM_INFO.toBuilder().setIsTransparentRetry(true).build(), new Metadata());
+ record = statsRecorder.pollRecord();
+ assertEquals(1, record.tags.size());
+ methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD);
+ assertEquals(method.getFullMethodName(), methodTag.asString());
+ assertEquals(
+ 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT));
+ tracer.streamClosed(Status.UNAVAILABLE);
+ record = statsRecorder.pollRecord();
+ statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS);
+ assertEquals(Status.Code.UNAVAILABLE.toString(), statusTag.asString());
+ assertEquals(
+ 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT));
+ assertEquals(1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT));
+ assertEquals(
+ 0, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT));
+ assertEquals(
+ 0, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES));
+
+ // fake another transparent retry
+ fakeClock.forwardTime(10, MILLISECONDS);
+ tracer = callAttemptsTracerFactory.newClientStreamTracer(
+ STREAM_INFO.toBuilder().setIsTransparentRetry(true).build(), new Metadata());
+ record = statsRecorder.pollRecord();
+ assertEquals(1, record.tags.size());
+ assertEquals(
+ 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT));
+ tracer.outboundHeaders();
+ tracer.outboundMessage(0);
+ assertRealTimeMetric(
+ RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true);
+ tracer.outboundMessage(1);
+ assertRealTimeMetric(
+ RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true);
+ tracer.outboundWireSize(1028);
+ assertRealTimeMetric(
+ RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_METHOD, 1028, true, true);
+ tracer.outboundUncompressedSize(1128);
+ fakeClock.forwardTime(16, MILLISECONDS);
+ tracer.inboundMessage(0);
+ assertRealTimeMetric(
+ RpcMeasureConstants.GRPC_CLIENT_RECEIVED_MESSAGES_PER_METHOD, 1, true, true);
+ tracer.inboundWireSize(33);
+ assertRealTimeMetric(
+ RpcMeasureConstants.GRPC_CLIENT_RECEIVED_BYTES_PER_METHOD, 33, true, true);
+ tracer.inboundUncompressedSize(67);
+ fakeClock.forwardTime(24, MILLISECONDS);
+ // RPC succeeded
+ tracer.streamClosed(Status.OK);
+ callAttemptsTracerFactory.callEnded(Status.OK);
+
+ record = statsRecorder.pollRecord();
+ statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS);
+ assertEquals(Status.Code.OK.toString(), statusTag.asString());
+ assertEquals(
+ 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT));
+ assertThat(record.getMetric(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT)).isNull();
+ assertEquals(
+ 2, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT));
+ assertEquals(
+ 1028, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES));
+ assertEquals(
+ 1128,
+ record.getMetricAsLongOrFail(
+ DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
+ assertEquals(
+ 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_COUNT));
+ assertEquals(
+ 33,
+ record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_BYTES));
+ assertEquals(
+ 67,
+ record.getMetricAsLongOrFail(
+ DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
+ assertEquals(
+ 16 + 24 ,
+ record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
+
+ record = statsRecorder.pollRecord();
+ methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD);
+ assertEquals(method.getFullMethodName(), methodTag.asString());
+ statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS);
+ assertEquals(Status.Code.OK.toString(), statusTag.asString());
+ assertThat(record.getMetric(RETRIES_PER_CALL)).isEqualTo(1);
+ assertThat(record.getMetric(TRANSPARENT_RETRIES_PER_CALL)).isEqualTo(2);
+ assertThat(record.getMetric(RETRY_DELAY_PER_CALL)).isEqualTo(1000D + 10 + 10);
+ }
+
private void assertRealTimeMetric(
Measure measure, long expectedValue, boolean recordRealTimeMetrics, boolean clientSide) {
StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
@@ -517,16 +719,28 @@ private void assertRealTimeMetric(
assertEquals(expectedValue, record.getMetricAsLongOrFail(measure));
}
+ private void assertZeroRetryRecorded() {
+ StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
+ TagValue methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD);
+ assertEquals(method.getFullMethodName(), methodTag.asString());
+ assertThat(record.getMetric(RETRIES_PER_CALL)).isEqualTo(0);
+ assertThat(record.getMetric(TRANSPARENT_RETRIES_PER_CALL)).isEqualTo(0);
+ assertThat(record.getMetric(RETRY_DELAY_PER_CALL)).isEqualTo(0D);
+ }
+
@Test
public void clientBasicTracingDefaultSpan() {
- CensusTracingModule.ClientCallTracer callTracer =
+ CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(null, method);
Metadata headers = new Metadata();
ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers);
clientStreamTracer.streamCreated(Attributes.EMPTY, headers);
verify(tracer).spanBuilderWithExplicitParent(
eq("Sent.package1.service2.method3"), ArgumentMatchers.isNull());
+ verify(tracer).spanBuilderWithExplicitParent(
+ eq("Attempt.package1.service2.method3"), eq(spyClientSpan));
verify(spyClientSpan, never()).end(any(EndSpanOptions.class));
+ verify(spyAttemptSpan, never()).end(any(EndSpanOptions.class));
clientStreamTracer.outboundMessage(0);
clientStreamTracer.outboundMessageSent(0, 882, -1);
@@ -538,8 +752,12 @@ public void clientBasicTracingDefaultSpan() {
clientStreamTracer.streamClosed(Status.OK);
callTracer.callEnded(Status.OK);
- InOrder inOrder = inOrder(spyClientSpan);
- inOrder.verify(spyClientSpan, times(3)).addMessageEvent(messageEventCaptor.capture());
+ InOrder inOrder = inOrder(spyClientSpan, spyAttemptSpan);
+ inOrder.verify(spyAttemptSpan)
+ .putAttribute("previous-rpc-attempts", AttributeValue.longAttributeValue(0));
+ inOrder.verify(spyAttemptSpan)
+ .putAttribute("transparent-retry", AttributeValue.booleanAttributeValue(false));
+ inOrder.verify(spyAttemptSpan, times(3)).addMessageEvent(messageEventCaptor.capture());
List events = messageEventCaptor.getAllValues();
assertEquals(
MessageEvent.builder(MessageEvent.Type.SENT, 0).setCompressedMessageSize(882).build(),
@@ -553,18 +771,23 @@ public void clientBasicTracingDefaultSpan() {
.setUncompressedMessageSize(90)
.build(),
events.get(2));
+ inOrder.verify(spyAttemptSpan).end(
+ EndSpanOptions.builder()
+ .setStatus(io.opencensus.trace.Status.OK)
+ .setSampleToLocalSpanStore(false)
+ .build());
inOrder.verify(spyClientSpan).end(
EndSpanOptions.builder()
.setStatus(io.opencensus.trace.Status.OK)
.setSampleToLocalSpanStore(false)
.build());
- verifyNoMoreInteractions(spyClientSpan);
+ inOrder.verifyNoMoreInteractions();
verifyNoMoreInteractions(tracer);
}
@Test
public void clientTracingSampledToLocalSpanStore() {
- CensusTracingModule.ClientCallTracer callTracer =
+ CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(null, sampledMethod);
callTracer.callEnded(Status.OK);
@@ -631,11 +854,12 @@ record = statsRecorder.pollRecord();
3000,
record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
assertNull(record.getMetric(DeprecatedCensusConstants.RPC_CLIENT_SERVER_ELAPSED_TIME));
+ assertZeroRetryRecorded();
}
@Test
public void clientStreamNeverCreatedStillRecordTracing() {
- CensusTracingModule.ClientCallTracer callTracer =
+ CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(fakeClientParentSpan, method);
verify(tracer).spanBuilderWithExplicitParent(
eq("Sent.package1.service2.method3"), same(fakeClientParentSpan));
@@ -770,6 +994,7 @@ private void subtestStatsHeadersPropagateTags(boolean propagate, boolean recordS
assertNull(clientRecord.getMetric(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT));
TagValue clientPropagatedTag = clientRecord.tags.get(StatsTestUtils.EXTRA_TAG);
assertEquals("extra-tag-value-897", clientPropagatedTag.asString());
+ assertZeroRetryRecorded();
}
if (!recordStats) {
@@ -812,16 +1037,18 @@ public void statsHeaderMalformed() {
@Test
public void traceHeadersPropagateSpanContext() throws Exception {
- CensusTracingModule.ClientCallTracer callTracer =
+ CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(fakeClientParentSpan, method);
Metadata headers = new Metadata();
ClientStreamTracer streamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers);
streamTracer.streamCreated(Attributes.EMPTY, headers);
- verify(mockTracingPropagationHandler).toByteArray(same(fakeClientSpanContext));
+ verify(mockTracingPropagationHandler).toByteArray(same(fakeAttemptSpanContext));
verifyNoMoreInteractions(mockTracingPropagationHandler);
verify(tracer).spanBuilderWithExplicitParent(
eq("Sent.package1.service2.method3"), same(fakeClientParentSpan));
+ verify(tracer).spanBuilderWithExplicitParent(
+ eq("Attempt.package1.service2.method3"), same(spyClientSpan));
verify(spyClientSpanBuilder).setRecordEvents(eq(true));
verifyNoMoreInteractions(tracer);
assertTrue(headers.containsKey(censusTracing.tracingHeader));
@@ -831,7 +1058,7 @@ public void traceHeadersPropagateSpanContext() throws Exception {
method.getFullMethodName(), headers);
verify(mockTracingPropagationHandler).fromByteArray(same(binarySpanContext));
verify(tracer).spanBuilderWithRemoteParent(
- eq("Recv.package1.service2.method3"), same(spyClientSpan.getContext()));
+ eq("Recv.package1.service2.method3"), same(spyAttemptSpan.getContext()));
verify(spyServerSpanBuilder).setRecordEvents(eq(true));
Context filteredContext = serverTracer.filterContext(Context.ROOT);
@@ -840,7 +1067,7 @@ public void traceHeadersPropagateSpanContext() throws Exception {
@Test
public void traceHeaders_propagateSpanContext() throws Exception {
- CensusTracingModule.ClientCallTracer callTracer =
+ CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(fakeClientParentSpan, method);
Metadata headers = new Metadata();
@@ -854,10 +1081,12 @@ public void traceHeaders_propagateSpanContext() throws Exception {
public void traceHeaders_missingCensusImpl_notPropagateSpanContext()
throws Exception {
reset(spyClientSpanBuilder);
+ reset(spyAttemptSpanBuilder);
when(spyClientSpanBuilder.startSpan()).thenReturn(BlankSpan.INSTANCE);
+ when(spyAttemptSpanBuilder.startSpan()).thenReturn(BlankSpan.INSTANCE);
Metadata headers = new Metadata();
- CensusTracingModule.ClientCallTracer callTracer =
+ CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(BlankSpan.INSTANCE, method);
callTracer.newClientStreamTracer(STREAM_INFO, headers).streamCreated(Attributes.EMPTY, headers);
@@ -867,14 +1096,16 @@ public void traceHeaders_missingCensusImpl_notPropagateSpanContext()
@Test
public void traceHeaders_clientMissingCensusImpl_preservingHeaders() throws Exception {
reset(spyClientSpanBuilder);
+ reset(spyAttemptSpanBuilder);
when(spyClientSpanBuilder.startSpan()).thenReturn(BlankSpan.INSTANCE);
+ when(spyAttemptSpanBuilder.startSpan()).thenReturn(BlankSpan.INSTANCE);
Metadata headers = new Metadata();
headers.put(
Metadata.Key.of("never-used-key-bin", Metadata.BINARY_BYTE_MARSHALLER),
new byte[] {});
Set originalHeaderKeys = new HashSet<>(headers.keys());
- CensusTracingModule.ClientCallTracer callTracer =
+ CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(BlankSpan.INSTANCE, method);
callTracer.newClientStreamTracer(STREAM_INFO, headers).streamCreated(Attributes.EMPTY, headers);
@@ -885,9 +1116,9 @@ public void traceHeaders_clientMissingCensusImpl_preservingHeaders() throws Exce
public void traceHeaderMalformed() throws Exception {
// As comparison, normal header parsing
Metadata headers = new Metadata();
- headers.put(censusTracing.tracingHeader, fakeClientSpanContext);
+ headers.put(censusTracing.tracingHeader, fakeAttemptSpanContext);
// mockTracingPropagationHandler was stubbed to always return fakeServerParentSpanContext
- assertSame(spyClientSpan.getContext(), headers.get(censusTracing.tracingHeader));
+ assertSame(spyAttemptSpan.getContext(), headers.get(censusTracing.tracingHeader));
// Make BinaryPropagationHandler always throw when parsing the header
when(mockTracingPropagationHandler.fromByteArray(any(byte[].class)))
@@ -895,7 +1126,7 @@ public void traceHeaderMalformed() throws Exception {
headers = new Metadata();
assertNull(headers.get(censusTracing.tracingHeader));
- headers.put(censusTracing.tracingHeader, fakeClientSpanContext);
+ headers.put(censusTracing.tracingHeader, fakeAttemptSpanContext);
assertSame(SpanContext.INVALID, headers.get(censusTracing.tracingHeader));
assertNotSame(spyClientSpan.getContext(), SpanContext.INVALID);
diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
index 28cd3351203..dd17244e2a5 100644
--- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java
+++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
@@ -255,7 +255,8 @@ public void runInContext() {
effectiveDeadline, context.getDeadline(), callOptions.getDeadline());
stream = clientStreamProvider.newStream(method, callOptions, headers, context);
} else {
- ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(callOptions, headers, false);
+ ClientStreamTracer[] tracers =
+ GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
stream = new FailingClientStream(
DEADLINE_EXCEEDED.withDescription(
"ClientCall started after deadline exceeded: " + effectiveDeadline),
diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java
index 0bb83a56d75..13fc786e383 100644
--- a/core/src/main/java/io/grpc/internal/GrpcUtil.java
+++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java
@@ -757,11 +757,12 @@ public ListenableFuture getStats() {
/** Gets stream tracers based on CallOptions. */
public static ClientStreamTracer[] getClientStreamTracers(
- CallOptions callOptions, Metadata headers, boolean isTransparentRetry) {
+ CallOptions callOptions, Metadata headers, int previousAttempts, boolean isTransparentRetry) {
List factories = callOptions.getStreamTracerFactories();
ClientStreamTracer[] tracers = new ClientStreamTracer[factories.size() + 1];
StreamInfo streamInfo = StreamInfo.newBuilder()
.setCallOptions(callOptions)
+ .setPreviousAttempts(previousAttempts)
.setIsTransparentRetry(isTransparentRetry)
.build();
for (int i = 0; i < factories.size(); i++) {
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index 87162d9aba2..6cd5598e2a6 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -533,7 +533,7 @@ public ClientStream newStream(
getTransport(new PickSubchannelArgsImpl(method, headers, callOptions));
Context origContext = context.attach();
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
- callOptions, headers, /* isTransparentRetry= */ false);
+ callOptions, headers, 0, /* isTransparentRetry= */ false);
try {
return transport.newStream(method, headers, callOptions, tracers);
} finally {
@@ -572,10 +572,11 @@ void postCommit() {
@Override
ClientStream newSubstream(
- Metadata newHeaders, ClientStreamTracer.Factory factory, boolean isTransparentRetry) {
+ Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
+ boolean isTransparentRetry) {
CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
- ClientStreamTracer[] tracers =
- GrpcUtil.getClientStreamTracers(newOptions, newHeaders, isTransparentRetry);
+ ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
+ newOptions, newHeaders, previousAttempts, isTransparentRetry);
ClientTransport transport =
getTransport(new PickSubchannelArgsImpl(method, newHeaders, newOptions));
Context origContext = context.attach();
@@ -624,7 +625,7 @@ ClientStream newSubstream(
channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
ProxyDetector proxyDetector =
builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
- this.retryEnabled = builder.retryEnabled && !builder.temporarilyDisableRetry;
+ this.retryEnabled = builder.retryEnabled;
this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
this.offloadExecutorHolder =
new ExecutorHolder(
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java
index d42b3832136..26c48fc8596 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java
@@ -142,11 +142,7 @@ public static ManagedChannelBuilder> forTarget(String target) {
int maxHedgedAttempts = 5;
long retryBufferSize = DEFAULT_RETRY_BUFFER_SIZE_IN_BYTES;
long perRpcBufferLimit = DEFAULT_PER_RPC_BUFFER_LIMIT_IN_BYTES;
- boolean retryEnabled = false; // TODO(zdapeng): default to true
- // Temporarily disable retry when stats or tracing is enabled to avoid breakage, until we know
- // what should be the desired behavior for retry + stats/tracing.
- // TODO(zdapeng): delete me
- boolean temporarilyDisableRetry;
+ boolean retryEnabled = true;
InternalChannelz channelz = InternalChannelz.instance();
int maxTraceEvents;
@@ -460,8 +456,6 @@ public ManagedChannelImplBuilder disableRetry() {
@Override
public ManagedChannelImplBuilder enableRetry() {
retryEnabled = true;
- statsEnabled = false;
- tracingEnabled = false;
return this;
}
@@ -592,9 +586,6 @@ public void setStatsRecordRealTimeMetrics(boolean value) {
/**
* Disable or enable tracing features. Enabled by default.
- *
- * For the current release, calling {@code setTracingEnabled(true)} may have a side effect that
- * disables retry.
*/
public void setTracingEnabled(boolean value) {
tracingEnabled = value;
@@ -642,9 +633,7 @@ public ManagedChannel build() {
List getEffectiveInterceptors() {
List effectiveInterceptors =
new ArrayList<>(this.interceptors);
- temporarilyDisableRetry = false;
if (statsEnabled) {
- temporarilyDisableRetry = true;
ClientInterceptor statsInterceptor = null;
try {
Class> censusStatsAccessor =
@@ -679,7 +668,6 @@ List getEffectiveInterceptors() {
}
}
if (tracingEnabled) {
- temporarilyDisableRetry = true;
ClientInterceptor tracingInterceptor = null;
try {
Class> censusTracingAccessor =
diff --git a/core/src/main/java/io/grpc/internal/OobChannel.java b/core/src/main/java/io/grpc/internal/OobChannel.java
index b628842efe4..589824ae10e 100644
--- a/core/src/main/java/io/grpc/internal/OobChannel.java
+++ b/core/src/main/java/io/grpc/internal/OobChannel.java
@@ -88,7 +88,7 @@ final class OobChannel extends ManagedChannel implements InternalInstrumented method,
CallOptions callOptions, Metadata headers, Context context) {
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
- callOptions, headers, /* isTransparentRetry= */ false);
+ callOptions, headers, 0, /* isTransparentRetry= */ false);
Context origContext = context.attach();
// delayed transport's newStream() always acquires a lock, but concurrent performance doesn't
// matter here because OOB communication should be sparse, and it's not on application RPC's
diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java
index d19a260049b..1fb8d3c43bd 100644
--- a/core/src/main/java/io/grpc/internal/RetriableStream.java
+++ b/core/src/main/java/io/grpc/internal/RetriableStream.java
@@ -30,8 +30,10 @@
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
+import io.grpc.SynchronizationContext;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import java.io.InputStream;
+import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -64,6 +66,16 @@ abstract class RetriableStream implements ClientStream {
private final MethodDescriptor method;
private final Executor callExecutor;
+ private final Executor listenerSerializeExecutor = new SynchronizationContext(
+ new UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ throw Status.fromThrowable(e)
+ .withDescription("Uncaught exception in the SynchronizationContext. Re-thrown.")
+ .asRuntimeException();
+ }
+ }
+ );
private final ScheduledExecutorService scheduledExecutorService;
// Must not modify it.
private final Metadata headers;
@@ -105,6 +117,7 @@ abstract class RetriableStream implements ClientStream {
private FutureCanceller scheduledHedging;
private long nextBackoffIntervalNanos;
private Status cancellationStatus;
+ private boolean isClosed;
RetriableStream(
MethodDescriptor method, Metadata headers,
@@ -218,7 +231,7 @@ public ClientStreamTracer newClientStreamTracer(
Metadata newHeaders = updateHeaders(headers, previousAttemptCount);
// NOTICE: This set _must_ be done before stream.start() and it actually is.
- sub.stream = newSubstream(newHeaders, tracerFactory, isTransparentRetry);
+ sub.stream = newSubstream(newHeaders, tracerFactory, previousAttemptCount, isTransparentRetry);
return sub;
}
@@ -227,7 +240,8 @@ public ClientStreamTracer newClientStreamTracer(
* Client stream is not yet started.
*/
abstract ClientStream newSubstream(
- Metadata headers, ClientStreamTracer.Factory tracerFactory, boolean isTransparentRetry);
+ Metadata headers, ClientStreamTracer.Factory tracerFactory, int previousAttempts,
+ boolean isTransparentRetry);
/** Adds grpc-previous-rpc-attempts in the headers of a retry/hedging RPC. */
@VisibleForTesting
@@ -246,6 +260,7 @@ private void drain(Substream substream) {
int chunk = 0x80;
List list = null;
boolean streamStarted = false;
+ Runnable onReadyRunnable = null;
while (true) {
State savedState;
@@ -263,7 +278,18 @@ private void drain(Substream substream) {
}
if (index == savedState.buffer.size()) { // I'm drained
state = savedState.substreamDrained(substream);
- return;
+ if (!isReady()) {
+ return;
+ }
+ onReadyRunnable = new Runnable() {
+ @Override
+ public void run() {
+ if (!isClosed) {
+ masterListener.onReady();
+ }
+ }
+ };
+ break;
}
if (substream.closed) {
@@ -298,6 +324,11 @@ private void drain(Substream substream) {
}
}
+ if (onReadyRunnable != null) {
+ listenerSerializeExecutor.execute(onReadyRunnable);
+ return;
+ }
+
substream.stream.cancel(
state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
}
@@ -449,14 +480,22 @@ public void run() {
}
@Override
- public final void cancel(Status reason) {
+ public final void cancel(final Status reason) {
Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
noopSubstream.stream = new NoopClientStream();
Runnable runnable = commit(noopSubstream);
if (runnable != null) {
- masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata());
runnable.run();
+ listenerSerializeExecutor.execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ isClosed = true;
+ masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata());
+
+ }
+ });
return;
}
@@ -770,18 +809,25 @@ private final class Sublistener implements ClientStreamListener {
}
@Override
- public void headersRead(Metadata headers) {
+ public void headersRead(final Metadata headers) {
commitAndRun(substream);
if (state.winningSubstream == substream) {
- masterListener.headersRead(headers);
if (throttle != null) {
throttle.onSuccess();
}
+ listenerSerializeExecutor.execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ masterListener.headersRead(headers);
+ }
+ });
}
}
@Override
- public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
+ public void closed(
+ final Status status, final RpcProgress rpcProgress, final Metadata trailers) {
synchronized (lock) {
state = state.substreamClosed(substream);
closedSubstreamsInsight.append(status.getCode());
@@ -792,7 +838,14 @@ public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
if (substream.bufferLimitExceeded) {
commitAndRun(substream);
if (state.winningSubstream == substream) {
- masterListener.closed(status, rpcProgress, trailers);
+ listenerSerializeExecutor.execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ isClosed = true;
+ masterListener.closed(status, rpcProgress, trailers);
+ }
+ });
}
return;
}
@@ -869,24 +922,26 @@ public void run() {
synchronized (lock) {
scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock);
}
- scheduledRetryCopy.setFuture(
- scheduledExecutorService.schedule(
+ class RetryBackoffRunnable implements Runnable {
+ @Override
+ public void run() {
+ callExecutor.execute(
new Runnable() {
@Override
public void run() {
- callExecutor.execute(
- new Runnable() {
- @Override
- public void run() {
- // retry
- Substream newSubstream = createSubstream(
- substream.previousAttemptCount + 1,
- false);
- drain(newSubstream);
- }
- });
+ // retry
+ Substream newSubstream = createSubstream(
+ substream.previousAttemptCount + 1,
+ false);
+ drain(newSubstream);
}
- },
+ });
+ }
+ }
+
+ scheduledRetryCopy.setFuture(
+ scheduledExecutorService.schedule(
+ new RetryBackoffRunnable(),
retryPlan.backoffNanos,
TimeUnit.NANOSECONDS));
return;
@@ -897,7 +952,14 @@ public void run() {
commitAndRun(substream);
if (state.winningSubstream == substream) {
- masterListener.closed(status, rpcProgress, trailers);
+ listenerSerializeExecutor.execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ isClosed = true;
+ masterListener.closed(status, rpcProgress, trailers);
+ }
+ });
}
}
@@ -967,22 +1029,37 @@ private Integer getPushbackMills(Metadata trailer) {
}
@Override
- public void messagesAvailable(MessageProducer producer) {
+ public void messagesAvailable(final MessageProducer producer) {
State savedState = state;
checkState(
savedState.winningSubstream != null, "Headers should be received prior to messages.");
if (savedState.winningSubstream != substream) {
return;
}
- masterListener.messagesAvailable(producer);
+ listenerSerializeExecutor.execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ masterListener.messagesAvailable(producer);
+ }
+ });
}
@Override
public void onReady() {
// FIXME(#7089): hedging case is broken.
- // TODO(zdapeng): optimization: if the substream is not drained yet, delay onReady() once
- // drained and if is still ready.
- masterListener.onReady();
+ if (!isReady()) {
+ return;
+ }
+ listenerSerializeExecutor.execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ if (!isClosed) {
+ masterListener.onReady();
+ }
+ }
+ });
}
}
diff --git a/core/src/main/java/io/grpc/internal/SubchannelChannel.java b/core/src/main/java/io/grpc/internal/SubchannelChannel.java
index 1380a6bc716..a1d454ed2fb 100644
--- a/core/src/main/java/io/grpc/internal/SubchannelChannel.java
+++ b/core/src/main/java/io/grpc/internal/SubchannelChannel.java
@@ -59,7 +59,7 @@ public ClientStream newStream(MethodDescriptor, ?> method,
transport = notReadyTransport;
}
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
- callOptions, headers, /* isTransparentRetry= */ false);
+ callOptions, headers, 0, /* isTransparentRetry= */ false);
Context origContext = context.attach();
try {
return transport.newStream(method, headers, callOptions, tracers);
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
index ccfb5f074c5..668411d7ecc 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
@@ -355,6 +355,7 @@ public void close() throws SecurityException {
channelBuilder = new ManagedChannelImplBuilder(TARGET,
new UnsupportedClientTransportFactoryBuilder(), new FixedPortProvider(DEFAULT_PORT));
+ channelBuilder.disableRetry();
configureBuilder(channelBuilder);
}
@@ -1881,6 +1882,7 @@ public void oobChannelHasNoChannelCallCredentials() {
TARGET, InsecureChannelCredentials.create(),
new FakeCallCredentials(metadataKey, channelCredValue),
new UnsupportedClientTransportFactoryBuilder(), new FixedPortProvider(DEFAULT_PORT));
+ channelBuilder.disableRetry();
configureBuilder(channelBuilder);
createChannel();
@@ -1933,6 +1935,7 @@ public void oobChannelHasNoChannelCallCredentials() {
new FakeNameResolverFactory.Builder(URI.create("oobauthority")).build())
.defaultLoadBalancingPolicy(MOCK_POLICY_NAME)
.idleTimeout(ManagedChannelImplBuilder.IDLE_MODE_MAX_TIMEOUT_DAYS, TimeUnit.DAYS)
+ .disableRetry() // irrelevant to what we test, disable retry to make verification easy
.build();
oob.getState(true);
ArgumentCaptor helperCaptor = ArgumentCaptor.forClass(Helper.class);
@@ -1980,6 +1983,7 @@ public SwapChannelCredentialsResult answer(InvocationOnMock invocation) {
TARGET, InsecureChannelCredentials.create(),
new FakeCallCredentials(metadataKey, channelCredValue),
new UnsupportedClientTransportFactoryBuilder(), new FixedPortProvider(DEFAULT_PORT));
+ channelBuilder.disableRetry();
configureBuilder(channelBuilder);
createChannel();
@@ -2017,6 +2021,7 @@ public SwapChannelCredentialsResult answer(InvocationOnMock invocation) {
new FakeNameResolverFactory.Builder(URI.create("fake://oobauthority/")).build())
.defaultLoadBalancingPolicy(MOCK_POLICY_NAME)
.idleTimeout(ManagedChannelImplBuilder.IDLE_MODE_MAX_TIMEOUT_DAYS, TimeUnit.DAYS)
+ .disableRetry() // irrelevant to what we test, disable retry to make verification easy
.build();
oob.getState(true);
ArgumentCaptor helperCaptor = ArgumentCaptor.forClass(Helper.class);
diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java
index 95d2c2ba8b5..8b851573b21 100644
--- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java
+++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java
@@ -164,7 +164,8 @@ void postCommit() {
@Override
ClientStream newSubstream(
- Metadata metadata, ClientStreamTracer.Factory tracerFactory, boolean isTransparentRetry) {
+ Metadata metadata, ClientStreamTracer.Factory tracerFactory, int previousAttempts,
+ boolean isTransparentRetry) {
bufferSizeTracer =
tracerFactory.newClientStreamTracer(STREAM_INFO, metadata);
int actualPreviousRpcAttemptsInHeader = metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS) == null
@@ -255,6 +256,7 @@ public Void answer(InvocationOnMock in) {
ArgumentCaptor sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
+ inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions();
retriableStream.sendMessage("msg1");
@@ -307,6 +309,7 @@ public Void answer(InvocationOnMock in) {
inOrder.verify(mockStream2).writeMessage(any(InputStream.class));
inOrder.verify(mockStream2).request(456);
inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class));
+ inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions();
// send more messages
@@ -355,6 +358,7 @@ public Void answer(InvocationOnMock in) {
inOrder.verify(mockStream3).writeMessage(any(InputStream.class));
inOrder.verify(mockStream3).request(456);
inOrder.verify(mockStream3, times(7)).writeMessage(any(InputStream.class));
+ inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions();
InsightBuilder insight = new InsightBuilder();
@@ -636,6 +640,7 @@ public void retry_cancelWhileBackoff() {
ArgumentCaptor sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture());
+ verify(mockStream1).isReady();
// retry
ClientStream mockStream2 = mock(ClientStream.class);
@@ -655,7 +660,7 @@ public void retry_cancelWhileBackoff() {
@Test
public void operationsWhileDraining() {
- ArgumentCaptor sublistenerCaptor1 =
+ final ArgumentCaptor sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
final AtomicReference sublistenerCaptor2 =
new AtomicReference<>();
@@ -668,10 +673,16 @@ public void operationsWhileDraining() {
@Override
public void request(int numMessages) {
retriableStream.sendMessage("substream1 request " + numMessages);
+ sublistenerCaptor1.getValue().onReady();
if (numMessages > 1) {
retriableStream.request(--numMessages);
}
}
+
+ @Override
+ public boolean isReady() {
+ return true;
+ }
}));
final ClientStream mockStream2 =
@@ -687,7 +698,7 @@ public void start(ClientStreamListener listener) {
@Override
public void request(int numMessages) {
retriableStream.sendMessage("substream2 request " + numMessages);
-
+ sublistenerCaptor2.get().onReady();
if (numMessages == 3) {
sublistenerCaptor2.get().headersRead(new Metadata());
}
@@ -698,9 +709,14 @@ public void request(int numMessages) {
retriableStream.cancel(cancelStatus);
}
}
+
+ @Override
+ public boolean isReady() {
+ return true;
+ }
}));
- InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2);
+ InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2, masterListener);
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
retriableStream.start(masterListener);
@@ -715,6 +731,7 @@ public void request(int numMessages) {
inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); // msg "substream1 request 2"
inOrder.verify(mockStream1).request(1);
inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); // msg "substream1 request 1"
+ inOrder.verify(masterListener).onReady();
// retry
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
@@ -742,8 +759,8 @@ public void request(int numMessages) {
// msg "substream2 request 2"
inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class));
inOrder.verify(mockStream2).request(100);
-
- verify(mockStream2).cancel(cancelStatus);
+ inOrder.verify(mockStream2).cancel(cancelStatus);
+ inOrder.verify(masterListener, never()).onReady();
// "substream2 request 1" will never be sent
inOrder.verify(mockStream2, never()).writeMessage(any(InputStream.class));
@@ -1072,6 +1089,7 @@ public void perRpcBufferLimitExceededDuringBackoff() {
ArgumentCaptor sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture());
+ verify(mockStream1).isReady();
bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1);
@@ -1088,6 +1106,7 @@ public void perRpcBufferLimitExceededDuringBackoff() {
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
verify(mockStream2).start(any(ClientStreamListener.class));
+ verify(mockStream2).isReady();
// bufferLimitExceeded
bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1);
@@ -1151,6 +1170,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() {
ArgumentCaptor sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
+ inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions();
@@ -1166,6 +1186,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() {
ArgumentCaptor sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
+ inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions();
// retry2
@@ -1182,6 +1203,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() {
ArgumentCaptor sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
+ inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions();
// retry3
@@ -1199,6 +1221,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() {
ArgumentCaptor sublistenerCaptor4 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream4).start(sublistenerCaptor4.capture());
+ inOrder.verify(mockStream4).isReady();
inOrder.verifyNoMoreInteractions();
// retry4
@@ -1213,6 +1236,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() {
ArgumentCaptor sublistenerCaptor5 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream5).start(sublistenerCaptor5.capture());
+ inOrder.verify(mockStream5).isReady();
inOrder.verifyNoMoreInteractions();
// retry5
@@ -1227,6 +1251,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() {
ArgumentCaptor sublistenerCaptor6 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream6).start(sublistenerCaptor6.capture());
+ inOrder.verify(mockStream6).isReady();
inOrder.verifyNoMoreInteractions();
// can not retry any more
@@ -1257,6 +1282,7 @@ public void pushback() {
ArgumentCaptor sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
+ inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions();
@@ -1275,6 +1301,7 @@ public void pushback() {
ArgumentCaptor sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
+ inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions();
// retry2
@@ -1292,6 +1319,7 @@ public void pushback() {
ArgumentCaptor sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
+ inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions();
// retry3
@@ -1306,6 +1334,7 @@ public void pushback() {
ArgumentCaptor sublistenerCaptor4 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream4).start(sublistenerCaptor4.capture());
+ inOrder.verify(mockStream4).isReady();
inOrder.verifyNoMoreInteractions();
// retry4
@@ -1322,6 +1351,7 @@ public void pushback() {
ArgumentCaptor sublistenerCaptor5 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream5).start(sublistenerCaptor5.capture());
+ inOrder.verify(mockStream5).isReady();
inOrder.verifyNoMoreInteractions();
// retry5
@@ -1339,6 +1369,7 @@ public void pushback() {
ArgumentCaptor sublistenerCaptor6 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream6).start(sublistenerCaptor6.capture());
+ inOrder.verify(mockStream6).isReady();
inOrder.verifyNoMoreInteractions();
// can not retry any more even pushback is positive
@@ -1596,6 +1627,7 @@ public void transparentRetry() {
ArgumentCaptor sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
+ inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions();
// transparent retry
@@ -1607,6 +1639,7 @@ public void transparentRetry() {
ArgumentCaptor sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
+ inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions();
verify(retriableStreamRecorder, never()).postCommit();
assertEquals(0, fakeClock.numPendingTasks());
@@ -1622,6 +1655,7 @@ public void transparentRetry() {
ArgumentCaptor sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
+ inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions();
verify(retriableStreamRecorder, never()).postCommit();
assertEquals(0, fakeClock.numPendingTasks());
@@ -1644,6 +1678,7 @@ public void normalRetry_thenNoTransparentRetry_butNormalRetry() {
ArgumentCaptor sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
+ inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions();
// normal retry
@@ -1657,6 +1692,7 @@ public void normalRetry_thenNoTransparentRetry_butNormalRetry() {
ArgumentCaptor sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
+ inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions();
verify(retriableStreamRecorder, never()).postCommit();
assertEquals(0, fakeClock.numPendingTasks());
@@ -1673,6 +1709,7 @@ public void normalRetry_thenNoTransparentRetry_butNormalRetry() {
ArgumentCaptor sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
+ inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions();
verify(retriableStreamRecorder, never()).postCommit();
}
@@ -1694,6 +1731,7 @@ public void normalRetry_thenNoTransparentRetry_andNoMoreRetry() {
ArgumentCaptor sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
+ inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions();
// normal retry
@@ -1707,6 +1745,7 @@ public void normalRetry_thenNoTransparentRetry_andNoMoreRetry() {
ArgumentCaptor sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
+ inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions();
verify(retriableStreamRecorder, never()).postCommit();
assertEquals(0, fakeClock.numPendingTasks());
@@ -1737,6 +1776,7 @@ method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_
ArgumentCaptor sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
+ inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions();
// transparent retry
@@ -1749,6 +1789,7 @@ method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(retriableStreamRecorder).postCommit();
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
+ inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions();
assertEquals(0, fakeClock.numPendingTasks());
}
@@ -1767,6 +1808,7 @@ public void droppedShouldNeverRetry() {
ArgumentCaptor sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture());
+ verify(mockStream1).isReady();
// drop and verify no retry
Status status = Status.fromCode(RETRIABLE_STATUS_CODE_1);
@@ -1838,6 +1880,7 @@ public Void answer(InvocationOnMock in) {
ArgumentCaptor sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
+ inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions();
hedgingStream.sendMessage("msg1");
@@ -1879,6 +1922,8 @@ public Void answer(InvocationOnMock in) {
inOrder.verify(mockStream2, times(2)).flush();
inOrder.verify(mockStream2).writeMessage(any(InputStream.class));
inOrder.verify(mockStream2).request(456);
+ inOrder.verify(mockStream1).isReady();
+ inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions();
// send more messages
@@ -1916,6 +1961,9 @@ public Void answer(InvocationOnMock in) {
inOrder.verify(mockStream3).writeMessage(any(InputStream.class));
inOrder.verify(mockStream3).request(456);
inOrder.verify(mockStream3, times(2)).writeMessage(any(InputStream.class));
+ inOrder.verify(mockStream1).isReady();
+ inOrder.verify(mockStream2).isReady();
+ inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions();
// send one more message
@@ -1958,6 +2006,9 @@ public Void answer(InvocationOnMock in) {
inOrder.verify(mockStream4).writeMessage(any(InputStream.class));
inOrder.verify(mockStream4).request(456);
inOrder.verify(mockStream4, times(4)).writeMessage(any(InputStream.class));
+ inOrder.verify(mockStream1).isReady();
+ inOrder.verify(mockStream2).isReady();
+ inOrder.verify(mockStream4).isReady();
inOrder.verifyNoMoreInteractions();
InsightBuilder insight = new InsightBuilder();
@@ -2008,6 +2059,7 @@ public void hedging_maxAttempts() {
ArgumentCaptor sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
+ inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions();
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@@ -2015,6 +2067,7 @@ public void hedging_maxAttempts() {
ArgumentCaptor sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
+ inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions();
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@@ -2022,6 +2075,7 @@ public void hedging_maxAttempts() {
ArgumentCaptor sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
+ inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions();
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@@ -2029,6 +2083,7 @@ public void hedging_maxAttempts() {
ArgumentCaptor sublistenerCaptor4 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream4).start(sublistenerCaptor4.capture());
+ inOrder.verify(mockStream4).isReady();
inOrder.verifyNoMoreInteractions();
// a random one of the hedges fails
@@ -2040,6 +2095,7 @@ public void hedging_maxAttempts() {
ArgumentCaptor sublistenerCaptor5 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream5).start(sublistenerCaptor5.capture());
+ inOrder.verify(mockStream5).isReady();
inOrder.verifyNoMoreInteractions();
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@@ -2047,6 +2103,7 @@ public void hedging_maxAttempts() {
ArgumentCaptor sublistenerCaptor6 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream6).start(sublistenerCaptor6.capture());
+ inOrder.verify(mockStream6).isReady();
inOrder.verifyNoMoreInteractions();
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@@ -2091,6 +2148,7 @@ public void hedging_receiveHeaders() {
ArgumentCaptor sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
+ inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions();
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@@ -2098,6 +2156,7 @@ public void hedging_receiveHeaders() {
ArgumentCaptor sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
+ inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions();
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@@ -2105,6 +2164,7 @@ public void hedging_receiveHeaders() {
ArgumentCaptor sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
+ inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions();
// a random one of the hedges receives headers
@@ -2142,6 +2202,7 @@ public void hedging_pushback_negative() {
ArgumentCaptor sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
+ inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions();
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@@ -2149,6 +2210,7 @@ public void hedging_pushback_negative() {
ArgumentCaptor sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
+ inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions();
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@@ -2156,6 +2218,7 @@ public void hedging_pushback_negative() {
ArgumentCaptor sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
+ inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions();
// a random one of the hedges receives a negative pushback
@@ -2187,6 +2250,7 @@ public void hedging_pushback_positive() {
ArgumentCaptor sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
+ inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions();
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@@ -2194,6 +2258,7 @@ public void hedging_pushback_positive() {
ArgumentCaptor sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
+ inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions();
@@ -2211,6 +2276,7 @@ public void hedging_pushback_positive() {
ArgumentCaptor sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
+ inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions();
// hedge2 receives a pushback for HEDGING_DELAY_IN_SECONDS - 1 second
@@ -2224,6 +2290,7 @@ public void hedging_pushback_positive() {
ArgumentCaptor sublistenerCaptor4 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream4).start(sublistenerCaptor4.capture());
+ inOrder.verify(mockStream4).isReady();
inOrder.verifyNoMoreInteractions();
// commit
@@ -2253,6 +2320,7 @@ public void hedging_cancelled() {
ArgumentCaptor sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
+ inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions();
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@@ -2260,6 +2328,8 @@ public void hedging_cancelled() {
ArgumentCaptor sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
+ inOrder.verify(mockStream1).isReady();
+ inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions();
Status status = Status.CANCELLED.withDescription("cancelled");
@@ -2274,6 +2344,8 @@ public void hedging_cancelled() {
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
inOrder.verify(retriableStreamRecorder).postCommit();
+ inOrder.verify(masterListener).closed(
+ any(Status.class), any(RpcProgress.class), any(Metadata.class));
inOrder.verifyNoMoreInteractions();
}
@@ -2288,6 +2360,7 @@ public void hedging_perRpcBufferLimitExceeded() {
ArgumentCaptor sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture());
+ verify(mockStream1).isReady();
ClientStreamTracer bufferSizeTracer1 = bufferSizeTracer;
bufferSizeTracer1.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1);
@@ -2296,6 +2369,8 @@ public void hedging_perRpcBufferLimitExceeded() {
ArgumentCaptor sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream2).start(sublistenerCaptor2.capture());
+ verify(mockStream1, times(2)).isReady();
+ verify(mockStream2).isReady();
ClientStreamTracer bufferSizeTracer2 = bufferSizeTracer;
bufferSizeTracer2.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1);
@@ -2312,6 +2387,7 @@ public void hedging_perRpcBufferLimitExceeded() {
verify(retriableStreamRecorder).postCommit();
verifyNoMoreInteractions(mockStream1);
+ verify(mockStream2).isReady();
verifyNoMoreInteractions(mockStream2);
}
@@ -2326,6 +2402,7 @@ public void hedging_channelBufferLimitExceeded() {
ArgumentCaptor sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture());
+ verify(mockStream1).isReady();
ClientStreamTracer bufferSizeTracer1 = bufferSizeTracer;
bufferSizeTracer1.outboundWireSize(100);
@@ -2334,6 +2411,8 @@ public void hedging_channelBufferLimitExceeded() {
ArgumentCaptor sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream2).start(sublistenerCaptor2.capture());
+ verify(mockStream1, times(2)).isReady();
+ verify(mockStream2).isReady();
ClientStreamTracer bufferSizeTracer2 = bufferSizeTracer;
bufferSizeTracer2.outboundWireSize(100);
diff --git a/interop-testing/build.gradle b/interop-testing/build.gradle
index 79aa5356ecd..852d5882cce 100644
--- a/interop-testing/build.gradle
+++ b/interop-testing/build.gradle
@@ -43,6 +43,8 @@ dependencies {
libraries.netty_tcnative,
project(':grpc-grpclb')
testImplementation project(':grpc-context').sourceSets.test.output,
+ project(':grpc-api').sourceSets.test.output,
+ project(':grpc-core').sourceSets.test.output,
libraries.mockito
alpnagent libraries.jetty_alpn_agent
}
diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java
index 1b447a63c32..be3e759e2d0 100644
--- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java
+++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java
@@ -92,6 +92,9 @@
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants;
+import io.opencensus.stats.Measure;
+import io.opencensus.stats.Measure.MeasureDouble;
+import io.opencensus.stats.Measure.MeasureLong;
import io.opencensus.tags.TagKey;
import io.opencensus.tags.TagValue;
import io.opencensus.trace.Span;
@@ -152,6 +155,15 @@ public abstract class AbstractInteropTest {
* SETTINGS/WINDOW_UPDATE exchange.
*/
public static final int TEST_FLOW_CONTROL_WINDOW = 65 * 1024;
+ private static final MeasureLong RETRIES_PER_CALL =
+ Measure.MeasureLong.create(
+ "grpc.io/client/retries_per_call", "Number of retries per call", "1");
+ private static final MeasureLong TRANSPARENT_RETRIES_PER_CALL =
+ Measure.MeasureLong.create(
+ "grpc.io/client/transparent_retries_per_call", "Transparent retries per call", "1");
+ private static final MeasureDouble RETRY_DELAY_PER_CALL =
+ Measure.MeasureDouble.create(
+ "grpc.io/client/retry_delay_per_call", "Retry delay per call", "ms");
private static final FakeTagger tagger = new FakeTagger();
private static final FakeTagContextBinarySerializer tagContextBinarySerializer =
@@ -1236,6 +1248,7 @@ public void deadlineInPast() throws Exception {
checkEndTags(
clientEndRecord, "grpc.testing.TestService/EmptyCall",
Status.DEADLINE_EXCEEDED.getCode(), true);
+ assertZeroRetryRecorded();
}
// warm up the channel
@@ -1245,6 +1258,7 @@ public void deadlineInPast() throws Exception {
clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
// clientEndRecord
clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
+ assertZeroRetryRecorded();
}
try {
blockingStub
@@ -1263,6 +1277,7 @@ public void deadlineInPast() throws Exception {
checkEndTags(
clientEndRecord, "grpc.testing.TestService/EmptyCall",
Status.DEADLINE_EXCEEDED.getCode(), true);
+ assertZeroRetryRecorded();
}
}
@@ -1980,6 +1995,13 @@ private void assertStatsTrace(String method, Status.Code status) {
assertStatsTrace(method, status, null, null);
}
+ private void assertZeroRetryRecorded() {
+ MetricsRecord retryRecord = clientStatsRecorder.pollRecord();
+ assertThat(retryRecord.getMetric(RETRIES_PER_CALL)).isEqualTo(0);
+ assertThat(retryRecord.getMetric(TRANSPARENT_RETRIES_PER_CALL)).isEqualTo(0);
+ assertThat(retryRecord.getMetric(RETRY_DELAY_PER_CALL)).isEqualTo(0D);
+ }
+
private void assertClientStatsTrace(String method, Status.Code code,
Collection extends MessageLite> requests, Collection extends MessageLite> responses) {
// Tracer-based stats
@@ -2009,6 +2031,7 @@ private void assertClientStatsTrace(String method, Status.Code code,
if (requests != null && responses != null) {
checkCensus(clientEndRecord, false, requests, responses);
}
+ assertZeroRetryRecorded();
}
}
diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java
new file mode 100644
index 00000000000..bdf39e8546a
--- /dev/null
+++ b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java
@@ -0,0 +1,450 @@
+/*
+ * Copyright 2021 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.testing.integration;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.ImmutableMap;
+import io.grpc.Attributes;
+import io.grpc.CallOptions;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ClientStreamTracer;
+import io.grpc.ClientStreamTracer.StreamInfo;
+import io.grpc.Deadline;
+import io.grpc.Deadline.Ticker;
+import io.grpc.IntegerMarshaller;
+import io.grpc.ManagedChannel;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.MethodDescriptor.MethodType;
+import io.grpc.Server;
+import io.grpc.ServerCall;
+import io.grpc.ServerCall.Listener;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerMethodDefinition;
+import io.grpc.ServerServiceDefinition;
+import io.grpc.Status;
+import io.grpc.Status.Code;
+import io.grpc.StringMarshaller;
+import io.grpc.census.InternalCensusStatsAccessor;
+import io.grpc.census.internal.DeprecatedCensusConstants;
+import io.grpc.internal.FakeClock;
+import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder;
+import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer;
+import io.grpc.internal.testing.StatsTestUtils.FakeTagger;
+import io.grpc.internal.testing.StatsTestUtils.MetricsRecord;
+import io.grpc.netty.NettyChannelBuilder;
+import io.grpc.netty.NettyServerBuilder;
+import io.grpc.testing.GrpcCleanupRule;
+import io.netty.channel.DefaultEventLoopGroup;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.local.LocalAddress;
+import io.netty.channel.local.LocalChannel;
+import io.netty.channel.local.LocalServerChannel;
+import io.netty.util.concurrent.ScheduledFuture;
+import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants;
+import io.opencensus.stats.Measure;
+import io.opencensus.stats.Measure.MeasureDouble;
+import io.opencensus.stats.Measure.MeasureLong;
+import io.opencensus.tags.TagValue;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+@RunWith(JUnit4.class)
+public class RetryTest {
+ private static final FakeTagger tagger = new FakeTagger();
+ private static final FakeTagContextBinarySerializer tagContextBinarySerializer =
+ new FakeTagContextBinarySerializer();
+ private static final MeasureLong RETRIES_PER_CALL =
+ Measure.MeasureLong.create(
+ "grpc.io/client/retries_per_call", "Number of retries per call", "1");
+ private static final MeasureLong TRANSPARENT_RETRIES_PER_CALL =
+ Measure.MeasureLong.create(
+ "grpc.io/client/transparent_retries_per_call", "Transparent retries per call", "1");
+ private static final MeasureDouble RETRY_DELAY_PER_CALL =
+ Measure.MeasureDouble.create(
+ "grpc.io/client/retry_delay_per_call", "Retry delay per call", "ms");
+
+ @Rule
+ public final MockitoRule mocks = MockitoJUnit.rule();
+ @Rule
+ public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
+ private final FakeClock fakeClock = new FakeClock();
+ @Mock
+ private ClientCall.Listener mockCallListener;
+ private CountDownLatch backoffLatch = new CountDownLatch(1);
+ private final EventLoopGroup group = new DefaultEventLoopGroup() {
+ @SuppressWarnings("FutureReturnValueIgnored")
+ @Override
+ public ScheduledFuture> schedule(
+ final Runnable command, final long delay, final TimeUnit unit) {
+ if (!command.getClass().getName().contains("RetryBackoffRunnable")) {
+ return super.schedule(command, delay, unit);
+ }
+ fakeClock.getScheduledExecutorService().schedule(
+ new Runnable() {
+ @Override
+ public void run() {
+ group.execute(command);
+ }
+ },
+ delay,
+ unit);
+ backoffLatch.countDown();
+ return super.schedule(
+ new Runnable() {
+ @Override
+ public void run() {} // no-op
+ },
+ 0,
+ TimeUnit.NANOSECONDS);
+ }
+ };
+ private final FakeStatsRecorder clientStatsRecorder = new FakeStatsRecorder();
+ private final ClientInterceptor statsInterceptor =
+ InternalCensusStatsAccessor.getClientInterceptor(
+ tagger, tagContextBinarySerializer, clientStatsRecorder,
+ fakeClock.getStopwatchSupplier(), true, true, true,
+ /* recordRealTimeMetrics= */ true);
+ private final MethodDescriptor clientStreamingMethod =
+ MethodDescriptor.newBuilder()
+ .setType(MethodType.CLIENT_STREAMING)
+ .setFullMethodName("service/method")
+ .setRequestMarshaller(new StringMarshaller())
+ .setResponseMarshaller(new IntegerMarshaller())
+ .build();
+ private final LinkedBlockingQueue> serverCalls =
+ new LinkedBlockingQueue<>();
+ private final ServerMethodDefinition methodDefinition =
+ ServerMethodDefinition.create(
+ clientStreamingMethod,
+ new ServerCallHandler() {
+ @Override
+ public Listener startCall(ServerCall call, Metadata headers) {
+ serverCalls.offer(call);
+ return new Listener() {};
+ }
+ }
+ );
+ private final ServerServiceDefinition serviceDefinition =
+ ServerServiceDefinition.builder(clientStreamingMethod.getServiceName())
+ .addMethod(methodDefinition)
+ .build();
+ private final LocalAddress localAddress = new LocalAddress(this.getClass().getName());
+ private Server localServer;
+ private ManagedChannel channel;
+ private Map retryPolicy = null;
+ private long bufferLimit = 1L << 20; // 1M
+
+ private void startNewServer() throws Exception {
+ localServer = cleanupRule.register(NettyServerBuilder.forAddress(localAddress)
+ .channelType(LocalServerChannel.class)
+ .bossEventLoopGroup(group)
+ .workerEventLoopGroup(group)
+ .addService(serviceDefinition)
+ .build());
+ localServer.start();
+ }
+
+ private void createNewChannel() {
+ Map methodConfig = new HashMap<>();
+ Map name = new HashMap<>();
+ name.put("service", "service");
+ methodConfig.put("name", Arrays.asList(name));
+ if (retryPolicy != null) {
+ methodConfig.put("retryPolicy", retryPolicy);
+ }
+ Map rawServiceConfig = new HashMap<>();
+ rawServiceConfig.put("methodConfig", Arrays.asList(methodConfig));
+ channel = cleanupRule.register(
+ NettyChannelBuilder.forAddress(localAddress)
+ .channelType(LocalChannel.class)
+ .eventLoopGroup(group)
+ .usePlaintext()
+ .enableRetry()
+ .perRpcBufferLimit(bufferLimit)
+ .defaultServiceConfig(rawServiceConfig)
+ .intercept(statsInterceptor)
+ .build());
+ }
+
+ private void elapseBackoff(long time, TimeUnit unit) throws Exception {
+ assertThat(backoffLatch.await(5, SECONDS)).isTrue();
+ backoffLatch = new CountDownLatch(1);
+ fakeClock.forwardTime(time, unit);
+ }
+
+ private void assertRpcStartedRecorded() throws Exception {
+ MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
+ assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT))
+ .isEqualTo(1);
+ }
+
+ private void assertOutboundMessageRecorded() throws Exception {
+ MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
+ assertThat(
+ record.getMetricAsLongOrFail(
+ RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD))
+ .isEqualTo(1);
+ }
+
+ private void assertInboundMessageRecorded() throws Exception {
+ MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
+ assertThat(
+ record.getMetricAsLongOrFail(
+ RpcMeasureConstants.GRPC_CLIENT_RECEIVED_MESSAGES_PER_METHOD))
+ .isEqualTo(1);
+ }
+
+ private void assertOutboundWireSizeRecorded(long length) throws Exception {
+ MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
+ assertThat(record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_METHOD))
+ .isEqualTo(length);
+ }
+
+ private void assertInboundWireSizeRecorded(long length) throws Exception {
+ MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
+ assertThat(
+ record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_RECEIVED_BYTES_PER_METHOD))
+ .isEqualTo(length);
+ }
+
+ private void assertRpcStatusRecorded(
+ Status.Code code, long roundtripLatencyMs, long outboundMessages) throws Exception {
+ MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
+ TagValue statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS);
+ assertThat(statusTag.asString()).isEqualTo(code.toString());
+ assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT))
+ .isEqualTo(1);
+ assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY))
+ .isEqualTo(roundtripLatencyMs);
+ assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT))
+ .isEqualTo(outboundMessages);
+ }
+
+ private void assertRetryStatsRecorded(
+ int numRetries, int numTransparentRetries, long retryDelayMs) throws Exception {
+ MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
+ assertThat(record.getMetricAsLongOrFail(RETRIES_PER_CALL)).isEqualTo(numRetries);
+ assertThat(record.getMetricAsLongOrFail(TRANSPARENT_RETRIES_PER_CALL))
+ .isEqualTo(numTransparentRetries);
+ assertThat(record.getMetricAsLongOrFail(RETRY_DELAY_PER_CALL)).isEqualTo(retryDelayMs);
+ }
+
+ @Test
+ public void retryUntilBufferLimitExceeded() throws Exception {
+ String message = "String of length 20.";
+
+ startNewServer();
+ bufferLimit = message.length() * 2L - 1; // Can buffer no more than 1 message.
+ retryPolicy = ImmutableMap.builder()
+ .put("maxAttempts", 4D)
+ .put("initialBackoff", "10s")
+ .put("maxBackoff", "10s")
+ .put("backoffMultiplier", 1D)
+ .put("retryableStatusCodes", Arrays.asList("UNAVAILABLE"))
+ .build();
+ createNewChannel();
+ ClientCall call = channel.newCall(clientStreamingMethod, CallOptions.DEFAULT);
+ call.start(mockCallListener, new Metadata());
+ call.sendMessage(message);
+
+ ServerCall serverCall = serverCalls.poll(5, SECONDS);
+ serverCall.request(2);
+ // trigger retry
+ serverCall.close(
+ Status.UNAVAILABLE.withDescription("original attempt failed"),
+ new Metadata());
+ elapseBackoff(10, SECONDS);
+ // 2nd attempt received
+ serverCall = serverCalls.poll(5, SECONDS);
+ serverCall.request(2);
+ verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class));
+ // send one more message, should exceed buffer limit
+ call.sendMessage(message);
+ // let attempt fail
+ serverCall.close(
+ Status.UNAVAILABLE.withDescription("2nd attempt failed"),
+ new Metadata());
+ // no more retry
+ ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(null);
+ verify(mockCallListener, timeout(5000)).onClose(statusCaptor.capture(), any(Metadata.class));
+ assertThat(statusCaptor.getValue().getDescription()).contains("2nd attempt failed");
+ }
+
+ @Test
+ public void statsRecorded() throws Exception {
+ startNewServer();
+ retryPolicy = ImmutableMap.builder()
+ .put("maxAttempts", 4D)
+ .put("initialBackoff", "10s")
+ .put("maxBackoff", "10s")
+ .put("backoffMultiplier", 1D)
+ .put("retryableStatusCodes", Arrays.asList("UNAVAILABLE"))
+ .build();
+ createNewChannel();
+
+ ClientCall call = channel.newCall(clientStreamingMethod, CallOptions.DEFAULT);
+ call.start(mockCallListener, new Metadata());
+ assertRpcStartedRecorded();
+ String message = "String of length 20.";
+ call.sendMessage(message);
+ assertOutboundMessageRecorded();
+ ServerCall serverCall = serverCalls.poll(5, SECONDS);
+ serverCall.request(2);
+ assertOutboundWireSizeRecorded(message.length());
+ // original attempt latency
+ fakeClock.forwardTime(1, SECONDS);
+ // trigger retry
+ serverCall.close(
+ Status.UNAVAILABLE.withDescription("original attempt failed"),
+ new Metadata());
+ assertRpcStatusRecorded(Status.Code.UNAVAILABLE, 1000, 1);
+ elapseBackoff(10, SECONDS);
+ assertRpcStartedRecorded();
+ assertOutboundMessageRecorded();
+ serverCall = serverCalls.poll(5, SECONDS);
+ serverCall.request(2);
+ assertOutboundWireSizeRecorded(message.length());
+ message = "new message";
+ call.sendMessage(message);
+ assertOutboundMessageRecorded();
+ assertOutboundWireSizeRecorded(message.length());
+ // retry attempt latency
+ fakeClock.forwardTime(2, SECONDS);
+ serverCall.sendHeaders(new Metadata());
+ serverCall.sendMessage(3);
+ call.request(1);
+ assertInboundMessageRecorded();
+ assertInboundWireSizeRecorded(1);
+ serverCall.close(Status.OK, new Metadata());
+ assertRpcStatusRecorded(Status.Code.OK, 2000, 2);
+ assertRetryStatsRecorded(1, 0, 10_000);
+ }
+
+ @Test
+ public void serverCancelledAndClientDeadlineExceeded() throws Exception {
+ startNewServer();
+ createNewChannel();
+
+ class CloseDelayedTracer extends ClientStreamTracer {
+ @Override
+ public void streamClosed(Status status) {
+ fakeClock.forwardTime(10, SECONDS);
+ }
+ }
+
+ class CloseDelayedTracerFactory extends ClientStreamTracer.InternalLimitedInfoFactory {
+ @Override
+ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
+ return new CloseDelayedTracer();
+ }
+ }
+
+ CallOptions callOptions = CallOptions.DEFAULT
+ .withDeadline(Deadline.after(
+ 10,
+ SECONDS,
+ new Ticker() {
+ @Override
+ public long nanoTime() {
+ return fakeClock.getTicker().read();
+ }
+ }))
+ .withStreamTracerFactory(new CloseDelayedTracerFactory());
+ ClientCall call = channel.newCall(clientStreamingMethod, callOptions);
+ call.start(mockCallListener, new Metadata());
+ assertRpcStartedRecorded();
+ ServerCall serverCall = serverCalls.poll(5, SECONDS);
+ serverCall.close(Status.CANCELLED, new Metadata());
+ assertRpcStatusRecorded(Code.DEADLINE_EXCEEDED, 10_000, 0);
+ assertRetryStatsRecorded(0, 0, 0);
+ }
+
+ @Ignore("flaky because old transportReportStatus() is not completely migrated yet")
+ @Test
+ public void transparentRetryStatsRecorded() throws Exception {
+ startNewServer();
+ createNewChannel();
+
+ final AtomicBoolean transparentRetryTriggered = new AtomicBoolean();
+ class TransparentRetryTriggeringTracer extends ClientStreamTracer {
+
+ @Override
+ public void streamCreated(Attributes transportAttrs, Metadata metadata) {
+ if (transparentRetryTriggered.get()) {
+ return;
+ }
+ localServer.shutdownNow();
+ }
+
+ @Override
+ public void streamClosed(Status status) {
+ if (transparentRetryTriggered.get()) {
+ return;
+ }
+ transparentRetryTriggered.set(true);
+ try {
+ startNewServer();
+ channel.resetConnectBackoff();
+ channel.getState(true);
+ } catch (Exception e) {
+ throw new AssertionError("local server can not be restarted", e);
+ }
+ }
+ }
+
+ class TransparentRetryTracerFactory extends ClientStreamTracer.InternalLimitedInfoFactory {
+ @Override
+ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
+ return new TransparentRetryTriggeringTracer();
+ }
+ }
+
+ CallOptions callOptions = CallOptions.DEFAULT
+ .withWaitForReady()
+ .withStreamTracerFactory(new TransparentRetryTracerFactory());
+ ClientCall call = channel.newCall(clientStreamingMethod, callOptions);
+ call.start(mockCallListener, new Metadata());
+ assertRpcStartedRecorded();
+ assertRpcStatusRecorded(Code.UNAVAILABLE, 0, 0);
+ assertRpcStartedRecorded();
+ call.cancel("cancel", null);
+ assertRpcStatusRecorded(Code.CANCELLED, 0, 0);
+ assertRetryStatsRecorded(0, 1, 0);
+ }
+}
diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java
index 4ae6651784f..93bfeb36b34 100644
--- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java
+++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java
@@ -134,8 +134,8 @@ final class ClientXdsClient extends AbstractXdsClient {
|| Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION"));
@VisibleForTesting
static boolean enableRetry =
- !Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY"))
- && Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY"));
+ Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY"))
+ || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY"));
private static final String TYPE_URL_HTTP_CONNECTION_MANAGER_V2 =
"type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2"
diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java
index 4b12ebd71c8..60ce2befe45 100644
--- a/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java
+++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java
@@ -136,7 +136,7 @@ public class ClientXdsClientDataTest {
@Before
public void setUp() {
originalEnableRetry = ClientXdsClient.enableRetry;
- assertThat(originalEnableRetry).isFalse();
+ assertThat(originalEnableRetry).isTrue();
}
@After