census: Fix retry stats data race (#8459)
There is a data race in `CensusStatsModule.CallAttemptsTracerFactory`:

If a client call is cancelled while an active stream on the transport is not yet committed, a [noop substream](https://github.com/grpc/grpc-java/blob/v1.40.0/core/src/main/java/io/grpc/internal/RetriableStream.java#L486) is committed and the active stream is cancelled. Because cancelling the active stream triggers the stream listener's closed() on the _transport_ thread, closed() can run concurrently with the call listener's onClose(). As a result, `CallAttemptsTracerFactory.attemptEnded()` can be called concurrently with `CallAttemptsTracerFactory.callEnded()`, producing a data race on RETRY_DELAY_PER_CALL. See also the regression test added.

The same data race can happen with hedging: when one of the hedges is committed and completes the call, the other uncommitted hedges cancel themselves and trigger their stream listeners' closed() on the transport thread concurrently.

Fix the race by recording RETRY_DELAY_PER_CALL only once both of the following conditions are met (see the sketch after this list):
- callEnded is true
- the number of active streams is 0
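
The core of the fix is this guarded hand-off. The sketch below is a minimal, simplified illustration of the pattern only; class and method names are illustrative, and the stopwatch/retry-delay bookkeeping of the real `CallAttemptsTracerFactory` is omitted. Whichever of attemptEnded()/callEnded() observes both conditions holding performs the recording, and the `finishedCallToBeRecorded` flag guarantees the per-call stats are recorded at most once.

```java
// Minimal sketch of the synchronization pattern used by the fix (illustrative
// names; not the actual CensusStatsModule code).
final class RetryStatsRecorderSketch {
  private final Object lock = new Object();
  // All three fields are guarded by "lock".
  private int activeStreams;
  private boolean callEnded;
  private boolean finishedCallToBeRecorded;

  void attemptStarted() {
    synchronized (lock) {
      activeStreams++;
    }
  }

  // Invoked from the transport thread when a stream (attempt or hedge) closes.
  void attemptEnded() {
    boolean shouldRecord = false;
    synchronized (lock) {
      if (--activeStreams == 0 && callEnded && !finishedCallToBeRecorded) {
        finishedCallToBeRecorded = true;
        shouldRecord = true;
      }
    }
    if (shouldRecord) {
      recordFinishedCall(); // record RETRY_DELAY_PER_CALL etc. outside the lock
    }
  }

  // Invoked from the call executor when the call listener's onClose() fires.
  void callEnded() {
    boolean shouldRecord = false;
    synchronized (lock) {
      callEnded = true;
      if (activeStreams == 0 && !finishedCallToBeRecorded) {
        finishedCallToBeRecorded = true;
        shouldRecord = true;
      }
    }
    if (shouldRecord) {
      recordFinishedCall();
    }
  }

  private void recordFinishedCall() {
    // Exactly one thread ever reaches this point, so reading the retry-delay
    // state here is free of the original race.
  }
}
```

The same flag also covers the hedging case: each cancelled hedge's closed() callback only decrements the counter, and only the last one to finish after callEnded() triggers the recording.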
dapengzhang0 committed Sep 2, 2021
1 parent c39c6ff commit 6fccaaa
Showing 2 changed files with 123 additions and 52 deletions.
111 changes: 59 additions & 52 deletions census/src/main/java/io/grpc/census/CensusStatsModule.java
@@ -55,13 +55,13 @@
import io.opencensus.tags.unsafe.ContextUtils;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

/**
* Provides factories for {@link StreamTracer} that records stats to Census.
@@ -356,12 +356,12 @@ public void streamClosed(Status status) {
if (module.recordFinishedRpcs) {
// Stream is closed early. So no need to record metrics for any inbound events after this
// point.
recordFinishedRpc();
recordFinishedAttempt();
}
} // Otherwise will report stats in callEnded() to guarantee all inbound metrics are recorded.
}

void recordFinishedRpc() {
void recordFinishedAttempt() {
MeasureMap measureMap = module.statsRecorder.newMeasureMap()
// TODO(songya): remove the deprecated measure constants once they are completely removed.
.put(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT, 1)
@@ -405,48 +405,34 @@ static final class CallAttemptsTracerFactory extends
Measure.MeasureDouble.create(
"grpc.io/client/retry_delay_per_call", "Retry delay per call", "ms");

@Nullable
private static final AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> callEndedUpdater;

/**
* When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
* JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to
* (potentially racy) direct updates of the volatile variables.
*/
static {
AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> tmpCallEndedUpdater;
try {
tmpCallEndedUpdater =
AtomicIntegerFieldUpdater.newUpdater(CallAttemptsTracerFactory.class, "callEnded");
} catch (Throwable t) {
logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
tmpCallEndedUpdater = null;
}
callEndedUpdater = tmpCallEndedUpdater;
}

ClientTracer inboundMetricTracer;
private final CensusStatsModule module;
private final Stopwatch stopwatch;
private volatile int callEnded;
@GuardedBy("lock")
private boolean callEnded;
private final TagContext parentCtx;
private final TagContext startCtx;
private final String fullMethodName;

// TODO(zdapeng): optimize memory allocation using AtomicFieldUpdater.
private final AtomicLong attemptsPerCall = new AtomicLong();
private final AtomicLong transparentRetriesPerCall = new AtomicLong();
private final AtomicLong retryDelayNanos = new AtomicLong();
private final AtomicLong lastInactiveTimeStamp = new AtomicLong();
private final AtomicInteger activeStreams = new AtomicInteger();
private final AtomicBoolean activated = new AtomicBoolean();
// write happens before read
private Status status;
private final Object lock = new Object();
// write @GuardedBy("lock") and happens before read
private long retryDelayNanos;
@GuardedBy("lock")
private int activeStreams;
@GuardedBy("lock")
private boolean finishedCallToBeRecorded;

CallAttemptsTracerFactory(
CensusStatsModule module, TagContext parentCtx, String fullMethodName) {
this.module = checkNotNull(module, "module");
this.parentCtx = checkNotNull(parentCtx, "parentCtx");
this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
this.stopwatch = module.stopwatchSupplier.get().start();
this.stopwatch = module.stopwatchSupplier.get();
TagValue methodTag = TagValue.create(fullMethodName);
startCtx = module.tagger.toBuilder(parentCtx)
.putLocal(RpcMeasureConstants.GRPC_CLIENT_METHOD, methodTag)
@@ -461,10 +447,14 @@ static final class CallAttemptsTracerFactory extends

@Override
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metadata) {
ClientTracer tracer = new ClientTracer(this, module, parentCtx, startCtx, info);
if (activeStreams.incrementAndGet() == 1) {
if (!activated.compareAndSet(false, true)) {
retryDelayNanos.addAndGet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
synchronized (lock) {
if (finishedCallToBeRecorded) {
// This can be the case when the call is cancelled but a retry attempt is created.
return new ClientStreamTracer() {};
}
if (++activeStreams == 1 && stopwatch.isRunning()) {
stopwatch.stop();
retryDelayNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
}
}
if (module.recordStartedRpcs && attemptsPerCall.get() > 0) {
@@ -477,42 +467,59 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metada
} else {
attemptsPerCall.incrementAndGet();
}
return tracer;
return new ClientTracer(this, module, parentCtx, startCtx, info);
}

// Called whenever each attempt is ended.
void attemptEnded() {
if (activeStreams.decrementAndGet() == 0) {
// Race condition between two extremely close events does not matter because the difference
// in the result would be very small.
long lastInactiveTimeStamp =
this.lastInactiveTimeStamp.getAndSet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
retryDelayNanos.addAndGet(-lastInactiveTimeStamp);
if (!module.recordFinishedRpcs) {
return;
}
boolean shouldRecordFinishedCall = false;
synchronized (lock) {
if (--activeStreams == 0) {
stopwatch.start();
if (callEnded && !finishedCallToBeRecorded) {
shouldRecordFinishedCall = true;
finishedCallToBeRecorded = true;
}
}
}
if (shouldRecordFinishedCall) {
recordFinishedCall();
}
}

void callEnded(Status status) {
if (callEndedUpdater != null) {
if (callEndedUpdater.getAndSet(this, 1) != 0) {
if (!module.recordFinishedRpcs) {
return;
}
this.status = status;
boolean shouldRecordFinishedCall = false;
synchronized (lock) {
if (callEnded) {
// FIXME(https://github.com/grpc/grpc-java/issues/7921): this shouldn't happen
return;
}
} else {
if (callEnded != 0) {
return;
callEnded = true;
if (activeStreams == 0 && !finishedCallToBeRecorded) {
shouldRecordFinishedCall = true;
finishedCallToBeRecorded = true;
}
callEnded = 1;
}
if (!module.recordFinishedRpcs) {
return;
if (shouldRecordFinishedCall) {
recordFinishedCall();
}
stopwatch.stop();
}

void recordFinishedCall() {
if (attemptsPerCall.get() == 0) {
ClientTracer tracer = new ClientTracer(this, module, parentCtx, startCtx, null);
tracer.roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
tracer.statusCode = status.getCode();
tracer.recordFinishedRpc();
tracer.recordFinishedAttempt();
} else if (inboundMetricTracer != null) {
inboundMetricTracer.recordFinishedRpc();
inboundMetricTracer.recordFinishedAttempt();
}

long retriesPerCall = 0;
@@ -523,7 +530,7 @@ void callEnded(Status status) {
MeasureMap measureMap = module.statsRecorder.newMeasureMap()
.put(RETRIES_PER_CALL, retriesPerCall)
.put(TRANSPARENT_RETRIES_PER_CALL, transparentRetriesPerCall.get())
.put(RETRY_DELAY_PER_CALL, retryDelayNanos.get() / NANOS_PER_MILLI);
.put(RETRY_DELAY_PER_CALL, retryDelayNanos / NANOS_PER_MILLI);
TagValue methodTag = TagValue.create(fullMethodName);
TagValue statusTag = TagValue.create(status.getCode().toString());
measureMap.record(
@@ -356,6 +356,70 @@ public void statsRecorded() throws Exception {
assertRetryStatsRecorded(1, 0, 10_000);
}

@Test
public void statsRecorded_callCancelledBeforeCommit() throws Exception {
startNewServer();
retryPolicy = ImmutableMap.<String, Object>builder()
.put("maxAttempts", 4D)
.put("initialBackoff", "10s")
.put("maxBackoff", "10s")
.put("backoffMultiplier", 1D)
.put("retryableStatusCodes", Arrays.<Object>asList("UNAVAILABLE"))
.build();
createNewChannel();

// We will have streamClosed return at a particular moment that we want.
final CountDownLatch streamClosedLatch = new CountDownLatch(1);
ClientStreamTracer.Factory streamTracerFactory = new ClientStreamTracer.Factory() {
@Override
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
return new ClientStreamTracer() {
@Override
public void streamClosed(Status status) {
if (status.getCode().equals(Code.CANCELLED)) {
try {
streamClosedLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError("streamClosedLatch interrupted", e);
}
}
}
};
}
};
ClientCall<String, Integer> call = channel.newCall(
clientStreamingMethod, CallOptions.DEFAULT.withStreamTracerFactory(streamTracerFactory));
call.start(mockCallListener, new Metadata());
assertRpcStartedRecorded();
fakeClock.forwardTime(5, SECONDS);
String message = "String of length 20.";
call.sendMessage(message);
assertOutboundMessageRecorded();
ServerCall<String, Integer> serverCall = serverCalls.poll(5, SECONDS);
serverCall.request(2);
assertOutboundWireSizeRecorded(message.length());
// trigger retry
serverCall.close(
Status.UNAVAILABLE.withDescription("original attempt failed"),
new Metadata());
assertRpcStatusRecorded(Code.UNAVAILABLE, 5000, 1);
elapseBackoff(10, SECONDS);
assertRpcStartedRecorded();
assertOutboundMessageRecorded();
serverCall = serverCalls.poll(5, SECONDS);
serverCall.request(2);
assertOutboundWireSizeRecorded(message.length());
fakeClock.forwardTime(7, SECONDS);
call.cancel("Cancelled before commit", null); // A noop substream will commit.
// The call listener is closed, but the netty substream listener is not yet closed.
verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class));
// Let the netty substream listener be closed.
streamClosedLatch.countDown();
assertRetryStatsRecorded(1, 0, 10_000);
assertRpcStatusRecorded(Code.CANCELLED, 7_000, 1);
}

@Test
public void serverCancelledAndClientDeadlineExceeded() throws Exception {
startNewServer();