Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

census: Fix retry stats data race (v2) #8459

Merged
merged 5 commits into from Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
111 changes: 59 additions & 52 deletions census/src/main/java/io/grpc/census/CensusStatsModule.java
Expand Up @@ -55,13 +55,13 @@
import io.opencensus.tags.unsafe.ContextUtils;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

/**
* Provides factories for {@link StreamTracer} that records stats to Census.
Expand Down Expand Up @@ -356,12 +356,12 @@ public void streamClosed(Status status) {
if (module.recordFinishedRpcs) {
// Stream is closed early. So no need to record metrics for any inbound events after this
// point.
recordFinishedRpc();
recordFinishedAttempt();
}
} // Otherwise will report stats in callEnded() to guarantee all inbound metrics are recorded.
}

void recordFinishedRpc() {
void recordFinishedAttempt() {
MeasureMap measureMap = module.statsRecorder.newMeasureMap()
// TODO(songya): remove the deprecated measure constants once they are completed removed.
.put(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT, 1)
Expand Down Expand Up @@ -405,48 +405,34 @@ static final class CallAttemptsTracerFactory extends
Measure.MeasureDouble.create(
"grpc.io/client/retry_delay_per_call", "Retry delay per call", "ms");

@Nullable
private static final AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> callEndedUpdater;

/**
* When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
* JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to
* (potentially racy) direct updates of the volatile variables.
*/
static {
AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> tmpCallEndedUpdater;
try {
tmpCallEndedUpdater =
AtomicIntegerFieldUpdater.newUpdater(CallAttemptsTracerFactory.class, "callEnded");
} catch (Throwable t) {
logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
tmpCallEndedUpdater = null;
}
callEndedUpdater = tmpCallEndedUpdater;
}

ClientTracer inboundMetricTracer;
private final CensusStatsModule module;
private final Stopwatch stopwatch;
private volatile int callEnded;
@GuardedBy("lock")
private boolean callEnded;
dapengzhang0 marked this conversation as resolved.
Show resolved Hide resolved
private final TagContext parentCtx;
private final TagContext startCtx;
private final String fullMethodName;

// TODO(zdapeng): optimize memory allocation using AtomicFieldUpdater.
private final AtomicLong attemptsPerCall = new AtomicLong();
private final AtomicLong transparentRetriesPerCall = new AtomicLong();
private final AtomicLong retryDelayNanos = new AtomicLong();
private final AtomicLong lastInactiveTimeStamp = new AtomicLong();
private final AtomicInteger activeStreams = new AtomicInteger();
private final AtomicBoolean activated = new AtomicBoolean();
// write happens before read
private Status status;
private final Object lock = new Object();
// write @GuardedBy("lock") and happens before read
private long retryDelayNanos;
@GuardedBy("lock")
private int activeStreams;
@GuardedBy("lock")
private boolean finishedCallToBeRecorded;

CallAttemptsTracerFactory(
CensusStatsModule module, TagContext parentCtx, String fullMethodName) {
this.module = checkNotNull(module, "module");
this.parentCtx = checkNotNull(parentCtx, "parentCtx");
this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
this.stopwatch = module.stopwatchSupplier.get().start();
this.stopwatch = module.stopwatchSupplier.get();
TagValue methodTag = TagValue.create(fullMethodName);
startCtx = module.tagger.toBuilder(parentCtx)
.putLocal(RpcMeasureConstants.GRPC_CLIENT_METHOD, methodTag)
Expand All @@ -461,10 +447,14 @@ static final class CallAttemptsTracerFactory extends

@Override
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metadata) {
ClientTracer tracer = new ClientTracer(this, module, parentCtx, startCtx, info);
if (activeStreams.incrementAndGet() == 1) {
if (!activated.compareAndSet(false, true)) {
retryDelayNanos.addAndGet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
synchronized (lock) {
if (finishedCallToBeRecorded) {
// This can be the case when the called is cancelled but a retry attempt is created.
return new ClientStreamTracer() {};
}
if (++activeStreams == 1 && stopwatch.isRunning()) {
stopwatch.stop();
retryDelayNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
}
}
if (module.recordStartedRpcs && attemptsPerCall.get() > 0) {
Expand All @@ -477,42 +467,59 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metada
} else {
attemptsPerCall.incrementAndGet();
}
return tracer;
return new ClientTracer(this, module, parentCtx, startCtx, info);
}

// Called whenever each attempt is ended.
void attemptEnded() {
if (activeStreams.decrementAndGet() == 0) {
// Race condition between two extremely close events does not matter because the difference
// in the result would be very small.
long lastInactiveTimeStamp =
this.lastInactiveTimeStamp.getAndSet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
retryDelayNanos.addAndGet(-lastInactiveTimeStamp);
if (!module.recordFinishedRpcs) {
return;
}
boolean shouldRecordFinishedCall = false;
synchronized (lock) {
if (--activeStreams == 0) {
stopwatch.start();
if (callEnded && !finishedCallToBeRecorded) {
shouldRecordFinishedCall = true;
finishedCallToBeRecorded = true;
}
}
}
if (shouldRecordFinishedCall) {
recordFinishedCall();
}
}

void callEnded(Status status) {
if (callEndedUpdater != null) {
if (callEndedUpdater.getAndSet(this, 1) != 0) {
if (!module.recordFinishedRpcs) {
return;
}
this.status = status;
boolean shouldRecordFinishedCall = false;
synchronized (lock) {
if (callEnded) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How could this be true?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not possible unless there's some bug like #7921.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then how about we log a warning if it happens? An assert would be great, but I don't think we are that confident. A warning with "this is a bug" at least would mean we'd learn if it happens. If it does trigger, it seems it'd probably be at a low rate.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added logging.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uhh... if we think it can happen, then let's just have the notice of the bug here and return like you had.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean to include the link to the issue in the log?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I mean delete the log statement. I was suggesting the log statement only if we thought it wasn't possible but it would be worrisome to add an assert. If we think it is possible, just having a comment seems good.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// FIXME(https://github.com/grpc/grpc-java/issues/7921): this shouldn't happen
return;
}
} else {
if (callEnded != 0) {
return;
callEnded = true;
if (activeStreams == 0 && !finishedCallToBeRecorded) {
shouldRecordFinishedCall = true;
finishedCallToBeRecorded = true;
}
callEnded = 1;
}
if (!module.recordFinishedRpcs) {
return;
if (shouldRecordFinishedCall) {
recordFinishedCall();
}
stopwatch.stop();
}

void recordFinishedCall() {
if (attemptsPerCall.get() == 0) {
ClientTracer tracer = new ClientTracer(this, module, parentCtx, startCtx, null);
tracer.roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
tracer.statusCode = status.getCode();
tracer.recordFinishedRpc();
tracer.recordFinishedAttempt();
} else if (inboundMetricTracer != null) {
inboundMetricTracer.recordFinishedRpc();
inboundMetricTracer.recordFinishedAttempt();
}

long retriesPerCall = 0;
Expand All @@ -523,7 +530,7 @@ void callEnded(Status status) {
MeasureMap measureMap = module.statsRecorder.newMeasureMap()
.put(RETRIES_PER_CALL, retriesPerCall)
.put(TRANSPARENT_RETRIES_PER_CALL, transparentRetriesPerCall.get())
.put(RETRY_DELAY_PER_CALL, retryDelayNanos.get() / NANOS_PER_MILLI);
.put(RETRY_DELAY_PER_CALL, retryDelayNanos / NANOS_PER_MILLI);
TagValue methodTag = TagValue.create(fullMethodName);
TagValue statusTag = TagValue.create(status.getCode().toString());
measureMap.record(
Expand Down
Expand Up @@ -356,6 +356,70 @@ public void statsRecorded() throws Exception {
assertRetryStatsRecorded(1, 0, 10_000);
}

@Test
public void statsRecorde_callCancelledBeforeCommit() throws Exception {
startNewServer();
retryPolicy = ImmutableMap.<String, Object>builder()
.put("maxAttempts", 4D)
.put("initialBackoff", "10s")
.put("maxBackoff", "10s")
.put("backoffMultiplier", 1D)
.put("retryableStatusCodes", Arrays.<Object>asList("UNAVAILABLE"))
.build();
createNewChannel();

// We will have streamClosed return at a particular moment that we want.
final CountDownLatch streamClosedLatch = new CountDownLatch(1);
ClientStreamTracer.Factory streamTracerFactory = new ClientStreamTracer.Factory() {
@Override
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
return new ClientStreamTracer() {
@Override
public void streamClosed(Status status) {
if (status.getCode().equals(Code.CANCELLED)) {
try {
streamClosedLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError("streamClosedLatch interrupted", e);
}
}
}
};
}
};
ClientCall<String, Integer> call = channel.newCall(
clientStreamingMethod, CallOptions.DEFAULT.withStreamTracerFactory(streamTracerFactory));
call.start(mockCallListener, new Metadata());
assertRpcStartedRecorded();
fakeClock.forwardTime(5, SECONDS);
String message = "String of length 20.";
call.sendMessage(message);
assertOutboundMessageRecorded();
ServerCall<String, Integer> serverCall = serverCalls.poll(5, SECONDS);
serverCall.request(2);
assertOutboundWireSizeRecorded(message.length());
// trigger retry
serverCall.close(
Status.UNAVAILABLE.withDescription("original attempt failed"),
new Metadata());
assertRpcStatusRecorded(Code.UNAVAILABLE, 5000, 1);
elapseBackoff(10, SECONDS);
assertRpcStartedRecorded();
assertOutboundMessageRecorded();
serverCall = serverCalls.poll(5, SECONDS);
serverCall.request(2);
assertOutboundWireSizeRecorded(message.length());
fakeClock.forwardTime(7, SECONDS);
call.cancel("Cancelled before commit", null); // A noop substream will commit.
// The call listener is closed, but the netty substream listener is not yet closed.
verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class));
// Let the netty substream listener be closed.
streamClosedLatch.countDown();
assertRetryStatsRecorded(1, 0, 10_000);
assertRpcStatusRecorded(Code.CANCELLED, 7_000, 1);
}

@Test
public void serverCancelledAndClientDeadlineExceeded() throws Exception {
startNewServer();
Expand Down