From 2c7f56abf8e637e73b2779bcd5895cbed9ad213f Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Wed, 11 Aug 2021 10:24:37 -0700 Subject: [PATCH 1/4] all: implement retry stats (#8362) --- .../main/java/io/grpc/ClientStreamTracer.java | 30 +- .../java/io/grpc/ManagedChannelBuilder.java | 3 - .../io/grpc/census/CensusStatsModule.java | 230 ++++++--- .../io/grpc/census/CensusTracingModule.java | 47 +- .../io/grpc/census/CensusModulesTest.java | 269 ++++++++++- .../java/io/grpc/internal/ClientCallImpl.java | 3 +- .../main/java/io/grpc/internal/GrpcUtil.java | 3 +- .../io/grpc/internal/ManagedChannelImpl.java | 11 +- .../internal/ManagedChannelImplBuilder.java | 12 - .../java/io/grpc/internal/OobChannel.java | 2 +- .../io/grpc/internal/RetriableStream.java | 35 +- .../io/grpc/internal/SubchannelChannel.java | 2 +- .../io/grpc/internal/RetriableStreamTest.java | 3 +- interop-testing/build.gradle | 2 + .../integration/AbstractInteropTest.java | 23 + .../grpc/testing/integration/RetryTest.java | 450 ++++++++++++++++++ 16 files changed, 979 insertions(+), 146 deletions(-) create mode 100644 interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java diff --git a/api/src/main/java/io/grpc/ClientStreamTracer.java b/api/src/main/java/io/grpc/ClientStreamTracer.java index 6a5d3cc3397..bb836ac82e1 100644 --- a/api/src/main/java/io/grpc/ClientStreamTracer.java +++ b/api/src/main/java/io/grpc/ClientStreamTracer.java @@ -97,11 +97,15 @@ public abstract static class InternalLimitedInfoFactory extends Factory {} public static final class StreamInfo { private final Attributes transportAttrs; private final CallOptions callOptions; + private final int previousAttempts; private final boolean isTransparentRetry; - StreamInfo(Attributes transportAttrs, CallOptions callOptions, boolean isTransparentRetry) { + StreamInfo( + Attributes transportAttrs, CallOptions callOptions, int previousAttempts, + boolean isTransparentRetry) { this.transportAttrs = checkNotNull(transportAttrs, 
"transportAttrs"); this.callOptions = checkNotNull(callOptions, "callOptions"); + this.previousAttempts = previousAttempts; this.isTransparentRetry = isTransparentRetry; } @@ -124,6 +128,15 @@ public CallOptions getCallOptions() { return callOptions; } + /** + * Returns the number of preceding attempts for the RPC. + * + * @since 1.40.0 + */ + public int getPreviousAttempts() { + return previousAttempts; + } + /** * Whether the stream is a transparent retry. * @@ -142,6 +155,7 @@ public Builder toBuilder() { return new Builder() .setCallOptions(callOptions) .setTransportAttrs(transportAttrs) + .setPreviousAttempts(previousAttempts) .setIsTransparentRetry(isTransparentRetry); } @@ -159,6 +173,7 @@ public String toString() { return MoreObjects.toStringHelper(this) .add("transportAttrs", transportAttrs) .add("callOptions", callOptions) + .add("previousAttempts", previousAttempts) .add("isTransparentRetry", isTransparentRetry) .toString(); } @@ -171,6 +186,7 @@ public String toString() { public static final class Builder { private Attributes transportAttrs = Attributes.EMPTY; private CallOptions callOptions = CallOptions.DEFAULT; + private int previousAttempts; private boolean isTransparentRetry; Builder() { @@ -197,6 +213,16 @@ public Builder setCallOptions(CallOptions callOptions) { return this; } + /** + * Set the number of preceding attempts of the RPC. + * + * @since 1.40.0 + */ + public Builder setPreviousAttempts(int previousAttempts) { + this.previousAttempts = previousAttempts; + return this; + } + /** * Sets whether the stream is a transparent retry. * @@ -211,7 +237,7 @@ public Builder setIsTransparentRetry(boolean isTransparentRetry) { * Builds a new StreamInfo. 
*/ public StreamInfo build() { - return new StreamInfo(transportAttrs, callOptions, isTransparentRetry); + return new StreamInfo(transportAttrs, callOptions, previousAttempts, isTransparentRetry); } } } diff --git a/api/src/main/java/io/grpc/ManagedChannelBuilder.java b/api/src/main/java/io/grpc/ManagedChannelBuilder.java index e4a4611541d..73e66ed6dc4 100644 --- a/api/src/main/java/io/grpc/ManagedChannelBuilder.java +++ b/api/src/main/java/io/grpc/ManagedChannelBuilder.java @@ -479,9 +479,6 @@ public T disableRetry() { * transparent retries, which are safe for non-idempotent RPCs. Service config is ideally provided * by the name resolver, but may also be specified via {@link #defaultServiceConfig}. * - *

For the current release, this method may have a side effect that disables Census stats and - * tracing. - * * @return this * @since 1.11.0 */ diff --git a/census/src/main/java/io/grpc/census/CensusStatsModule.java b/census/src/main/java/io/grpc/census/CensusStatsModule.java index ac5f4e705e3..6faeb575ccc 100644 --- a/census/src/main/java/io/grpc/census/CensusStatsModule.java +++ b/census/src/main/java/io/grpc/census/CensusStatsModule.java @@ -17,7 +17,6 @@ package io.grpc.census; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; @@ -28,16 +27,20 @@ import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.ClientStreamTracer; +import io.grpc.ClientStreamTracer.StreamInfo; import io.grpc.Context; +import io.grpc.Deadline; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.ServerStreamTracer; import io.grpc.Status; +import io.grpc.Status.Code; import io.grpc.StreamTracer; import io.grpc.census.internal.DeprecatedCensusConstants; import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; +import io.opencensus.stats.Measure; import io.opencensus.stats.Measure.MeasureDouble; import io.opencensus.stats.Measure.MeasureLong; import io.opencensus.stats.MeasureMap; @@ -51,9 +54,11 @@ import io.opencensus.tags.propagation.TagContextSerializationException; import io.opencensus.tags.unsafe.ContextUtils; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import 
java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -61,9 +66,10 @@ /** * Provides factories for {@link StreamTracer} that records stats to Census. * - *

On the client-side, a factory is created for each call, because ClientCall starts earlier than - * the ClientStream, and in some cases may even not create a ClientStream at all. Therefore, it's - * the factory that reports the summary to Census. + *

On the client-side, a factory is created for each call, and the factory creates a stream + * tracer for each attempt. If there is no stream created when the call is ended, we still create a + * tracer. It's the tracer that reports per-attempt stats, and the factory that reports the stats + * of the overall RPC, such as RETRIES_PER_CALL, to Census. * *

On the server-side, there is only one ServerStream per each ServerCall, and ServerStream * starts earlier than the ServerCall. Therefore, only one tracer is created per stream/call and @@ -168,7 +174,6 @@ private void recordRealTimeMetric(TagContext ctx, MeasureLong measure, long valu } private static final class ClientTracer extends ClientStreamTracer { - @Nullable private static final AtomicLongFieldUpdater outboundMessageCountUpdater; @Nullable private static final AtomicLongFieldUpdater inboundMessageCountUpdater; @Nullable private static final AtomicLongFieldUpdater outboundWireSizeUpdater; @@ -222,21 +227,31 @@ private static final class ClientTracer extends ClientStreamTracer { inboundUncompressedSizeUpdater = tmpInboundUncompressedSizeUpdater; } - private final CensusStatsModule module; + final Stopwatch stopwatch; + final CallAttemptsTracerFactory attemptsState; + final AtomicBoolean inboundReceivedOrClosed = new AtomicBoolean(); + final CensusStatsModule module; final TagContext parentCtx; - private final TagContext startCtx; - + final TagContext startCtx; + final StreamInfo info; volatile long outboundMessageCount; volatile long inboundMessageCount; volatile long outboundWireSize; volatile long inboundWireSize; volatile long outboundUncompressedSize; volatile long inboundUncompressedSize; - - ClientTracer(CensusStatsModule module, TagContext parentCtx, TagContext startCtx) { - this.module = checkNotNull(module, "module"); + long roundtripNanos; + Code statusCode; + + ClientTracer( + CallAttemptsTracerFactory attemptsState, CensusStatsModule module, TagContext parentCtx, + TagContext startCtx, StreamInfo info) { + this.attemptsState = attemptsState; + this.module = module; this.parentCtx = parentCtx; - this.startCtx = checkNotNull(startCtx, "startCtx"); + this.startCtx = startCtx; + this.info = info; + this.stopwatch = module.stopwatchSupplier.get().start(); } @Override @@ -296,6 +311,11 @@ public void inboundUncompressedSize(long bytes) { @Override 
@SuppressWarnings("NonAtomicVolatileUpdate") public void inboundMessage(int seqNo) { + if (inboundReceivedOrClosed.compareAndSet(false, true)) { + // Because inboundUncompressedSize() might be called after streamClosed(), + // we will report stats in callEnded(). Note that this attempt is already committed. + attemptsState.inboundMetricTracer = this; + } if (inboundMessageCountUpdater != null) { inboundMessageCountUpdater.getAndIncrement(this); } else { @@ -316,14 +336,74 @@ public void outboundMessage(int seqNo) { module.recordRealTimeMetric( startCtx, RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1); } + + @Override + public void streamClosed(Status status) { + attemptsState.attemptEnded(); + stopwatch.stop(); + roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); + Deadline deadline = info.getCallOptions().getDeadline(); + statusCode = status.getCode(); + if (statusCode == Status.Code.CANCELLED && deadline != null) { + // When the server's deadline expires, it can only reset the stream with CANCEL and no + // description. Since our timer may be delayed in firing, we double-check the deadline and + // turn the failure into the likely more helpful DEADLINE_EXCEEDED status. + if (deadline.isExpired()) { + statusCode = Code.DEADLINE_EXCEEDED; + } + } + if (inboundReceivedOrClosed.compareAndSet(false, true)) { + if (module.recordFinishedRpcs) { + // Stream is closed early. So no need to record metrics for any inbound events after this + // point. + recordFinishedRpc(); + } + } // Otherwise will report stats in callEnded() to guarantee all inbound metrics are recorded. + } + + void recordFinishedRpc() { + MeasureMap measureMap = module.statsRecorder.newMeasureMap() + // TODO(songya): remove the deprecated measure constants once they are completed removed. 
+ .put(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT, 1) + // The latency is double value + .put( + DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY, + roundtripNanos / NANOS_PER_MILLI) + .put(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT, outboundMessageCount) + .put(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_COUNT, inboundMessageCount) + .put(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES, outboundWireSize) + .put(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_BYTES, inboundWireSize) + .put( + DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES, + outboundUncompressedSize) + .put( + DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES, + inboundUncompressedSize); + if (statusCode != Code.OK) { + measureMap.put(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT, 1); + } + TagValue statusTag = TagValue.create(statusCode.toString()); + measureMap.record( + module + .tagger + .toBuilder(startCtx) + .putLocal(RpcMeasureConstants.GRPC_CLIENT_STATUS, statusTag) + .build()); + } } @VisibleForTesting static final class CallAttemptsTracerFactory extends ClientStreamTracer.InternalLimitedInfoFactory { - @Nullable - private static final AtomicReferenceFieldUpdater - streamTracerUpdater; + static final MeasureLong RETRIES_PER_CALL = + Measure.MeasureLong.create( + "grpc.io/client/retries_per_call", "Number of retries per call", "1"); + static final MeasureLong TRANSPARENT_RETRIES_PER_CALL = + Measure.MeasureLong.create( + "grpc.io/client/transparent_retries_per_call", "Transparent retries per call", "1"); + static final MeasureDouble RETRY_DELAY_PER_CALL = + Measure.MeasureDouble.create( + "grpc.io/client/retry_delay_per_call", "Retry delay per call", "ms"); @Nullable private static final AtomicIntegerFieldUpdater callEndedUpdater; @@ -334,40 +414,45 @@ static final class CallAttemptsTracerFactory extends * (potentially racy) direct updates of the volatile variables. 
*/ static { - AtomicReferenceFieldUpdater tmpStreamTracerUpdater; AtomicIntegerFieldUpdater tmpCallEndedUpdater; try { - tmpStreamTracerUpdater = - AtomicReferenceFieldUpdater.newUpdater( - CallAttemptsTracerFactory.class, ClientTracer.class, "streamTracer"); tmpCallEndedUpdater = AtomicIntegerFieldUpdater.newUpdater(CallAttemptsTracerFactory.class, "callEnded"); } catch (Throwable t) { logger.log(Level.SEVERE, "Creating atomic field updaters failed", t); - tmpStreamTracerUpdater = null; tmpCallEndedUpdater = null; } - streamTracerUpdater = tmpStreamTracerUpdater; callEndedUpdater = tmpCallEndedUpdater; } + ClientTracer inboundMetricTracer; private final CensusStatsModule module; private final Stopwatch stopwatch; - private volatile ClientTracer streamTracer; private volatile int callEnded; private final TagContext parentCtx; private final TagContext startCtx; + private final String fullMethodName; + + // TODO(zdapeng): optimize memory allocation using AtomicFieldUpdater. + private final AtomicLong attemptsPerCall = new AtomicLong(); + private final AtomicLong transparentRetriesPerCall = new AtomicLong(); + private final AtomicLong retryDelayNanos = new AtomicLong(); + private final AtomicLong lastInactiveTimeStamp = new AtomicLong(); + private final AtomicInteger activeStreams = new AtomicInteger(); + private final AtomicBoolean activated = new AtomicBoolean(); CallAttemptsTracerFactory( CensusStatsModule module, TagContext parentCtx, String fullMethodName) { - this.module = checkNotNull(module); - this.parentCtx = checkNotNull(parentCtx); + this.module = checkNotNull(module, "module"); + this.parentCtx = checkNotNull(parentCtx, "parentCtx"); + this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName"); + this.stopwatch = module.stopwatchSupplier.get().start(); TagValue methodTag = TagValue.create(fullMethodName); - this.startCtx = module.tagger.toBuilder(parentCtx) + startCtx = module.tagger.toBuilder(parentCtx) 
.putLocal(RpcMeasureConstants.GRPC_CLIENT_METHOD, methodTag) .build(); - this.stopwatch = module.stopwatchSupplier.get().start(); if (module.recordStartedRpcs) { + // Record here in case newClientStreamTracer() would never be called. module.statsRecorder.newMeasureMap() .put(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT, 1) .record(startCtx); @@ -375,30 +460,37 @@ static final class CallAttemptsTracerFactory extends } @Override - public ClientStreamTracer newClientStreamTracer( - ClientStreamTracer.StreamInfo info, Metadata headers) { - ClientTracer tracer = new ClientTracer(module, parentCtx, startCtx); - // TODO(zhangkun83): Once retry or hedging is implemented, a ClientCall may start more than - // one streams. We will need to update this file to support them. - if (streamTracerUpdater != null) { - checkState( - streamTracerUpdater.compareAndSet(this, null, tracer), - "Are you creating multiple streams per call? This class doesn't yet support this case"); + public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metadata) { + ClientTracer tracer = new ClientTracer(this, module, parentCtx, startCtx, info); + if (activeStreams.incrementAndGet() == 1) { + if (!activated.compareAndSet(false, true)) { + retryDelayNanos.addAndGet(stopwatch.elapsed(TimeUnit.NANOSECONDS)); + } + } + if (module.recordStartedRpcs && attemptsPerCall.get() > 0) { + module.statsRecorder.newMeasureMap() + .put(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT, 1) + .record(startCtx); + } + if (info.isTransparentRetry()) { + transparentRetriesPerCall.incrementAndGet(); } else { - checkState( - streamTracer == null, - "Are you creating multiple streams per call? This class doesn't yet support this case"); - streamTracer = tracer; + attemptsPerCall.incrementAndGet(); } return tracer; } - /** - * Record a finished call and mark the current time as the end time. - * - *

Can be called from any thread without synchronization. Calling it the second time or more - * is a no-op. - */ + // Called whenever each attempt is ended. + void attemptEnded() { + if (activeStreams.decrementAndGet() == 0) { + // Race condition between two extremely close events does not matter because the difference + // in the result would be very small. + long lastInactiveTimeStamp = + this.lastInactiveTimeStamp.getAndSet(stopwatch.elapsed(TimeUnit.NANOSECONDS)); + retryDelayNanos.addAndGet(-lastInactiveTimeStamp); + } + } + void callEnded(Status status) { if (callEndedUpdater != null) { if (callEndedUpdater.getAndSet(this, 1) != 0) { @@ -414,36 +506,30 @@ void callEnded(Status status) { return; } stopwatch.stop(); - long roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); - ClientTracer tracer = streamTracer; - if (tracer == null) { - tracer = new ClientTracer(module, parentCtx, startCtx); + if (attemptsPerCall.get() == 0) { + ClientTracer tracer = new ClientTracer(this, module, parentCtx, startCtx, null); + tracer.roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); + tracer.statusCode = status.getCode(); + tracer.recordFinishedRpc(); + } else if (inboundMetricTracer != null) { + inboundMetricTracer.recordFinishedRpc(); } - MeasureMap measureMap = module.statsRecorder.newMeasureMap() - // TODO(songya): remove the deprecated measure constants once they are completed removed. 
- .put(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT, 1) - // The latency is double value - .put( - DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY, - roundtripNanos / NANOS_PER_MILLI) - .put(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount) - .put(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_COUNT, tracer.inboundMessageCount) - .put(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES, tracer.outboundWireSize) - .put(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_BYTES, tracer.inboundWireSize) - .put( - DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES, - tracer.outboundUncompressedSize) - .put( - DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES, - tracer.inboundUncompressedSize); - if (!status.isOk()) { - measureMap.put(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT, 1); + + long retriesPerCall = 0; + long attempts = attemptsPerCall.get(); + if (attempts > 0) { + retriesPerCall = attempts - 1; } + MeasureMap measureMap = module.statsRecorder.newMeasureMap() + .put(RETRIES_PER_CALL, retriesPerCall) + .put(TRANSPARENT_RETRIES_PER_CALL, transparentRetriesPerCall.get()) + .put(RETRY_DELAY_PER_CALL, retryDelayNanos.get() / NANOS_PER_MILLI); + TagValue methodTag = TagValue.create(fullMethodName); TagValue statusTag = TagValue.create(status.getCode().toString()); measureMap.record( - module - .tagger - .toBuilder(startCtx) + module.tagger + .toBuilder(parentCtx) + .putLocal(RpcMeasureConstants.GRPC_CLIENT_METHOD, methodTag) .putLocal(RpcMeasureConstants.GRPC_CLIENT_STATUS, statusTag) .build()); } diff --git a/census/src/main/java/io/grpc/census/CensusTracingModule.java b/census/src/main/java/io/grpc/census/CensusTracingModule.java index dac62206fd2..08d5fe3ca97 100644 --- a/census/src/main/java/io/grpc/census/CensusTracingModule.java +++ b/census/src/main/java/io/grpc/census/CensusTracingModule.java @@ -32,6 +32,7 @@ import io.grpc.MethodDescriptor; import io.grpc.ServerStreamTracer; import 
io.grpc.StreamTracer; +import io.opencensus.trace.AttributeValue; import io.opencensus.trace.BlankSpan; import io.opencensus.trace.EndSpanOptions; import io.opencensus.trace.MessageEvent; @@ -60,7 +61,8 @@ final class CensusTracingModule { private static final Logger logger = Logger.getLogger(CensusTracingModule.class.getName()); - @Nullable private static final AtomicIntegerFieldUpdater callEndedUpdater; + @Nullable + private static final AtomicIntegerFieldUpdater callEndedUpdater; @Nullable private static final AtomicIntegerFieldUpdater streamClosedUpdater; @@ -70,11 +72,11 @@ final class CensusTracingModule { * (potentially racy) direct updates of the volatile variables. */ static { - AtomicIntegerFieldUpdater tmpCallEndedUpdater; + AtomicIntegerFieldUpdater tmpCallEndedUpdater; AtomicIntegerFieldUpdater tmpStreamClosedUpdater; try { tmpCallEndedUpdater = - AtomicIntegerFieldUpdater.newUpdater(ClientCallTracer.class, "callEnded"); + AtomicIntegerFieldUpdater.newUpdater(CallAttemptsTracerFactory.class, "callEnded"); tmpStreamClosedUpdater = AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed"); } catch (Throwable t) { @@ -116,11 +118,12 @@ public SpanContext parseBytes(byte[] serialized) { } /** - * Creates a {@link ClientCallTracer} for a new call. + * Creates a {@link CallAttemptsTracerFactory} for a new call. 
*/ @VisibleForTesting - ClientCallTracer newClientCallTracer(@Nullable Span parentSpan, MethodDescriptor method) { - return new ClientCallTracer(parentSpan, method); + CallAttemptsTracerFactory newClientCallTracer( + @Nullable Span parentSpan, MethodDescriptor method) { + return new CallAttemptsTracerFactory(parentSpan, method); } /** @@ -223,19 +226,21 @@ private static void recordMessageEvent( } @VisibleForTesting - final class ClientCallTracer extends ClientStreamTracer.InternalLimitedInfoFactory { + final class CallAttemptsTracerFactory extends ClientStreamTracer.InternalLimitedInfoFactory { volatile int callEnded; private final boolean isSampledToLocalTracing; private final Span span; + private final String fullMethodName; - ClientCallTracer(@Nullable Span parentSpan, MethodDescriptor method) { + CallAttemptsTracerFactory(@Nullable Span parentSpan, MethodDescriptor method) { checkNotNull(method, "method"); this.isSampledToLocalTracing = method.isSampledToLocalTracing(); + this.fullMethodName = method.getFullMethodName(); this.span = censusTracer .spanBuilderWithExplicitParent( - generateTraceSpanName(false, method.getFullMethodName()), + generateTraceSpanName(false, fullMethodName), parentSpan) .setRecordEvents(true) .startSpan(); @@ -244,7 +249,17 @@ final class ClientCallTracer extends ClientStreamTracer.InternalLimitedInfoFacto @Override public ClientStreamTracer newClientStreamTracer( ClientStreamTracer.StreamInfo info, Metadata headers) { - return new ClientTracer(span, tracingHeader); + Span attemptSpan = censusTracer + .spanBuilderWithExplicitParent( + "Attempt." 
+ fullMethodName.replace('/', '.'), + span) + .setRecordEvents(true) + .startSpan(); + attemptSpan.putAttribute( + "previous-rpc-attempts", AttributeValue.longAttributeValue(info.getPreviousAttempts())); + attemptSpan.putAttribute( + "transparent-retry", AttributeValue.booleanAttributeValue(info.isTransparentRetry())); + return new ClientTracer(attemptSpan, tracingHeader, isSampledToLocalTracing); } /** @@ -271,10 +286,13 @@ void callEnded(io.grpc.Status status) { private static final class ClientTracer extends ClientStreamTracer { private final Span span; final Metadata.Key tracingHeader; + final boolean isSampledToLocalTracing; - ClientTracer(Span span, Metadata.Key tracingHeader) { + ClientTracer( + Span span, Metadata.Key tracingHeader, boolean isSampledToLocalTracing) { this.span = checkNotNull(span, "span"); this.tracingHeader = tracingHeader; + this.isSampledToLocalTracing = isSampledToLocalTracing; } @Override @@ -298,6 +316,11 @@ public void inboundMessageRead( recordMessageEvent( span, MessageEvent.Type.RECEIVED, seqNo, optionalWireSize, optionalUncompressedSize); } + + @Override + public void streamClosed(io.grpc.Status status) { + span.end(createEndSpanOptions(status, isSampledToLocalTracing)); + } } @@ -388,7 +411,7 @@ public ClientCall interceptCall( // Safe usage of the unsafe trace API because CONTEXT_SPAN_KEY.get() returns the same value // as Tracer.getCurrentSpan() except when no value available when the return value is null // for the direct access and BlankSpan when Tracer API is used. 
- final ClientCallTracer tracerFactory = + final CallAttemptsTracerFactory tracerFactory = newClientCallTracer(ContextUtils.getValue(Context.current()), method); ClientCall call = next.newCall( diff --git a/census/src/test/java/io/grpc/census/CensusModulesTest.java b/census/src/test/java/io/grpc/census/CensusModulesTest.java index fd3a049f7a4..d285c8fe8c2 100644 --- a/census/src/test/java/io/grpc/census/CensusModulesTest.java +++ b/census/src/test/java/io/grpc/census/CensusModulesTest.java @@ -18,6 +18,9 @@ import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertWithMessage; +import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRIES_PER_CALL; +import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRY_DELAY_PER_CALL; +import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.TRANSPARENT_RETRIES_PER_CALL; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -58,6 +61,7 @@ import io.grpc.ServerStreamTracer; import io.grpc.ServerStreamTracer.ServerCallInfo; import io.grpc.Status; +import io.grpc.census.CensusTracingModule.CallAttemptsTracerFactory; import io.grpc.census.internal.DeprecatedCensusConstants; import io.grpc.internal.FakeClock; import io.grpc.internal.testing.StatsTestUtils; @@ -81,6 +85,7 @@ import io.opencensus.stats.View; import io.opencensus.tags.TagContext; import io.opencensus.tags.TagValue; +import io.opencensus.trace.AttributeValue; import io.opencensus.trace.BlankSpan; import io.opencensus.trace.EndSpanOptions; import io.opencensus.trace.MessageEvent; @@ -173,10 +178,12 @@ public String parse(InputStream stream) { private final Random random = new Random(1234); private final Span fakeClientParentSpan = MockableSpan.generateRandomSpan(random); private final Span spyClientSpan = spy(MockableSpan.generateRandomSpan(random)); - private final 
SpanContext fakeClientSpanContext = spyClientSpan.getContext(); + private final Span spyAttemptSpan = spy(MockableSpan.generateRandomSpan(random)); + private final SpanContext fakeAttemptSpanContext = spyAttemptSpan.getContext(); private final Span spyServerSpan = spy(MockableSpan.generateRandomSpan(random)); private final byte[] binarySpanContext = new byte[]{3, 1, 5}; private final SpanBuilder spyClientSpanBuilder = spy(new MockableSpan.Builder()); + private final SpanBuilder spyAttemptSpanBuilder = spy(new MockableSpan.Builder()); private final SpanBuilder spyServerSpanBuilder = spy(new MockableSpan.Builder()); @Rule @@ -201,15 +208,20 @@ public String parse(InputStream stream) { @Before public void setUp() throws Exception { when(spyClientSpanBuilder.startSpan()).thenReturn(spyClientSpan); - when(tracer.spanBuilderWithExplicitParent(anyString(), ArgumentMatchers.any())) + when(spyAttemptSpanBuilder.startSpan()).thenReturn(spyAttemptSpan); + when(tracer.spanBuilderWithExplicitParent( + eq("Sent.package1.service2.method3"), ArgumentMatchers.any())) .thenReturn(spyClientSpanBuilder); + when(tracer.spanBuilderWithExplicitParent( + eq("Attempt.package1.service2.method3"), ArgumentMatchers.any())) + .thenReturn(spyAttemptSpanBuilder); when(spyServerSpanBuilder.startSpan()).thenReturn(spyServerSpan); when(tracer.spanBuilderWithRemoteParent(anyString(), ArgumentMatchers.any())) .thenReturn(spyServerSpanBuilder); when(mockTracingPropagationHandler.toByteArray(any(SpanContext.class))) .thenReturn(binarySpanContext); when(mockTracingPropagationHandler.fromByteArray(any(byte[].class))) - .thenReturn(fakeClientSpanContext); + .thenReturn(fakeAttemptSpanContext); censusStats = new CensusStatsModule( tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(), @@ -292,7 +304,7 @@ public ClientCall interceptCall( assertEquals(2, capturedCallOptions.get().getStreamTracerFactories().size()); assertTrue( capturedCallOptions.get().getStreamTracerFactories().get(0) - 
instanceof CensusTracingModule.ClientCallTracer); + instanceof CallAttemptsTracerFactory); assertTrue( capturedCallOptions.get().getStreamTracerFactories().get(1) instanceof CensusStatsModule.CallAttemptsTracerFactory); @@ -355,6 +367,7 @@ record = statsRecorder.pollRecord(); .setSampleToLocalSpanStore(false) .build()); verify(spyClientSpan, never()).end(); + assertZeroRetryRecorded(); } @Test @@ -489,11 +502,200 @@ private void subtestClientBasicStatsDefaultContext( DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); assertEquals(30 + 100 + 16 + 24, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + assertZeroRetryRecorded(); } else { assertNull(statsRecorder.pollRecord()); } } + // This test is only unit-testing the stat recording logic. The retry behavior is faked. + @Test + public void recordRetryStats() { + CensusStatsModule localCensusStats = + new CensusStatsModule( + tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(), + true, true, true, true); + CensusStatsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = + new CensusStatsModule.CallAttemptsTracerFactory( + localCensusStats, tagger.empty(), method.getFullMethodName()); + ClientStreamTracer tracer = + callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); + + StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); + assertEquals(1, record.tags.size()); + TagValue methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + assertEquals( + 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT)); + + fakeClock.forwardTime(30, MILLISECONDS); + tracer.outboundHeaders(); + fakeClock.forwardTime(100, MILLISECONDS); + tracer.outboundMessage(0); + assertRealTimeMetric( + RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true); + tracer.outboundMessage(1); + 
assertRealTimeMetric( + RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true); + tracer.outboundWireSize(1028); + assertRealTimeMetric( + RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_METHOD, 1028, true, true); + tracer.outboundUncompressedSize(1128); + fakeClock.forwardTime(24, MILLISECONDS); + tracer.streamClosed(Status.UNAVAILABLE); + record = statsRecorder.pollRecord(); + methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + TagValue statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS); + assertEquals(Status.Code.UNAVAILABLE.toString(), statusTag.asString()); + assertEquals( + 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT)); + assertEquals(1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT)); + assertEquals( + 2, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT)); + assertEquals( + 1028, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES)); + assertEquals( + 1128, + record.getMetricAsLongOrFail( + DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); + assertEquals( + 30 + 100 + 24, + record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + + // faking retry + fakeClock.forwardTime(1000, MILLISECONDS); + tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); + record = statsRecorder.pollRecord(); + assertEquals(1, record.tags.size()); + methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + assertEquals( + 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT)); + tracer.outboundHeaders(); + tracer.outboundMessage(0); + assertRealTimeMetric( + RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true); + 
tracer.outboundMessage(1); + assertRealTimeMetric( + RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true); + tracer.outboundWireSize(1028); + assertRealTimeMetric( + RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_METHOD, 1028, true, true); + tracer.outboundUncompressedSize(1128); + fakeClock.forwardTime(100, MILLISECONDS); + tracer.streamClosed(Status.NOT_FOUND); + record = statsRecorder.pollRecord(); + methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS); + assertEquals(Status.Code.NOT_FOUND.toString(), statusTag.asString()); + assertEquals( + 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT)); + assertEquals(1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT)); + assertEquals( + 2, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT)); + assertEquals( + 1028, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES)); + assertEquals( + 1128, + record.getMetricAsLongOrFail( + DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); + assertEquals( + 100 , + record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + + // fake transparent retry + fakeClock.forwardTime(10, MILLISECONDS); + tracer = callAttemptsTracerFactory.newClientStreamTracer( + STREAM_INFO.toBuilder().setIsTransparentRetry(true).build(), new Metadata()); + record = statsRecorder.pollRecord(); + assertEquals(1, record.tags.size()); + methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + assertEquals( + 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT)); + tracer.streamClosed(Status.UNAVAILABLE); + record = statsRecorder.pollRecord(); + statusTag = 
record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS); + assertEquals(Status.Code.UNAVAILABLE.toString(), statusTag.asString()); + assertEquals( + 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT)); + assertEquals(1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT)); + assertEquals( + 0, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT)); + assertEquals( + 0, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES)); + + // fake another transparent retry + fakeClock.forwardTime(10, MILLISECONDS); + tracer = callAttemptsTracerFactory.newClientStreamTracer( + STREAM_INFO.toBuilder().setIsTransparentRetry(true).build(), new Metadata()); + record = statsRecorder.pollRecord(); + assertEquals(1, record.tags.size()); + assertEquals( + 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT)); + tracer.outboundHeaders(); + tracer.outboundMessage(0); + assertRealTimeMetric( + RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true); + tracer.outboundMessage(1); + assertRealTimeMetric( + RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true); + tracer.outboundWireSize(1028); + assertRealTimeMetric( + RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_METHOD, 1028, true, true); + tracer.outboundUncompressedSize(1128); + fakeClock.forwardTime(16, MILLISECONDS); + tracer.inboundMessage(0); + assertRealTimeMetric( + RpcMeasureConstants.GRPC_CLIENT_RECEIVED_MESSAGES_PER_METHOD, 1, true, true); + tracer.inboundWireSize(33); + assertRealTimeMetric( + RpcMeasureConstants.GRPC_CLIENT_RECEIVED_BYTES_PER_METHOD, 33, true, true); + tracer.inboundUncompressedSize(67); + fakeClock.forwardTime(24, MILLISECONDS); + // RPC succeeded + tracer.streamClosed(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK); + + record = statsRecorder.pollRecord(); + statusTag = 
record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS); + assertEquals(Status.Code.OK.toString(), statusTag.asString()); + assertEquals( + 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT)); + assertThat(record.getMetric(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT)).isNull(); + assertEquals( + 2, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT)); + assertEquals( + 1028, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES)); + assertEquals( + 1128, + record.getMetricAsLongOrFail( + DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); + assertEquals( + 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_COUNT)); + assertEquals( + 33, + record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_BYTES)); + assertEquals( + 67, + record.getMetricAsLongOrFail( + DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); + assertEquals( + 16 + 24 , + record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + + record = statsRecorder.pollRecord(); + methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS); + assertEquals(Status.Code.OK.toString(), statusTag.asString()); + assertThat(record.getMetric(RETRIES_PER_CALL)).isEqualTo(1); + assertThat(record.getMetric(TRANSPARENT_RETRIES_PER_CALL)).isEqualTo(2); + assertThat(record.getMetric(RETRY_DELAY_PER_CALL)).isEqualTo(1000D + 10 + 10); + } + private void assertRealTimeMetric( Measure measure, long expectedValue, boolean recordRealTimeMetrics, boolean clientSide) { StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); @@ -517,16 +719,28 @@ private void assertRealTimeMetric( assertEquals(expectedValue, record.getMetricAsLongOrFail(measure)); } + private void 
assertZeroRetryRecorded() { + StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); + TagValue methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + assertThat(record.getMetric(RETRIES_PER_CALL)).isEqualTo(0); + assertThat(record.getMetric(TRANSPARENT_RETRIES_PER_CALL)).isEqualTo(0); + assertThat(record.getMetric(RETRY_DELAY_PER_CALL)).isEqualTo(0D); + } + @Test public void clientBasicTracingDefaultSpan() { - CensusTracingModule.ClientCallTracer callTracer = + CallAttemptsTracerFactory callTracer = censusTracing.newClientCallTracer(null, method); Metadata headers = new Metadata(); ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers); clientStreamTracer.streamCreated(Attributes.EMPTY, headers); verify(tracer).spanBuilderWithExplicitParent( eq("Sent.package1.service2.method3"), ArgumentMatchers.isNull()); + verify(tracer).spanBuilderWithExplicitParent( + eq("Attempt.package1.service2.method3"), eq(spyClientSpan)); verify(spyClientSpan, never()).end(any(EndSpanOptions.class)); + verify(spyAttemptSpan, never()).end(any(EndSpanOptions.class)); clientStreamTracer.outboundMessage(0); clientStreamTracer.outboundMessageSent(0, 882, -1); @@ -538,8 +752,12 @@ public void clientBasicTracingDefaultSpan() { clientStreamTracer.streamClosed(Status.OK); callTracer.callEnded(Status.OK); - InOrder inOrder = inOrder(spyClientSpan); - inOrder.verify(spyClientSpan, times(3)).addMessageEvent(messageEventCaptor.capture()); + InOrder inOrder = inOrder(spyClientSpan, spyAttemptSpan); + inOrder.verify(spyAttemptSpan) + .putAttribute("previous-rpc-attempts", AttributeValue.longAttributeValue(0)); + inOrder.verify(spyAttemptSpan) + .putAttribute("transparent-retry", AttributeValue.booleanAttributeValue(false)); + inOrder.verify(spyAttemptSpan, times(3)).addMessageEvent(messageEventCaptor.capture()); List events = messageEventCaptor.getAllValues(); 
assertEquals( MessageEvent.builder(MessageEvent.Type.SENT, 0).setCompressedMessageSize(882).build(), @@ -553,18 +771,23 @@ public void clientBasicTracingDefaultSpan() { .setUncompressedMessageSize(90) .build(), events.get(2)); + inOrder.verify(spyAttemptSpan).end( + EndSpanOptions.builder() + .setStatus(io.opencensus.trace.Status.OK) + .setSampleToLocalSpanStore(false) + .build()); inOrder.verify(spyClientSpan).end( EndSpanOptions.builder() .setStatus(io.opencensus.trace.Status.OK) .setSampleToLocalSpanStore(false) .build()); - verifyNoMoreInteractions(spyClientSpan); + inOrder.verifyNoMoreInteractions(); verifyNoMoreInteractions(tracer); } @Test public void clientTracingSampledToLocalSpanStore() { - CensusTracingModule.ClientCallTracer callTracer = + CallAttemptsTracerFactory callTracer = censusTracing.newClientCallTracer(null, sampledMethod); callTracer.callEnded(Status.OK); @@ -631,11 +854,12 @@ record = statsRecorder.pollRecord(); 3000, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); assertNull(record.getMetric(DeprecatedCensusConstants.RPC_CLIENT_SERVER_ELAPSED_TIME)); + assertZeroRetryRecorded(); } @Test public void clientStreamNeverCreatedStillRecordTracing() { - CensusTracingModule.ClientCallTracer callTracer = + CallAttemptsTracerFactory callTracer = censusTracing.newClientCallTracer(fakeClientParentSpan, method); verify(tracer).spanBuilderWithExplicitParent( eq("Sent.package1.service2.method3"), same(fakeClientParentSpan)); @@ -770,6 +994,7 @@ private void subtestStatsHeadersPropagateTags(boolean propagate, boolean recordS assertNull(clientRecord.getMetric(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT)); TagValue clientPropagatedTag = clientRecord.tags.get(StatsTestUtils.EXTRA_TAG); assertEquals("extra-tag-value-897", clientPropagatedTag.asString()); + assertZeroRetryRecorded(); } if (!recordStats) { @@ -812,16 +1037,18 @@ public void statsHeaderMalformed() { @Test public void traceHeadersPropagateSpanContext() 
throws Exception { - CensusTracingModule.ClientCallTracer callTracer = + CallAttemptsTracerFactory callTracer = censusTracing.newClientCallTracer(fakeClientParentSpan, method); Metadata headers = new Metadata(); ClientStreamTracer streamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers); streamTracer.streamCreated(Attributes.EMPTY, headers); - verify(mockTracingPropagationHandler).toByteArray(same(fakeClientSpanContext)); + verify(mockTracingPropagationHandler).toByteArray(same(fakeAttemptSpanContext)); verifyNoMoreInteractions(mockTracingPropagationHandler); verify(tracer).spanBuilderWithExplicitParent( eq("Sent.package1.service2.method3"), same(fakeClientParentSpan)); + verify(tracer).spanBuilderWithExplicitParent( + eq("Attempt.package1.service2.method3"), same(spyClientSpan)); verify(spyClientSpanBuilder).setRecordEvents(eq(true)); verifyNoMoreInteractions(tracer); assertTrue(headers.containsKey(censusTracing.tracingHeader)); @@ -831,7 +1058,7 @@ public void traceHeadersPropagateSpanContext() throws Exception { method.getFullMethodName(), headers); verify(mockTracingPropagationHandler).fromByteArray(same(binarySpanContext)); verify(tracer).spanBuilderWithRemoteParent( - eq("Recv.package1.service2.method3"), same(spyClientSpan.getContext())); + eq("Recv.package1.service2.method3"), same(spyAttemptSpan.getContext())); verify(spyServerSpanBuilder).setRecordEvents(eq(true)); Context filteredContext = serverTracer.filterContext(Context.ROOT); @@ -840,7 +1067,7 @@ public void traceHeadersPropagateSpanContext() throws Exception { @Test public void traceHeaders_propagateSpanContext() throws Exception { - CensusTracingModule.ClientCallTracer callTracer = + CallAttemptsTracerFactory callTracer = censusTracing.newClientCallTracer(fakeClientParentSpan, method); Metadata headers = new Metadata(); @@ -854,10 +1081,12 @@ public void traceHeaders_propagateSpanContext() throws Exception { public void traceHeaders_missingCensusImpl_notPropagateSpanContext() throws 
Exception { reset(spyClientSpanBuilder); + reset(spyAttemptSpanBuilder); when(spyClientSpanBuilder.startSpan()).thenReturn(BlankSpan.INSTANCE); + when(spyAttemptSpanBuilder.startSpan()).thenReturn(BlankSpan.INSTANCE); Metadata headers = new Metadata(); - CensusTracingModule.ClientCallTracer callTracer = + CallAttemptsTracerFactory callTracer = censusTracing.newClientCallTracer(BlankSpan.INSTANCE, method); callTracer.newClientStreamTracer(STREAM_INFO, headers).streamCreated(Attributes.EMPTY, headers); @@ -867,14 +1096,16 @@ public void traceHeaders_missingCensusImpl_notPropagateSpanContext() @Test public void traceHeaders_clientMissingCensusImpl_preservingHeaders() throws Exception { reset(spyClientSpanBuilder); + reset(spyAttemptSpanBuilder); when(spyClientSpanBuilder.startSpan()).thenReturn(BlankSpan.INSTANCE); + when(spyAttemptSpanBuilder.startSpan()).thenReturn(BlankSpan.INSTANCE); Metadata headers = new Metadata(); headers.put( Metadata.Key.of("never-used-key-bin", Metadata.BINARY_BYTE_MARSHALLER), new byte[] {}); Set originalHeaderKeys = new HashSet<>(headers.keys()); - CensusTracingModule.ClientCallTracer callTracer = + CallAttemptsTracerFactory callTracer = censusTracing.newClientCallTracer(BlankSpan.INSTANCE, method); callTracer.newClientStreamTracer(STREAM_INFO, headers).streamCreated(Attributes.EMPTY, headers); @@ -885,9 +1116,9 @@ public void traceHeaders_clientMissingCensusImpl_preservingHeaders() throws Exce public void traceHeaderMalformed() throws Exception { // As comparison, normal header parsing Metadata headers = new Metadata(); - headers.put(censusTracing.tracingHeader, fakeClientSpanContext); + headers.put(censusTracing.tracingHeader, fakeAttemptSpanContext); // mockTracingPropagationHandler was stubbed to always return fakeServerParentSpanContext - assertSame(spyClientSpan.getContext(), headers.get(censusTracing.tracingHeader)); + assertSame(spyAttemptSpan.getContext(), headers.get(censusTracing.tracingHeader)); // Make 
BinaryPropagationHandler always throw when parsing the header when(mockTracingPropagationHandler.fromByteArray(any(byte[].class))) @@ -895,7 +1126,7 @@ public void traceHeaderMalformed() throws Exception { headers = new Metadata(); assertNull(headers.get(censusTracing.tracingHeader)); - headers.put(censusTracing.tracingHeader, fakeClientSpanContext); + headers.put(censusTracing.tracingHeader, fakeAttemptSpanContext); assertSame(SpanContext.INVALID, headers.get(censusTracing.tracingHeader)); assertNotSame(spyClientSpan.getContext(), SpanContext.INVALID); diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index 28cd3351203..dd17244e2a5 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -255,7 +255,8 @@ public void runInContext() { effectiveDeadline, context.getDeadline(), callOptions.getDeadline()); stream = clientStreamProvider.newStream(method, callOptions, headers, context); } else { - ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(callOptions, headers, false); + ClientStreamTracer[] tracers = + GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false); stream = new FailingClientStream( DEADLINE_EXCEEDED.withDescription( "ClientCall started after deadline exceeded: " + effectiveDeadline), diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java index 0bb83a56d75..13fc786e383 100644 --- a/core/src/main/java/io/grpc/internal/GrpcUtil.java +++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java @@ -757,11 +757,12 @@ public ListenableFuture getStats() { /** Gets stream tracers based on CallOptions. 
*/ public static ClientStreamTracer[] getClientStreamTracers( - CallOptions callOptions, Metadata headers, boolean isTransparentRetry) { + CallOptions callOptions, Metadata headers, int previousAttempts, boolean isTransparentRetry) { List factories = callOptions.getStreamTracerFactories(); ClientStreamTracer[] tracers = new ClientStreamTracer[factories.size() + 1]; StreamInfo streamInfo = StreamInfo.newBuilder() .setCallOptions(callOptions) + .setPreviousAttempts(previousAttempts) .setIsTransparentRetry(isTransparentRetry) .build(); for (int i = 0; i < factories.size(); i++) { diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 87162d9aba2..6cd5598e2a6 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -533,7 +533,7 @@ public ClientStream newStream( getTransport(new PickSubchannelArgsImpl(method, headers, callOptions)); Context origContext = context.attach(); ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( - callOptions, headers, /* isTransparentRetry= */ false); + callOptions, headers, 0, /* isTransparentRetry= */ false); try { return transport.newStream(method, headers, callOptions, tracers); } finally { @@ -572,10 +572,11 @@ void postCommit() { @Override ClientStream newSubstream( - Metadata newHeaders, ClientStreamTracer.Factory factory, boolean isTransparentRetry) { + Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts, + boolean isTransparentRetry) { CallOptions newOptions = callOptions.withStreamTracerFactory(factory); - ClientStreamTracer[] tracers = - GrpcUtil.getClientStreamTracers(newOptions, newHeaders, isTransparentRetry); + ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( + newOptions, newHeaders, previousAttempts, isTransparentRetry); ClientTransport transport = getTransport(new PickSubchannelArgsImpl(method, 
newHeaders, newOptions)); Context origContext = context.attach(); @@ -624,7 +625,7 @@ ClientStream newSubstream( channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider); ProxyDetector proxyDetector = builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR; - this.retryEnabled = builder.retryEnabled && !builder.temporarilyDisableRetry; + this.retryEnabled = builder.retryEnabled; this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy); this.offloadExecutorHolder = new ExecutorHolder( diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java index d42b3832136..cad4ece233e 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java @@ -143,10 +143,6 @@ public static ManagedChannelBuilder forTarget(String target) { long retryBufferSize = DEFAULT_RETRY_BUFFER_SIZE_IN_BYTES; long perRpcBufferLimit = DEFAULT_PER_RPC_BUFFER_LIMIT_IN_BYTES; boolean retryEnabled = false; // TODO(zdapeng): default to true - // Temporarily disable retry when stats or tracing is enabled to avoid breakage, until we know - // what should be the desired behavior for retry + stats/tracing. - // TODO(zdapeng): delete me - boolean temporarilyDisableRetry; InternalChannelz channelz = InternalChannelz.instance(); int maxTraceEvents; @@ -460,8 +456,6 @@ public ManagedChannelImplBuilder disableRetry() { @Override public ManagedChannelImplBuilder enableRetry() { retryEnabled = true; - statsEnabled = false; - tracingEnabled = false; return this; } @@ -592,9 +586,6 @@ public void setStatsRecordRealTimeMetrics(boolean value) { /** * Disable or enable tracing features. Enabled by default. - * - *

For the current release, calling {@code setTracingEnabled(true)} may have a side effect that - * disables retry. */ public void setTracingEnabled(boolean value) { tracingEnabled = value; @@ -642,9 +633,7 @@ public ManagedChannel build() { List getEffectiveInterceptors() { List effectiveInterceptors = new ArrayList<>(this.interceptors); - temporarilyDisableRetry = false; if (statsEnabled) { - temporarilyDisableRetry = true; ClientInterceptor statsInterceptor = null; try { Class censusStatsAccessor = @@ -679,7 +668,6 @@ List getEffectiveInterceptors() { } } if (tracingEnabled) { - temporarilyDisableRetry = true; ClientInterceptor tracingInterceptor = null; try { Class censusTracingAccessor = diff --git a/core/src/main/java/io/grpc/internal/OobChannel.java b/core/src/main/java/io/grpc/internal/OobChannel.java index b628842efe4..589824ae10e 100644 --- a/core/src/main/java/io/grpc/internal/OobChannel.java +++ b/core/src/main/java/io/grpc/internal/OobChannel.java @@ -88,7 +88,7 @@ final class OobChannel extends ManagedChannel implements InternalInstrumented method, CallOptions callOptions, Metadata headers, Context context) { ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( - callOptions, headers, /* isTransparentRetry= */ false); + callOptions, headers, 0, /* isTransparentRetry= */ false); Context origContext = context.attach(); // delayed transport's newStream() always acquires a lock, but concurrent performance doesn't // matter here because OOB communication should be sparse, and it's not on application RPC's diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index d19a260049b..3d277bbe2fc 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -218,7 +218,7 @@ public ClientStreamTracer newClientStreamTracer( Metadata newHeaders = updateHeaders(headers, previousAttemptCount); // NOTICE: This set 
_must_ be done before stream.start() and it actually is. - sub.stream = newSubstream(newHeaders, tracerFactory, isTransparentRetry); + sub.stream = newSubstream(newHeaders, tracerFactory, previousAttemptCount, isTransparentRetry); return sub; } @@ -227,7 +227,8 @@ public ClientStreamTracer newClientStreamTracer( * Client stream is not yet started. */ abstract ClientStream newSubstream( - Metadata headers, ClientStreamTracer.Factory tracerFactory, boolean isTransparentRetry); + Metadata headers, ClientStreamTracer.Factory tracerFactory, int previousAttempts, + boolean isTransparentRetry); /** Adds grpc-previous-rpc-attempts in the headers of a retry/hedging RPC. */ @VisibleForTesting @@ -869,24 +870,26 @@ public void run() { synchronized (lock) { scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock); } - scheduledRetryCopy.setFuture( - scheduledExecutorService.schedule( + class RetryBackoffRunnable implements Runnable { + @Override + public void run() { + callExecutor.execute( new Runnable() { @Override public void run() { - callExecutor.execute( - new Runnable() { - @Override - public void run() { - // retry - Substream newSubstream = createSubstream( - substream.previousAttemptCount + 1, - false); - drain(newSubstream); - } - }); + // retry + Substream newSubstream = createSubstream( + substream.previousAttemptCount + 1, + false); + drain(newSubstream); } - }, + }); + } + } + + scheduledRetryCopy.setFuture( + scheduledExecutorService.schedule( + new RetryBackoffRunnable(), retryPlan.backoffNanos, TimeUnit.NANOSECONDS)); return; diff --git a/core/src/main/java/io/grpc/internal/SubchannelChannel.java b/core/src/main/java/io/grpc/internal/SubchannelChannel.java index 1380a6bc716..a1d454ed2fb 100644 --- a/core/src/main/java/io/grpc/internal/SubchannelChannel.java +++ b/core/src/main/java/io/grpc/internal/SubchannelChannel.java @@ -59,7 +59,7 @@ public ClientStream newStream(MethodDescriptor method, transport = notReadyTransport; } ClientStreamTracer[] 
tracers = GrpcUtil.getClientStreamTracers( - callOptions, headers, /* isTransparentRetry= */ false); + callOptions, headers, 0, /* isTransparentRetry= */ false); Context origContext = context.attach(); try { return transport.newStream(method, headers, callOptions, tracers); diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index 95d2c2ba8b5..c9ea504e18b 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -164,7 +164,8 @@ void postCommit() { @Override ClientStream newSubstream( - Metadata metadata, ClientStreamTracer.Factory tracerFactory, boolean isTransparentRetry) { + Metadata metadata, ClientStreamTracer.Factory tracerFactory, int previousAttempts, + boolean isTransparentRetry) { bufferSizeTracer = tracerFactory.newClientStreamTracer(STREAM_INFO, metadata); int actualPreviousRpcAttemptsInHeader = metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS) == null diff --git a/interop-testing/build.gradle b/interop-testing/build.gradle index 79aa5356ecd..852d5882cce 100644 --- a/interop-testing/build.gradle +++ b/interop-testing/build.gradle @@ -43,6 +43,8 @@ dependencies { libraries.netty_tcnative, project(':grpc-grpclb') testImplementation project(':grpc-context').sourceSets.test.output, + project(':grpc-api').sourceSets.test.output, + project(':grpc-core').sourceSets.test.output, libraries.mockito alpnagent libraries.jetty_alpn_agent } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index 1b447a63c32..be3e759e2d0 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -92,6 +92,9 @@ import 
io.grpc.testing.integration.Messages.StreamingOutputCallRequest; import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; +import io.opencensus.stats.Measure; +import io.opencensus.stats.Measure.MeasureDouble; +import io.opencensus.stats.Measure.MeasureLong; import io.opencensus.tags.TagKey; import io.opencensus.tags.TagValue; import io.opencensus.trace.Span; @@ -152,6 +155,15 @@ public abstract class AbstractInteropTest { * SETTINGS/WINDOW_UPDATE exchange. */ public static final int TEST_FLOW_CONTROL_WINDOW = 65 * 1024; + private static final MeasureLong RETRIES_PER_CALL = + Measure.MeasureLong.create( + "grpc.io/client/retries_per_call", "Number of retries per call", "1"); + private static final MeasureLong TRANSPARENT_RETRIES_PER_CALL = + Measure.MeasureLong.create( + "grpc.io/client/transparent_retries_per_call", "Transparent retries per call", "1"); + private static final MeasureDouble RETRY_DELAY_PER_CALL = + Measure.MeasureDouble.create( + "grpc.io/client/retry_delay_per_call", "Retry delay per call", "ms"); private static final FakeTagger tagger = new FakeTagger(); private static final FakeTagContextBinarySerializer tagContextBinarySerializer = @@ -1236,6 +1248,7 @@ public void deadlineInPast() throws Exception { checkEndTags( clientEndRecord, "grpc.testing.TestService/EmptyCall", Status.DEADLINE_EXCEEDED.getCode(), true); + assertZeroRetryRecorded(); } // warm up the channel @@ -1245,6 +1258,7 @@ public void deadlineInPast() throws Exception { clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); // clientEndRecord clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); + assertZeroRetryRecorded(); } try { blockingStub @@ -1263,6 +1277,7 @@ public void deadlineInPast() throws Exception { checkEndTags( clientEndRecord, "grpc.testing.TestService/EmptyCall", Status.DEADLINE_EXCEEDED.getCode(), true); + assertZeroRetryRecorded(); } } @@ -1980,6 +1995,13 @@ private void 
assertStatsTrace(String method, Status.Code status) { assertStatsTrace(method, status, null, null); } + private void assertZeroRetryRecorded() { + MetricsRecord retryRecord = clientStatsRecorder.pollRecord(); + assertThat(retryRecord.getMetric(RETRIES_PER_CALL)).isEqualTo(0); + assertThat(retryRecord.getMetric(TRANSPARENT_RETRIES_PER_CALL)).isEqualTo(0); + assertThat(retryRecord.getMetric(RETRY_DELAY_PER_CALL)).isEqualTo(0D); + } + private void assertClientStatsTrace(String method, Status.Code code, Collection requests, Collection responses) { // Tracer-based stats @@ -2009,6 +2031,7 @@ private void assertClientStatsTrace(String method, Status.Code code, if (requests != null && responses != null) { checkCensus(clientEndRecord, false, requests, responses); } + assertZeroRetryRecorded(); } } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java new file mode 100644 index 00000000000..bdf39e8546a --- /dev/null +++ b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java @@ -0,0 +1,450 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.grpc.testing.integration; + +import static com.google.common.truth.Truth.assertThat; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +import com.google.common.collect.ImmutableMap; +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ClientStreamTracer; +import io.grpc.ClientStreamTracer.StreamInfo; +import io.grpc.Deadline; +import io.grpc.Deadline.Ticker; +import io.grpc.IntegerMarshaller; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.MethodDescriptor.MethodType; +import io.grpc.Server; +import io.grpc.ServerCall; +import io.grpc.ServerCall.Listener; +import io.grpc.ServerCallHandler; +import io.grpc.ServerMethodDefinition; +import io.grpc.ServerServiceDefinition; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.StringMarshaller; +import io.grpc.census.InternalCensusStatsAccessor; +import io.grpc.census.internal.DeprecatedCensusConstants; +import io.grpc.internal.FakeClock; +import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder; +import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer; +import io.grpc.internal.testing.StatsTestUtils.FakeTagger; +import io.grpc.internal.testing.StatsTestUtils.MetricsRecord; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.NettyServerBuilder; +import io.grpc.testing.GrpcCleanupRule; +import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.local.LocalAddress; +import io.netty.channel.local.LocalChannel; +import io.netty.channel.local.LocalServerChannel; +import io.netty.util.concurrent.ScheduledFuture; +import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; 
+import io.opencensus.stats.Measure; +import io.opencensus.stats.Measure.MeasureDouble; +import io.opencensus.stats.Measure.MeasureLong; +import io.opencensus.tags.TagValue; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +@RunWith(JUnit4.class) +public class RetryTest { + private static final FakeTagger tagger = new FakeTagger(); + private static final FakeTagContextBinarySerializer tagContextBinarySerializer = + new FakeTagContextBinarySerializer(); + private static final MeasureLong RETRIES_PER_CALL = + Measure.MeasureLong.create( + "grpc.io/client/retries_per_call", "Number of retries per call", "1"); + private static final MeasureLong TRANSPARENT_RETRIES_PER_CALL = + Measure.MeasureLong.create( + "grpc.io/client/transparent_retries_per_call", "Transparent retries per call", "1"); + private static final MeasureDouble RETRY_DELAY_PER_CALL = + Measure.MeasureDouble.create( + "grpc.io/client/retry_delay_per_call", "Retry delay per call", "ms"); + + @Rule + public final MockitoRule mocks = MockitoJUnit.rule(); + @Rule + public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + private final FakeClock fakeClock = new FakeClock(); + @Mock + private ClientCall.Listener mockCallListener; + private CountDownLatch backoffLatch = new CountDownLatch(1); + private final EventLoopGroup group = new DefaultEventLoopGroup() { + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public ScheduledFuture schedule( + final Runnable command, final long delay, final TimeUnit 
unit) { + if (!command.getClass().getName().contains("RetryBackoffRunnable")) { + return super.schedule(command, delay, unit); + } + fakeClock.getScheduledExecutorService().schedule( + new Runnable() { + @Override + public void run() { + group.execute(command); + } + }, + delay, + unit); + backoffLatch.countDown(); + return super.schedule( + new Runnable() { + @Override + public void run() {} // no-op + }, + 0, + TimeUnit.NANOSECONDS); + } + }; + private final FakeStatsRecorder clientStatsRecorder = new FakeStatsRecorder(); + private final ClientInterceptor statsInterceptor = + InternalCensusStatsAccessor.getClientInterceptor( + tagger, tagContextBinarySerializer, clientStatsRecorder, + fakeClock.getStopwatchSupplier(), true, true, true, + /* recordRealTimeMetrics= */ true); + private final MethodDescriptor clientStreamingMethod = + MethodDescriptor.newBuilder() + .setType(MethodType.CLIENT_STREAMING) + .setFullMethodName("service/method") + .setRequestMarshaller(new StringMarshaller()) + .setResponseMarshaller(new IntegerMarshaller()) + .build(); + private final LinkedBlockingQueue> serverCalls = + new LinkedBlockingQueue<>(); + private final ServerMethodDefinition methodDefinition = + ServerMethodDefinition.create( + clientStreamingMethod, + new ServerCallHandler() { + @Override + public Listener startCall(ServerCall call, Metadata headers) { + serverCalls.offer(call); + return new Listener() {}; + } + } + ); + private final ServerServiceDefinition serviceDefinition = + ServerServiceDefinition.builder(clientStreamingMethod.getServiceName()) + .addMethod(methodDefinition) + .build(); + private final LocalAddress localAddress = new LocalAddress(this.getClass().getName()); + private Server localServer; + private ManagedChannel channel; + private Map retryPolicy = null; + private long bufferLimit = 1L << 20; // 1M + + private void startNewServer() throws Exception { + localServer = cleanupRule.register(NettyServerBuilder.forAddress(localAddress) + 
.channelType(LocalServerChannel.class) + .bossEventLoopGroup(group) + .workerEventLoopGroup(group) + .addService(serviceDefinition) + .build()); + localServer.start(); + } + + private void createNewChannel() { + Map methodConfig = new HashMap<>(); + Map name = new HashMap<>(); + name.put("service", "service"); + methodConfig.put("name", Arrays.asList(name)); + if (retryPolicy != null) { + methodConfig.put("retryPolicy", retryPolicy); + } + Map rawServiceConfig = new HashMap<>(); + rawServiceConfig.put("methodConfig", Arrays.asList(methodConfig)); + channel = cleanupRule.register( + NettyChannelBuilder.forAddress(localAddress) + .channelType(LocalChannel.class) + .eventLoopGroup(group) + .usePlaintext() + .enableRetry() + .perRpcBufferLimit(bufferLimit) + .defaultServiceConfig(rawServiceConfig) + .intercept(statsInterceptor) + .build()); + } + + private void elapseBackoff(long time, TimeUnit unit) throws Exception { + assertThat(backoffLatch.await(5, SECONDS)).isTrue(); + backoffLatch = new CountDownLatch(1); + fakeClock.forwardTime(time, unit); + } + + private void assertRpcStartedRecorded() throws Exception { + MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); + assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT)) + .isEqualTo(1); + } + + private void assertOutboundMessageRecorded() throws Exception { + MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); + assertThat( + record.getMetricAsLongOrFail( + RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD)) + .isEqualTo(1); + } + + private void assertInboundMessageRecorded() throws Exception { + MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); + assertThat( + record.getMetricAsLongOrFail( + RpcMeasureConstants.GRPC_CLIENT_RECEIVED_MESSAGES_PER_METHOD)) + .isEqualTo(1); + } + + private void assertOutboundWireSizeRecorded(long length) throws Exception { + MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); + 
assertThat(record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_METHOD)) + .isEqualTo(length); + } + + private void assertInboundWireSizeRecorded(long length) throws Exception { + MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); + assertThat( + record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_RECEIVED_BYTES_PER_METHOD)) + .isEqualTo(length); + } + + private void assertRpcStatusRecorded( + Status.Code code, long roundtripLatencyMs, long outboundMessages) throws Exception { + MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); + TagValue statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS); + assertThat(statusTag.asString()).isEqualTo(code.toString()); + assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT)) + .isEqualTo(1); + assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)) + .isEqualTo(roundtripLatencyMs); + assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT)) + .isEqualTo(outboundMessages); + } + + private void assertRetryStatsRecorded( + int numRetries, int numTransparentRetries, long retryDelayMs) throws Exception { + MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); + assertThat(record.getMetricAsLongOrFail(RETRIES_PER_CALL)).isEqualTo(numRetries); + assertThat(record.getMetricAsLongOrFail(TRANSPARENT_RETRIES_PER_CALL)) + .isEqualTo(numTransparentRetries); + assertThat(record.getMetricAsLongOrFail(RETRY_DELAY_PER_CALL)).isEqualTo(retryDelayMs); + } + + @Test + public void retryUntilBufferLimitExceeded() throws Exception { + String message = "String of length 20."; + + startNewServer(); + bufferLimit = message.length() * 2L - 1; // Can buffer no more than 1 message. 
+ retryPolicy = ImmutableMap.builder() + .put("maxAttempts", 4D) + .put("initialBackoff", "10s") + .put("maxBackoff", "10s") + .put("backoffMultiplier", 1D) + .put("retryableStatusCodes", Arrays.asList("UNAVAILABLE")) + .build(); + createNewChannel(); + ClientCall call = channel.newCall(clientStreamingMethod, CallOptions.DEFAULT); + call.start(mockCallListener, new Metadata()); + call.sendMessage(message); + + ServerCall serverCall = serverCalls.poll(5, SECONDS); + serverCall.request(2); + // trigger retry + serverCall.close( + Status.UNAVAILABLE.withDescription("original attempt failed"), + new Metadata()); + elapseBackoff(10, SECONDS); + // 2nd attempt received + serverCall = serverCalls.poll(5, SECONDS); + serverCall.request(2); + verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class)); + // send one more message, should exceed buffer limit + call.sendMessage(message); + // let attempt fail + serverCall.close( + Status.UNAVAILABLE.withDescription("2nd attempt failed"), + new Metadata()); + // no more retry + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(null); + verify(mockCallListener, timeout(5000)).onClose(statusCaptor.capture(), any(Metadata.class)); + assertThat(statusCaptor.getValue().getDescription()).contains("2nd attempt failed"); + } + + @Test + public void statsRecorded() throws Exception { + startNewServer(); + retryPolicy = ImmutableMap.builder() + .put("maxAttempts", 4D) + .put("initialBackoff", "10s") + .put("maxBackoff", "10s") + .put("backoffMultiplier", 1D) + .put("retryableStatusCodes", Arrays.asList("UNAVAILABLE")) + .build(); + createNewChannel(); + + ClientCall call = channel.newCall(clientStreamingMethod, CallOptions.DEFAULT); + call.start(mockCallListener, new Metadata()); + assertRpcStartedRecorded(); + String message = "String of length 20."; + call.sendMessage(message); + assertOutboundMessageRecorded(); + ServerCall serverCall = serverCalls.poll(5, SECONDS); + serverCall.request(2); + 
assertOutboundWireSizeRecorded(message.length()); + // original attempt latency + fakeClock.forwardTime(1, SECONDS); + // trigger retry + serverCall.close( + Status.UNAVAILABLE.withDescription("original attempt failed"), + new Metadata()); + assertRpcStatusRecorded(Status.Code.UNAVAILABLE, 1000, 1); + elapseBackoff(10, SECONDS); + assertRpcStartedRecorded(); + assertOutboundMessageRecorded(); + serverCall = serverCalls.poll(5, SECONDS); + serverCall.request(2); + assertOutboundWireSizeRecorded(message.length()); + message = "new message"; + call.sendMessage(message); + assertOutboundMessageRecorded(); + assertOutboundWireSizeRecorded(message.length()); + // retry attempt latency + fakeClock.forwardTime(2, SECONDS); + serverCall.sendHeaders(new Metadata()); + serverCall.sendMessage(3); + call.request(1); + assertInboundMessageRecorded(); + assertInboundWireSizeRecorded(1); + serverCall.close(Status.OK, new Metadata()); + assertRpcStatusRecorded(Status.Code.OK, 2000, 2); + assertRetryStatsRecorded(1, 0, 10_000); + } + + @Test + public void serverCancelledAndClientDeadlineExceeded() throws Exception { + startNewServer(); + createNewChannel(); + + class CloseDelayedTracer extends ClientStreamTracer { + @Override + public void streamClosed(Status status) { + fakeClock.forwardTime(10, SECONDS); + } + } + + class CloseDelayedTracerFactory extends ClientStreamTracer.InternalLimitedInfoFactory { + @Override + public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) { + return new CloseDelayedTracer(); + } + } + + CallOptions callOptions = CallOptions.DEFAULT + .withDeadline(Deadline.after( + 10, + SECONDS, + new Ticker() { + @Override + public long nanoTime() { + return fakeClock.getTicker().read(); + } + })) + .withStreamTracerFactory(new CloseDelayedTracerFactory()); + ClientCall call = channel.newCall(clientStreamingMethod, callOptions); + call.start(mockCallListener, new Metadata()); + assertRpcStartedRecorded(); + ServerCall serverCall = 
serverCalls.poll(5, SECONDS); + serverCall.close(Status.CANCELLED, new Metadata()); + assertRpcStatusRecorded(Code.DEADLINE_EXCEEDED, 10_000, 0); + assertRetryStatsRecorded(0, 0, 0); + } + + @Ignore("flaky because old transportReportStatus() is not completely migrated yet") + @Test + public void transparentRetryStatsRecorded() throws Exception { + startNewServer(); + createNewChannel(); + + final AtomicBoolean transparentRetryTriggered = new AtomicBoolean(); + class TransparentRetryTriggeringTracer extends ClientStreamTracer { + + @Override + public void streamCreated(Attributes transportAttrs, Metadata metadata) { + if (transparentRetryTriggered.get()) { + return; + } + localServer.shutdownNow(); + } + + @Override + public void streamClosed(Status status) { + if (transparentRetryTriggered.get()) { + return; + } + transparentRetryTriggered.set(true); + try { + startNewServer(); + channel.resetConnectBackoff(); + channel.getState(true); + } catch (Exception e) { + throw new AssertionError("local server can not be restarted", e); + } + } + } + + class TransparentRetryTracerFactory extends ClientStreamTracer.InternalLimitedInfoFactory { + @Override + public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) { + return new TransparentRetryTriggeringTracer(); + } + } + + CallOptions callOptions = CallOptions.DEFAULT + .withWaitForReady() + .withStreamTracerFactory(new TransparentRetryTracerFactory()); + ClientCall call = channel.newCall(clientStreamingMethod, callOptions); + call.start(mockCallListener, new Metadata()); + assertRpcStartedRecorded(); + assertRpcStatusRecorded(Code.UNAVAILABLE, 0, 0); + assertRpcStartedRecorded(); + call.cancel("cancel", null); + assertRpcStatusRecorded(Code.CANCELLED, 0, 0); + assertRetryStatsRecorded(0, 1, 0); + } +} From 969a7b5186f9fb14961d1d98bc18e0386c46c6a9 Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Wed, 11 Aug 2021 10:25:57 -0700 Subject: [PATCH 2/4] core: fix retry flow control issue (#8401) There 
has been an issue with flow control when retry is enabled. Currently we call `masterListener.onReady()` whenever `substreamListener.onReady()` is called. The user's `onReady()` implementation might do ``` while(observer.isReady()) { // send one more message. } ``` However, currently if the `RetriableStream` is still draining, `isReady()` is false, and the user's `onReady()` exits immediately. And because `substreamListener.onReady()` has already been called, it may not be called again after draining completes. This PR fixes the issue by: - Using a serializing executor (a `SynchronizationContext`) to call all `masterListener` callbacks. - Once `RetriableStream` is drained, checking `isReady()` and, if it is true, calling `masterListener.onReady()`. - When `substreamListener.onReady()` is called, checking `isReady()` and calling `masterListener.onReady()` only if it is true. --- .../io/grpc/internal/RetriableStream.java | 100 +++++++++++++++--- .../io/grpc/internal/RetriableStreamTest.java | 88 ++++++++++++++- 2 files changed, 170 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index 3d277bbe2fc..1fb8d3c43bd 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -30,8 +30,10 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; +import io.grpc.SynchronizationContext; import io.grpc.internal.ClientStreamListener.RpcProgress; import java.io.InputStream; +import java.lang.Thread.UncaughtExceptionHandler; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -64,6 +66,16 @@ abstract class RetriableStream implements ClientStream { private final MethodDescriptor method; private final Executor callExecutor; + private final Executor listenerSerializeExecutor = new SynchronizationContext( + new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw
Status.fromThrowable(e) + .withDescription("Uncaught exception in the SynchronizationContext. Re-thrown.") + .asRuntimeException(); + } + } + ); private final ScheduledExecutorService scheduledExecutorService; // Must not modify it. private final Metadata headers; @@ -105,6 +117,7 @@ abstract class RetriableStream implements ClientStream { private FutureCanceller scheduledHedging; private long nextBackoffIntervalNanos; private Status cancellationStatus; + private boolean isClosed; RetriableStream( MethodDescriptor method, Metadata headers, @@ -247,6 +260,7 @@ private void drain(Substream substream) { int chunk = 0x80; List list = null; boolean streamStarted = false; + Runnable onReadyRunnable = null; while (true) { State savedState; @@ -264,7 +278,18 @@ private void drain(Substream substream) { } if (index == savedState.buffer.size()) { // I'm drained state = savedState.substreamDrained(substream); - return; + if (!isReady()) { + return; + } + onReadyRunnable = new Runnable() { + @Override + public void run() { + if (!isClosed) { + masterListener.onReady(); + } + } + }; + break; } if (substream.closed) { @@ -299,6 +324,11 @@ private void drain(Substream substream) { } } + if (onReadyRunnable != null) { + listenerSerializeExecutor.execute(onReadyRunnable); + return; + } + substream.stream.cancel( state.winningSubstream == substream ? 
cancellationStatus : CANCELLED_BECAUSE_COMMITTED); } @@ -450,14 +480,22 @@ public void run() { } @Override - public final void cancel(Status reason) { + public final void cancel(final Status reason) { Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */); noopSubstream.stream = new NoopClientStream(); Runnable runnable = commit(noopSubstream); if (runnable != null) { - masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata()); runnable.run(); + listenerSerializeExecutor.execute( + new Runnable() { + @Override + public void run() { + isClosed = true; + masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata()); + + } + }); return; } @@ -771,18 +809,25 @@ private final class Sublistener implements ClientStreamListener { } @Override - public void headersRead(Metadata headers) { + public void headersRead(final Metadata headers) { commitAndRun(substream); if (state.winningSubstream == substream) { - masterListener.headersRead(headers); if (throttle != null) { throttle.onSuccess(); } + listenerSerializeExecutor.execute( + new Runnable() { + @Override + public void run() { + masterListener.headersRead(headers); + } + }); } } @Override - public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { + public void closed( + final Status status, final RpcProgress rpcProgress, final Metadata trailers) { synchronized (lock) { state = state.substreamClosed(substream); closedSubstreamsInsight.append(status.getCode()); @@ -793,7 +838,14 @@ public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { if (substream.bufferLimitExceeded) { commitAndRun(substream); if (state.winningSubstream == substream) { - masterListener.closed(status, rpcProgress, trailers); + listenerSerializeExecutor.execute( + new Runnable() { + @Override + public void run() { + isClosed = true; + masterListener.closed(status, rpcProgress, trailers); + } + }); } return; } @@ -900,7 +952,14 @@ public void run() { 
commitAndRun(substream); if (state.winningSubstream == substream) { - masterListener.closed(status, rpcProgress, trailers); + listenerSerializeExecutor.execute( + new Runnable() { + @Override + public void run() { + isClosed = true; + masterListener.closed(status, rpcProgress, trailers); + } + }); } } @@ -970,22 +1029,37 @@ private Integer getPushbackMills(Metadata trailer) { } @Override - public void messagesAvailable(MessageProducer producer) { + public void messagesAvailable(final MessageProducer producer) { State savedState = state; checkState( savedState.winningSubstream != null, "Headers should be received prior to messages."); if (savedState.winningSubstream != substream) { return; } - masterListener.messagesAvailable(producer); + listenerSerializeExecutor.execute( + new Runnable() { + @Override + public void run() { + masterListener.messagesAvailable(producer); + } + }); } @Override public void onReady() { // FIXME(#7089): hedging case is broken. - // TODO(zdapeng): optimization: if the substream is not drained yet, delay onReady() once - // drained and if is still ready. 
- masterListener.onReady(); + if (!isReady()) { + return; + } + listenerSerializeExecutor.execute( + new Runnable() { + @Override + public void run() { + if (!isClosed) { + masterListener.onReady(); + } + } + }); } } diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index c9ea504e18b..8b851573b21 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -256,6 +256,7 @@ public Void answer(InvocationOnMock in) { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); retriableStream.sendMessage("msg1"); @@ -308,6 +309,7 @@ public Void answer(InvocationOnMock in) { inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); inOrder.verify(mockStream2).request(456); inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); // send more messages @@ -356,6 +358,7 @@ public Void answer(InvocationOnMock in) { inOrder.verify(mockStream3).writeMessage(any(InputStream.class)); inOrder.verify(mockStream3).request(456); inOrder.verify(mockStream3, times(7)).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); InsightBuilder insight = new InsightBuilder(); @@ -637,6 +640,7 @@ public void retry_cancelWhileBackoff() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream1).start(sublistenerCaptor1.capture()); + verify(mockStream1).isReady(); // retry ClientStream mockStream2 = mock(ClientStream.class); @@ -656,7 +660,7 @@ public void retry_cancelWhileBackoff() { @Test public void operationsWhileDraining() { - ArgumentCaptor 
sublistenerCaptor1 = + final ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); final AtomicReference sublistenerCaptor2 = new AtomicReference<>(); @@ -669,10 +673,16 @@ public void operationsWhileDraining() { @Override public void request(int numMessages) { retriableStream.sendMessage("substream1 request " + numMessages); + sublistenerCaptor1.getValue().onReady(); if (numMessages > 1) { retriableStream.request(--numMessages); } } + + @Override + public boolean isReady() { + return true; + } })); final ClientStream mockStream2 = @@ -688,7 +698,7 @@ public void start(ClientStreamListener listener) { @Override public void request(int numMessages) { retriableStream.sendMessage("substream2 request " + numMessages); - + sublistenerCaptor2.get().onReady(); if (numMessages == 3) { sublistenerCaptor2.get().headersRead(new Metadata()); } @@ -699,9 +709,14 @@ public void request(int numMessages) { retriableStream.cancel(cancelStatus); } } + + @Override + public boolean isReady() { + return true; + } })); - InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2); + InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2, masterListener); doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); retriableStream.start(masterListener); @@ -716,6 +731,7 @@ public void request(int numMessages) { inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); // msg "substream1 request 2" inOrder.verify(mockStream1).request(1); inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); // msg "substream1 request 1" + inOrder.verify(masterListener).onReady(); // retry doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); @@ -743,8 +759,8 @@ public void request(int numMessages) { // msg "substream2 request 2" inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class)); inOrder.verify(mockStream2).request(100); - - verify(mockStream2).cancel(cancelStatus); + 
inOrder.verify(mockStream2).cancel(cancelStatus); + inOrder.verify(masterListener, never()).onReady(); // "substream2 request 1" will never be sent inOrder.verify(mockStream2, never()).writeMessage(any(InputStream.class)); @@ -1073,6 +1089,7 @@ public void perRpcBufferLimitExceededDuringBackoff() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream1).start(sublistenerCaptor1.capture()); + verify(mockStream1).isReady(); bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1); @@ -1089,6 +1106,7 @@ public void perRpcBufferLimitExceededDuringBackoff() { fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); verify(mockStream2).start(any(ClientStreamListener.class)); + verify(mockStream2).isReady(); // bufferLimitExceeded bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1); @@ -1152,6 +1170,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); @@ -1167,6 +1186,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); // retry2 @@ -1183,6 +1203,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); // retry3 @@ -1200,6 +1221,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { ArgumentCaptor sublistenerCaptor4 = ArgumentCaptor.forClass(ClientStreamListener.class); 
inOrder.verify(mockStream4).start(sublistenerCaptor4.capture()); + inOrder.verify(mockStream4).isReady(); inOrder.verifyNoMoreInteractions(); // retry4 @@ -1214,6 +1236,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { ArgumentCaptor sublistenerCaptor5 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream5).start(sublistenerCaptor5.capture()); + inOrder.verify(mockStream5).isReady(); inOrder.verifyNoMoreInteractions(); // retry5 @@ -1228,6 +1251,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { ArgumentCaptor sublistenerCaptor6 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream6).start(sublistenerCaptor6.capture()); + inOrder.verify(mockStream6).isReady(); inOrder.verifyNoMoreInteractions(); // can not retry any more @@ -1258,6 +1282,7 @@ public void pushback() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); @@ -1276,6 +1301,7 @@ public void pushback() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); // retry2 @@ -1293,6 +1319,7 @@ public void pushback() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); // retry3 @@ -1307,6 +1334,7 @@ public void pushback() { ArgumentCaptor sublistenerCaptor4 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream4).start(sublistenerCaptor4.capture()); + inOrder.verify(mockStream4).isReady(); inOrder.verifyNoMoreInteractions(); // retry4 @@ -1323,6 +1351,7 @@ public void pushback() { 
ArgumentCaptor sublistenerCaptor5 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream5).start(sublistenerCaptor5.capture()); + inOrder.verify(mockStream5).isReady(); inOrder.verifyNoMoreInteractions(); // retry5 @@ -1340,6 +1369,7 @@ public void pushback() { ArgumentCaptor sublistenerCaptor6 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream6).start(sublistenerCaptor6.capture()); + inOrder.verify(mockStream6).isReady(); inOrder.verifyNoMoreInteractions(); // can not retry any more even pushback is positive @@ -1597,6 +1627,7 @@ public void transparentRetry() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); // transparent retry @@ -1608,6 +1639,7 @@ public void transparentRetry() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); verify(retriableStreamRecorder, never()).postCommit(); assertEquals(0, fakeClock.numPendingTasks()); @@ -1623,6 +1655,7 @@ public void transparentRetry() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); verify(retriableStreamRecorder, never()).postCommit(); assertEquals(0, fakeClock.numPendingTasks()); @@ -1645,6 +1678,7 @@ public void normalRetry_thenNoTransparentRetry_butNormalRetry() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); // normal retry @@ -1658,6 
+1692,7 @@ public void normalRetry_thenNoTransparentRetry_butNormalRetry() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); verify(retriableStreamRecorder, never()).postCommit(); assertEquals(0, fakeClock.numPendingTasks()); @@ -1674,6 +1709,7 @@ public void normalRetry_thenNoTransparentRetry_butNormalRetry() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); verify(retriableStreamRecorder, never()).postCommit(); } @@ -1695,6 +1731,7 @@ public void normalRetry_thenNoTransparentRetry_andNoMoreRetry() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); // normal retry @@ -1708,6 +1745,7 @@ public void normalRetry_thenNoTransparentRetry_andNoMoreRetry() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); verify(retriableStreamRecorder, never()).postCommit(); assertEquals(0, fakeClock.numPendingTasks()); @@ -1738,6 +1776,7 @@ method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_ ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); // transparent retry @@ -1750,6 +1789,7 @@ method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_ 
ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(retriableStreamRecorder).postCommit(); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); assertEquals(0, fakeClock.numPendingTasks()); } @@ -1768,6 +1808,7 @@ public void droppedShouldNeverRetry() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream1).start(sublistenerCaptor1.capture()); + verify(mockStream1).isReady(); // drop and verify no retry Status status = Status.fromCode(RETRIABLE_STATUS_CODE_1); @@ -1839,6 +1880,7 @@ public Void answer(InvocationOnMock in) { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); hedgingStream.sendMessage("msg1"); @@ -1880,6 +1922,8 @@ public Void answer(InvocationOnMock in) { inOrder.verify(mockStream2, times(2)).flush(); inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); inOrder.verify(mockStream2).request(456); + inOrder.verify(mockStream1).isReady(); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); // send more messages @@ -1917,6 +1961,9 @@ public Void answer(InvocationOnMock in) { inOrder.verify(mockStream3).writeMessage(any(InputStream.class)); inOrder.verify(mockStream3).request(456); inOrder.verify(mockStream3, times(2)).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream1).isReady(); + inOrder.verify(mockStream2).isReady(); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); // send one more message @@ -1959,6 +2006,9 @@ public Void answer(InvocationOnMock in) { inOrder.verify(mockStream4).writeMessage(any(InputStream.class)); inOrder.verify(mockStream4).request(456); inOrder.verify(mockStream4, times(4)).writeMessage(any(InputStream.class)); + 
inOrder.verify(mockStream1).isReady(); + inOrder.verify(mockStream2).isReady(); + inOrder.verify(mockStream4).isReady(); inOrder.verifyNoMoreInteractions(); InsightBuilder insight = new InsightBuilder(); @@ -2009,6 +2059,7 @@ public void hedging_maxAttempts() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2016,6 +2067,7 @@ public void hedging_maxAttempts() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2023,6 +2075,7 @@ public void hedging_maxAttempts() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2030,6 +2083,7 @@ public void hedging_maxAttempts() { ArgumentCaptor sublistenerCaptor4 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream4).start(sublistenerCaptor4.capture()); + inOrder.verify(mockStream4).isReady(); inOrder.verifyNoMoreInteractions(); // a random one of the hedges fails @@ -2041,6 +2095,7 @@ public void hedging_maxAttempts() { ArgumentCaptor sublistenerCaptor5 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream5).start(sublistenerCaptor5.capture()); + inOrder.verify(mockStream5).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2048,6 +2103,7 @@ public void 
hedging_maxAttempts() { ArgumentCaptor sublistenerCaptor6 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream6).start(sublistenerCaptor6.capture()); + inOrder.verify(mockStream6).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2092,6 +2148,7 @@ public void hedging_receiveHeaders() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2099,6 +2156,7 @@ public void hedging_receiveHeaders() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2106,6 +2164,7 @@ public void hedging_receiveHeaders() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); // a random one of the hedges receives headers @@ -2143,6 +2202,7 @@ public void hedging_pushback_negative() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2150,6 +2210,7 @@ public void hedging_pushback_negative() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); 
inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2157,6 +2218,7 @@ public void hedging_pushback_negative() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); // a random one of the hedges receives a negative pushback @@ -2188,6 +2250,7 @@ public void hedging_pushback_positive() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2195,6 +2258,7 @@ public void hedging_pushback_positive() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); @@ -2212,6 +2276,7 @@ public void hedging_pushback_positive() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); // hedge2 receives a pushback for HEDGING_DELAY_IN_SECONDS - 1 second @@ -2225,6 +2290,7 @@ public void hedging_pushback_positive() { ArgumentCaptor sublistenerCaptor4 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream4).start(sublistenerCaptor4.capture()); + inOrder.verify(mockStream4).isReady(); inOrder.verifyNoMoreInteractions(); // commit @@ -2254,6 +2320,7 @@ public void hedging_cancelled() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + 
inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2261,6 +2328,8 @@ public void hedging_cancelled() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream1).isReady(); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); Status status = Status.CANCELLED.withDescription("cancelled"); @@ -2275,6 +2344,8 @@ public void hedging_cancelled() { assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription()); inOrder.verify(retriableStreamRecorder).postCommit(); + inOrder.verify(masterListener).closed( + any(Status.class), any(RpcProgress.class), any(Metadata.class)); inOrder.verifyNoMoreInteractions(); } @@ -2289,6 +2360,7 @@ public void hedging_perRpcBufferLimitExceeded() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream1).start(sublistenerCaptor1.capture()); + verify(mockStream1).isReady(); ClientStreamTracer bufferSizeTracer1 = bufferSizeTracer; bufferSizeTracer1.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1); @@ -2297,6 +2369,8 @@ public void hedging_perRpcBufferLimitExceeded() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream2).start(sublistenerCaptor2.capture()); + verify(mockStream1, times(2)).isReady(); + verify(mockStream2).isReady(); ClientStreamTracer bufferSizeTracer2 = bufferSizeTracer; bufferSizeTracer2.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1); @@ -2313,6 +2387,7 @@ public void hedging_perRpcBufferLimitExceeded() { verify(retriableStreamRecorder).postCommit(); verifyNoMoreInteractions(mockStream1); + verify(mockStream2).isReady(); verifyNoMoreInteractions(mockStream2); } @@ -2327,6 +2402,7 @@ public void hedging_channelBufferLimitExceeded() { ArgumentCaptor sublistenerCaptor1 = 
ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream1).start(sublistenerCaptor1.capture()); + verify(mockStream1).isReady(); ClientStreamTracer bufferSizeTracer1 = bufferSizeTracer; bufferSizeTracer1.outboundWireSize(100); @@ -2335,6 +2411,8 @@ public void hedging_channelBufferLimitExceeded() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream2).start(sublistenerCaptor2.capture()); + verify(mockStream1, times(2)).isReady(); + verify(mockStream2).isReady(); ClientStreamTracer bufferSizeTracer2 = bufferSizeTracer; bufferSizeTracer2.outboundWireSize(100); From 61b5ae97639ec487da3a1059b0a9456a3096067c Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Wed, 11 Aug 2021 14:44:23 -0700 Subject: [PATCH 3/4] core: enable retry by default (#8402) Stabilize `enableRetry()` and `disableRetry()`. Disable retry in `ManagedChannelImplTest` because each call attempt will fork the headers to a new instance and will add a ClientStreamTracer.Factory for bufferSizeLimit in CallOptions, which makes verification not straightforward.
--- api/src/main/java/io/grpc/ManagedChannelBuilder.java | 2 -- .../java/io/grpc/internal/ManagedChannelImplBuilder.java | 2 +- .../test/java/io/grpc/internal/ManagedChannelImplTest.java | 5 +++++ 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/api/src/main/java/io/grpc/ManagedChannelBuilder.java b/api/src/main/java/io/grpc/ManagedChannelBuilder.java index 73e66ed6dc4..98b22807ccc 100644 --- a/api/src/main/java/io/grpc/ManagedChannelBuilder.java +++ b/api/src/main/java/io/grpc/ManagedChannelBuilder.java @@ -467,7 +467,6 @@ public T perRpcBufferLimit(long bytes) { * @return this * @since 1.11.0 */ - @ExperimentalApi("https://github.com/grpc/grpc-java/issues/3982") public T disableRetry() { throw new UnsupportedOperationException(); } @@ -482,7 +481,6 @@ public T disableRetry() { * @return this * @since 1.11.0 */ - @ExperimentalApi("https://github.com/grpc/grpc-java/issues/3982") public T enableRetry() { throw new UnsupportedOperationException(); } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java index cad4ece233e..26c48fc8596 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java @@ -142,7 +142,7 @@ public static ManagedChannelBuilder forTarget(String target) { int maxHedgedAttempts = 5; long retryBufferSize = DEFAULT_RETRY_BUFFER_SIZE_IN_BYTES; long perRpcBufferLimit = DEFAULT_PER_RPC_BUFFER_LIMIT_IN_BYTES; - boolean retryEnabled = false; // TODO(zdapeng): default to true + boolean retryEnabled = true; InternalChannelz channelz = InternalChannelz.instance(); int maxTraceEvents; diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index ccfb5f074c5..668411d7ecc 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ 
b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -355,6 +355,7 @@ public void close() throws SecurityException { channelBuilder = new ManagedChannelImplBuilder(TARGET, new UnsupportedClientTransportFactoryBuilder(), new FixedPortProvider(DEFAULT_PORT)); + channelBuilder.disableRetry(); configureBuilder(channelBuilder); } @@ -1881,6 +1882,7 @@ public void oobChannelHasNoChannelCallCredentials() { TARGET, InsecureChannelCredentials.create(), new FakeCallCredentials(metadataKey, channelCredValue), new UnsupportedClientTransportFactoryBuilder(), new FixedPortProvider(DEFAULT_PORT)); + channelBuilder.disableRetry(); configureBuilder(channelBuilder); createChannel(); @@ -1933,6 +1935,7 @@ public void oobChannelHasNoChannelCallCredentials() { new FakeNameResolverFactory.Builder(URI.create("oobauthority")).build()) .defaultLoadBalancingPolicy(MOCK_POLICY_NAME) .idleTimeout(ManagedChannelImplBuilder.IDLE_MODE_MAX_TIMEOUT_DAYS, TimeUnit.DAYS) + .disableRetry() // irrelevant to what we test, disable retry to make verification easy .build(); oob.getState(true); ArgumentCaptor helperCaptor = ArgumentCaptor.forClass(Helper.class); @@ -1980,6 +1983,7 @@ public SwapChannelCredentialsResult answer(InvocationOnMock invocation) { TARGET, InsecureChannelCredentials.create(), new FakeCallCredentials(metadataKey, channelCredValue), new UnsupportedClientTransportFactoryBuilder(), new FixedPortProvider(DEFAULT_PORT)); + channelBuilder.disableRetry(); configureBuilder(channelBuilder); createChannel(); @@ -2017,6 +2021,7 @@ public SwapChannelCredentialsResult answer(InvocationOnMock invocation) { new FakeNameResolverFactory.Builder(URI.create("fake://oobauthority/")).build()) .defaultLoadBalancingPolicy(MOCK_POLICY_NAME) .idleTimeout(ManagedChannelImplBuilder.IDLE_MODE_MAX_TIMEOUT_DAYS, TimeUnit.DAYS) + .disableRetry() // irrelevant to what we test, disable retry to make verification easy .build(); oob.getState(true); ArgumentCaptor helperCaptor = 
ArgumentCaptor.forClass(Helper.class); From 1da48a8f7cae43c63404e6c9e54b2a154ce4f760 Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Thu, 12 Aug 2021 10:01:32 -0700 Subject: [PATCH 4/4] xds: enable xDS retry by default (#8403) --- xds/src/main/java/io/grpc/xds/ClientXdsClient.java | 4 ++-- xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java index 4ae6651784f..93bfeb36b34 100644 --- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java @@ -134,8 +134,8 @@ final class ClientXdsClient extends AbstractXdsClient { || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION")); @VisibleForTesting static boolean enableRetry = - !Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY")) - && Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY")); + Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY")) + || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY")); private static final String TYPE_URL_HTTP_CONNECTION_MANAGER_V2 = "type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2" diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java index 4b12ebd71c8..60ce2befe45 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java @@ -136,7 +136,7 @@ public class ClientXdsClientDataTest { @Before public void setUp() { originalEnableRetry = ClientXdsClient.enableRetry; - assertThat(originalEnableRetry).isFalse(); + assertThat(originalEnableRetry).isTrue(); } @After