From 6c98f22a2765da025f4f92eee6004500575a92a6 Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Wed, 18 Aug 2021 10:31:40 -0700 Subject: [PATCH 1/5] add regression test --- .../grpc/testing/integration/RetryTest.java | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java index bdf39e8546a..eb815501d5c 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java @@ -356,6 +356,70 @@ public void statsRecorded() throws Exception { assertRetryStatsRecorded(1, 0, 10_000); } + @Test + public void statsRecorde_callCancelledBeforeCommit() throws Exception { + startNewServer(); + retryPolicy = ImmutableMap.builder() + .put("maxAttempts", 4D) + .put("initialBackoff", "10s") + .put("maxBackoff", "10s") + .put("backoffMultiplier", 1D) + .put("retryableStatusCodes", Arrays.asList("UNAVAILABLE")) + .build(); + createNewChannel(); + + // We will have streamClosed return at a particular moment that we want. 
+ final CountDownLatch streamClosedLatch = new CountDownLatch(1); + ClientStreamTracer.Factory streamTracerFactory = new ClientStreamTracer.Factory() { + @Override + public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) { + return new ClientStreamTracer() { + @Override + public void streamClosed(Status status) { + if (status.getCode().equals(Code.CANCELLED)) { + try { + streamClosedLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError("streamClosedLatch interrupted", e); + } + } + } + }; + } + }; + ClientCall call = channel.newCall( + clientStreamingMethod, CallOptions.DEFAULT.withStreamTracerFactory(streamTracerFactory)); + call.start(mockCallListener, new Metadata()); + assertRpcStartedRecorded(); + fakeClock.forwardTime(5, SECONDS); + String message = "String of length 20."; + call.sendMessage(message); + assertOutboundMessageRecorded(); + ServerCall serverCall = serverCalls.poll(5, SECONDS); + serverCall.request(2); + assertOutboundWireSizeRecorded(message.length()); + // trigger retry + serverCall.close( + Status.UNAVAILABLE.withDescription("original attempt failed"), + new Metadata()); + assertRpcStatusRecorded(Code.UNAVAILABLE, 5000, 1); + elapseBackoff(10, SECONDS); + assertRpcStartedRecorded(); + assertOutboundMessageRecorded(); + serverCall = serverCalls.poll(5, SECONDS); + serverCall.request(2); + assertOutboundWireSizeRecorded(message.length()); + fakeClock.forwardTime(7, SECONDS); + call.cancel("Cancelled before commit", null); // A noop substream will commit. + // The call listener is closed, but the netty substream listener is not yet closed. + verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class)); + // Let the netty substream listener be closed. 
+ streamClosedLatch.countDown(); + assertRetryStatsRecorded(1, 0, 10_000); + assertRpcStatusRecorded(Code.CANCELLED, 7_000, 1); + } + @Test public void serverCancelledAndClientDeadlineExceeded() throws Exception { startNewServer(); From a167f3c35b8372d78bfd9bb1eba79472489b347e Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Mon, 30 Aug 2021 22:24:22 -0700 Subject: [PATCH 2/5] census: fix retry stats data race v2 --- .../io/grpc/census/CensusStatsModule.java | 104 +++++++++--------- 1 file changed, 52 insertions(+), 52 deletions(-) diff --git a/census/src/main/java/io/grpc/census/CensusStatsModule.java b/census/src/main/java/io/grpc/census/CensusStatsModule.java index 6faeb575ccc..d710e43630e 100644 --- a/census/src/main/java/io/grpc/census/CensusStatsModule.java +++ b/census/src/main/java/io/grpc/census/CensusStatsModule.java @@ -55,7 +55,6 @@ import io.opencensus.tags.unsafe.ContextUtils; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -356,12 +355,12 @@ public void streamClosed(Status status) { if (module.recordFinishedRpcs) { // Stream is closed early. So no need to record metrics for any inbound events after this // point. - recordFinishedRpc(); + recordFinishedAttempt(); } } // Otherwise will report stats in callEnded() to guarantee all inbound metrics are recorded. } - void recordFinishedRpc() { + void recordFinishedAttempt() { MeasureMap measureMap = module.statsRecorder.newMeasureMap() // TODO(songya): remove the deprecated measure constants once they are completed removed. 
.put(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT, 1) @@ -405,30 +404,10 @@ static final class CallAttemptsTracerFactory extends Measure.MeasureDouble.create( "grpc.io/client/retry_delay_per_call", "Retry delay per call", "ms"); - @Nullable - private static final AtomicIntegerFieldUpdater callEndedUpdater; - - /** - * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their - * JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to - * (potentially racy) direct updates of the volatile variables. - */ - static { - AtomicIntegerFieldUpdater tmpCallEndedUpdater; - try { - tmpCallEndedUpdater = - AtomicIntegerFieldUpdater.newUpdater(CallAttemptsTracerFactory.class, "callEnded"); - } catch (Throwable t) { - logger.log(Level.SEVERE, "Creating atomic field updaters failed", t); - tmpCallEndedUpdater = null; - } - callEndedUpdater = tmpCallEndedUpdater; - } - ClientTracer inboundMetricTracer; private final CensusStatsModule module; private final Stopwatch stopwatch; - private volatile int callEnded; + private boolean callEnded; private final TagContext parentCtx; private final TagContext startCtx; private final String fullMethodName; @@ -436,17 +415,18 @@ static final class CallAttemptsTracerFactory extends // TODO(zdapeng): optimize memory allocation using AtomicFieldUpdater. 
private final AtomicLong attemptsPerCall = new AtomicLong(); private final AtomicLong transparentRetriesPerCall = new AtomicLong(); - private final AtomicLong retryDelayNanos = new AtomicLong(); - private final AtomicLong lastInactiveTimeStamp = new AtomicLong(); - private final AtomicInteger activeStreams = new AtomicInteger(); - private final AtomicBoolean activated = new AtomicBoolean(); + private Status status; + private final Object lock = new Object(); + private long retryDelayNanos; + private int activeStreams; + private boolean finishedCallToBeRecorded; CallAttemptsTracerFactory( CensusStatsModule module, TagContext parentCtx, String fullMethodName) { this.module = checkNotNull(module, "module"); this.parentCtx = checkNotNull(parentCtx, "parentCtx"); this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName"); - this.stopwatch = module.stopwatchSupplier.get().start(); + this.stopwatch = module.stopwatchSupplier.get(); TagValue methodTag = TagValue.create(fullMethodName); startCtx = module.tagger.toBuilder(parentCtx) .putLocal(RpcMeasureConstants.GRPC_CLIENT_METHOD, methodTag) @@ -461,10 +441,14 @@ static final class CallAttemptsTracerFactory extends @Override public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metadata) { - ClientTracer tracer = new ClientTracer(this, module, parentCtx, startCtx, info); - if (activeStreams.incrementAndGet() == 1) { - if (!activated.compareAndSet(false, true)) { - retryDelayNanos.addAndGet(stopwatch.elapsed(TimeUnit.NANOSECONDS)); + synchronized (lock) { + if (finishedCallToBeRecorded) { + // This can be the case when the call is cancelled but a retry attempt is created. 
+ return new ClientStreamTracer() {}; + } + if (++activeStreams == 1 && stopwatch.isRunning()) { + stopwatch.stop(); + retryDelayNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); } } if (module.recordStartedRpcs && attemptsPerCall.get() > 0) { @@ -477,42 +461,58 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metada } else { attemptsPerCall.incrementAndGet(); } - return tracer; + return new ClientTracer(this, module, parentCtx, startCtx, info); } // Called whenever each attempt is ended. void attemptEnded() { - if (activeStreams.decrementAndGet() == 0) { - // Race condition between two extremely close events does not matter because the difference - // in the result would be very small. - long lastInactiveTimeStamp = - this.lastInactiveTimeStamp.getAndSet(stopwatch.elapsed(TimeUnit.NANOSECONDS)); - retryDelayNanos.addAndGet(-lastInactiveTimeStamp); + if (!module.recordFinishedRpcs) { + return; + } + boolean shouldRecordFinishedCall = false; + synchronized (lock) { + if (--activeStreams == 0) { + stopwatch.start(); + if (callEnded && !finishedCallToBeRecorded) { + shouldRecordFinishedCall = true; + finishedCallToBeRecorded = true; + } + } + } + if (shouldRecordFinishedCall) { + recordFinishedCall(); } } void callEnded(Status status) { - if (callEndedUpdater != null) { - if (callEndedUpdater.getAndSet(this, 1) != 0) { + if (!module.recordFinishedRpcs) { + return; + } + this.status = status; + boolean shouldRecordFinishedCall = false; + synchronized (lock) { + if (callEnded) { return; } - } else { - if (callEnded != 0) { - return; + callEnded = true; + if (activeStreams == 0 && !finishedCallToBeRecorded) { + shouldRecordFinishedCall = true; + finishedCallToBeRecorded = true; } - callEnded = 1; } - if (!module.recordFinishedRpcs) { - return; + if (shouldRecordFinishedCall) { + recordFinishedCall(); } - stopwatch.stop(); + } + + void recordFinishedCall() { if (attemptsPerCall.get() == 0) { ClientTracer tracer = new ClientTracer(this, module, 
parentCtx, startCtx, null); tracer.roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); tracer.statusCode = status.getCode(); - tracer.recordFinishedRpc(); + tracer.recordFinishedAttempt(); } else if (inboundMetricTracer != null) { - inboundMetricTracer.recordFinishedRpc(); + inboundMetricTracer.recordFinishedAttempt(); } long retriesPerCall = 0; @@ -523,7 +523,7 @@ void callEnded(Status status) { MeasureMap measureMap = module.statsRecorder.newMeasureMap() .put(RETRIES_PER_CALL, retriesPerCall) .put(TRANSPARENT_RETRIES_PER_CALL, transparentRetriesPerCall.get()) - .put(RETRY_DELAY_PER_CALL, retryDelayNanos.get() / NANOS_PER_MILLI); + .put(RETRY_DELAY_PER_CALL, retryDelayNanos / NANOS_PER_MILLI); TagValue methodTag = TagValue.create(fullMethodName); TagValue statusTag = TagValue.create(status.getCode().toString()); measureMap.record( From 18f5a36d79f2304c61b13b37f1ace62e1c710d68 Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Wed, 1 Sep 2021 16:26:08 -0700 Subject: [PATCH 3/5] log warning --- census/src/main/java/io/grpc/census/CensusStatsModule.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/census/src/main/java/io/grpc/census/CensusStatsModule.java b/census/src/main/java/io/grpc/census/CensusStatsModule.java index d710e43630e..0915cfdca0f 100644 --- a/census/src/main/java/io/grpc/census/CensusStatsModule.java +++ b/census/src/main/java/io/grpc/census/CensusStatsModule.java @@ -492,6 +492,8 @@ void callEnded(Status status) { boolean shouldRecordFinishedCall = false; synchronized (lock) { if (callEnded) { + // FIXME(https://github.com/grpc/grpc-java/issues/7921) + logger.warning("callEnded() already called. 
This is a bug."); return; } callEnded = true; From 3c5851037f8c357c17f7e5e10d6066b7e6c42f2c Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Wed, 1 Sep 2021 16:38:43 -0700 Subject: [PATCH 4/5] add @GuardedBy --- census/src/main/java/io/grpc/census/CensusStatsModule.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/census/src/main/java/io/grpc/census/CensusStatsModule.java b/census/src/main/java/io/grpc/census/CensusStatsModule.java index 0915cfdca0f..5e501417751 100644 --- a/census/src/main/java/io/grpc/census/CensusStatsModule.java +++ b/census/src/main/java/io/grpc/census/CensusStatsModule.java @@ -61,6 +61,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; /** * Provides factories for {@link StreamTracer} that records stats to Census. @@ -407,6 +408,7 @@ static final class CallAttemptsTracerFactory extends ClientTracer inboundMetricTracer; private final CensusStatsModule module; private final Stopwatch stopwatch; + @GuardedBy("lock") private boolean callEnded; private final TagContext parentCtx; private final TagContext startCtx; @@ -415,10 +417,14 @@ static final class CallAttemptsTracerFactory extends // TODO(zdapeng): optimize memory allocation using AtomicFieldUpdater. 
private final AtomicLong attemptsPerCall = new AtomicLong(); private final AtomicLong transparentRetriesPerCall = new AtomicLong(); + // write happens before read private Status status; private final Object lock = new Object(); + // write @GuardedBy("lock") and happens before read private long retryDelayNanos; + @GuardedBy("lock") private int activeStreams; + @GuardedBy("lock") private boolean finishedCallToBeRecorded; CallAttemptsTracerFactory( From dd15729a0f16869a18a4180f6602103297a23415 Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Thu, 2 Sep 2021 09:53:13 -0700 Subject: [PATCH 5/5] remove logging --- census/src/main/java/io/grpc/census/CensusStatsModule.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/census/src/main/java/io/grpc/census/CensusStatsModule.java b/census/src/main/java/io/grpc/census/CensusStatsModule.java index 5e501417751..6f8acdb71e9 100644 --- a/census/src/main/java/io/grpc/census/CensusStatsModule.java +++ b/census/src/main/java/io/grpc/census/CensusStatsModule.java @@ -498,8 +498,7 @@ void callEnded(Status status) { boolean shouldRecordFinishedCall = false; synchronized (lock) { if (callEnded) { - // FIXME(https://github.com/grpc/grpc-java/issues/7921) - logger.warning("callEnded() already called. This is a bug."); + // FIXME(https://github.com/grpc/grpc-java/issues/7921): this shouldn't happen return; } callEnded = true;