census: Fix retry stats data race #8422
Conversation
@@ -439,7 +439,10 @@ void recordFinishedRpc() {
  private final AtomicLong retryDelayNanos = new AtomicLong();
  private final AtomicLong lastInactiveTimeStamp = new AtomicLong();
  private final AtomicInteger activeStreams = new AtomicInteger();
  private final AtomicInteger activeStreams2 = new AtomicInteger();
I don't understand. Why do we need another atomic variable?
Because the first atomic variable

if (activeStreams.decrementAndGet() == 0) {
  // Race condition between two extremely close events does not matter because the difference
  // in the result would be very small.
  long lastInactiveTimeStamp =
      this.lastInactiveTimeStamp.getAndSet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
  retryDelayNanos.addAndGet(-lastInactiveTimeStamp);
}

is doing something after the atomic check, and that work cannot be done atomically with the decrement. If another thread is using the same atomic variable, it will be broken.
is doing something after the atomic check, and that thing can not be done atomically with the decrement
Why does it need to be? There's no critical section, so it seems "any time after decrementAndGet() == 0 must be safe."
If another thread is using the same atomic variable, it will be broken.
What atomic variable? How would another thread be using it?
It's not safe when callEnded() reports RETRY_DELAY_PER_CALL. To exaggerate the race, add a long sleep as follows:

// attemptEnded()
if (activeStreams.decrementAndGet() == 0) {
  Thread.sleep(10_000_000_000L); // exaggerated long sleep
  long lastInactiveTimeStamp =
      this.lastInactiveTimeStamp.getAndSet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
  retryDelayNanos.addAndGet(-lastInactiveTimeStamp); // write operation
  if (callEnded == 1) {
    recordFinishedCall();
  }
}
...
// callEnded()
if (callEndedUpdater != null) {
  if (callEndedUpdater.getAndSet(this, 1) != 0) {
    return;
  }
} else {
  if (callEnded != 0) {
    return;
  }
  callEnded = 1;
}
if (activeStreams.get() == 0) { // another thread using the same atomic variable
  recordFinishedCall(); // read operation not safe
}
A race on retryDelayNanos wouldn't have been detected by a race detector. The race detector noticed a race on stopwatch. But why would there be a race on stopwatch, since stopwatch.elapsed() is read-only? (After all, we could have just saved a System.nanoTime() ourselves and that wouldn't change over time.)
It looks like the stopwatch.stop() in callEnded() shouldn't be there, as it adds no value and introduces a write that could interfere with attemptEnded(). I agree that you've found an additional race though.
Concerning updating retryDelayNanos in attemptEnded(), I think we should just "not do that." Instead of taking the full RPC duration and subtracting out the time in each attempt, can we instead just add the individual time periods when there has been no attempt? That is, we update the atomic when a delay period ends instead of when it starts? It looks like that'd be as simple as:
// CallAttemptsTracerFactory.<init>
retryDelayStopwatch = module.stopwatchSupplier.get(); // initially stopped
// newClientStreamTracer
if (activeStreams.incrementAndGet() == 1) {
  retryDelayStopwatch.stop(); // does nothing for the first attempt
  retryDelayNanos = stopwatch.elapsed(); // just a plain volatile
}
// attemptEnded
if (activeStreams.decrementAndGet() == 0) {
  retryDelayStopwatch.start();
}

That has the advantage that the stopwatch won't be racy and will be exactly 0 if there were no retries.
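If it helps, that sketch can be made self-contained and deterministic by injecting a nano-clock instead of using a Guava Stopwatch (all names here are illustrative, not the real grpc-java code):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

// Hypothetical sketch of the suggestion above: accumulate the retry delay
// when a delay period ends (i.e. when the next attempt starts), using an
// injectable nano-clock so the arithmetic can be checked deterministically.
class RetryDelayTracker {
    private final LongSupplier nanoClock;
    private final AtomicInteger activeStreams = new AtomicInteger();
    private long delayStartNanos;          // set when the last active attempt ends
    private boolean delayRunning;          // "the delay stopwatch is running"
    private volatile long retryDelayNanos; // stays exactly 0 if there are no retries

    RetryDelayTracker(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
    }

    void attemptStarted() {                // corresponds to newClientStreamTracer
        if (activeStreams.incrementAndGet() == 1 && delayRunning) {
            retryDelayNanos += nanoClock.getAsLong() - delayStartNanos;
            delayRunning = false;          // "stop()" — a no-op for the first attempt
        }
    }

    void attemptEnded() {
        if (activeStreams.decrementAndGet() == 0) {
            delayStartNanos = nanoClock.getAsLong();
            delayRunning = true;           // "start()" the delay period
        }
    }

    long retryDelayNanos() {
        return retryDelayNanos;
    }
}
```

With a simulated clock, ending the only attempt at t=0 and starting a retry at t=100 ns yields a recorded delay of 100 ns, while a call with no retries records exactly 0.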
A race on retryDelayNanos wouldn't have been detected by a race detector. The race detector noticed a race on stopwatch. But why would there be a race on stopwatch, since stopwatch.elapsed() is read-only? (After all, we could have just saved a System.nanoTime() ourselves and that wouldn't change over time.)
The read/write operations inside the if-block if (activeStreams.decrementAndGet() == 0) { ... } can happen concurrently with the write/read operations in callEnded(). So the race detector noticed stopwatch.elapsed() first.
It looks like the stopwatch.stop() in callEnded() shouldn't be there, as it adds no value.
I agree.
can we instead just add the individual time periods when there has been no attempt? That is, we update the atomic when a delay period ends instead of when it starts? It looks like that'd be as simple as:
// CallAttemptsTracerFactory.<init>
retryDelayStopwatch = module.stopwatchSupplier.get(); // initially stopped
// newClientStreamTracer
if (activeStreams.incrementAndGet() == 1) {
  retryDelayStopwatch.stop(); // does nothing for the first attempt
  retryDelayNanos = stopwatch.elapsed(); // just a plain volatile
}
// attemptEnded
if (activeStreams.decrementAndGet() == 0) {
  retryDelayStopwatch.start();
}
It works for retry but not for hedging.
The read/write operations inside the if-block
Yeah, I was basically getting to the point that "there has to be a bug." And that bug was that we were doing a write with stopwatch.stop().
It works for retry but not for hedging.
I just came here to mention that, as it just dawned on me. There are ways to fix that, but they wouldn't be trivial. I think the answer is "use a lock."
// newClientStreamTracer
synchronized (this) {
  if (activeStreams++ == 0) { // not volatile
    retryDelayStopwatch.stop(); // does nothing for the first attempt
    retryDelayNanos = stopwatch.elapsed();
  }
}
// attemptEnded
synchronized (this) {
  if (--activeStreams == 0) {
    retryDelayStopwatch.start();
  }
}
//------------ or -----------
// attemptEnded
synchronized (this) {
  if (activeStreams.decrementAndGet() == 0) {
    long lastInactiveTimeStamp =
        this.lastInactiveTimeStamp.getAndSet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
    retryDelayNanos.addAndGet(-lastInactiveTimeStamp);
  }
}
I'd be willing to go further, moving "anything that is used for control flow" under the lock. So attemptsPerCall and callEnded.
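To make the lock-based suggestion concrete, here is a hypothetical, self-contained variant (illustrative names; an injectable nano-clock stands in for a Stopwatch). The count is a plain int, and the check plus the delay bookkeeping form one critical section, so overlapping hedged attempts are handled correctly too:

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of the lock-based fix: one monitor guards the stream
// count and the delay accounting, so check and update are a single critical
// section even when several hedged attempts are in flight at once.
class LockedRetryDelay {
    private final LongSupplier nanoClock;
    private int activeStreams;    // plain int, guarded by 'this'
    private long delayStartNanos; // guarded by 'this'
    private boolean delayRunning; // guarded by 'this'
    private long retryDelayNanos; // guarded by 'this'

    LockedRetryDelay(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
    }

    void attemptStarted() { // corresponds to newClientStreamTracer
        synchronized (this) {
            if (activeStreams++ == 0 && delayRunning) {
                retryDelayNanos += nanoClock.getAsLong() - delayStartNanos;
                delayRunning = false;
            }
        }
    }

    void attemptEnded() {
        synchronized (this) {
            if (--activeStreams == 0) {
                delayStartNanos = nanoClock.getAsLong();
                delayRunning = true; // delay begins when the last attempt ends
            }
        }
    }

    synchronized long retryDelayNanos() {
        return retryDelayNanos;
    }
}
```

With overlapping hedges, ending a non-last hedge records nothing; ending the last one at t=10 and starting a retry at t=25 accumulates a 15 ns delay.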
I think the answer is "use a lock."
That's fair. The original atomic counter activeStreams was already too complex. I was not using a lock in the original implementation just because I had a feeling that CensusStatsModule is performance sensitive (I saw it's using AtomicReferenceFieldUpdaters). Even if it's viable to fix without a lock, I'd prefer using a lock.
I'd be willing to go further moving "anything that is used for control flow" under the lock.
I was thinking about that too, for fixing the isReady()-halfClose race. Using a lock seems like the only possible solution in that case.
I was thinking about that too, for fixing the isReady()-halfClose race. Using a lock seems only possible solution in that case.
The simplest possible solution is to make MessageFramer.closed volatile. Better though, I think, is to just stop using framer().isClosed() in the isReady() check. AbstractStream (or the subclasses) can just keep their own volatile variable that is set when halfClose()/close() is called. In fact, AbstractClientStream already has such a variable: AbstractClientStream.TransportState.outboundClosed.
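As a rough illustration of that suggestion (hypothetical names; the real classes are AbstractClientStream and MessageFramer), the stream keeps its own volatile flag and isReady() never touches framer state:

```java
// Hypothetical sketch: isReady() consults a volatile flag owned by the
// stream, set on halfClose()/close(), instead of reading the framer's
// mutable 'closed' field from another thread.
class StreamSketch {
    private volatile boolean outboundClosed;        // analogous to TransportState.outboundClosed
    private volatile boolean transportReady = true; // stand-in for flow-control readiness

    void halfClose() {
        outboundClosed = true;                      // single volatile write, no framer access
    }

    boolean isReady() {
        return !outboundClosed && transportReady;
    }
}
```

The point of the design is that the only shared state isReady() reads is a volatile written once, so there is nothing for a race detector to flag.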
Closing in favor of #8459
There is a data race in CensusStatsModule.CallAttemptsTracerFactory:

If the client call is cancelled while an active stream on the transport is not committed, then a noop substream is committed and the active stream is cancelled. Because the active stream cancellation triggers the stream listener closed() on the transport thread, the closed() method can be invoked concurrently with the call listener onClose(). Therefore, CallAttemptsTracerFactory.attemptEnded() can be called concurrently with CallAttemptsTracerFactory.callEnded(), and there can be a data race on RETRY_DELAY_PER_CALL. See also the regression test added.

The same data race can happen in the hedging case: when one of the hedges is committed and completes the call, the other uncommitted hedges cancel themselves and trigger their stream listeners' closed() on the transport thread concurrently.

Fixing the race by recording RETRY_DELAY_PER_CALL once both the conditions are met:
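The description is cut off here. Based on the discussion above, a minimal sketch of the shape of such a fix, assuming the two conditions are "the call has ended" and "no attempt is still active" (all names illustrative, not the actual change):

```java
// Hypothetical sketch of "record once both conditions are met": whichever
// of callEnded()/attemptEnded() observes, under the lock, that the call is
// over AND no attempt is still active emits the metric, exactly once.
class RecordOncePerCall {
    private final Object lock = new Object();
    private int activeStreams; // guarded by lock
    private boolean callEnded; // guarded by lock
    private boolean recorded;  // guarded by lock
    int recordCount;           // stand-in for the stats recorder

    void attemptStarted() {
        synchronized (lock) {
            activeStreams++;
        }
    }

    void attemptEnded() {
        synchronized (lock) {
            activeStreams--;
            maybeRecord();
        }
    }

    void callEnded() {
        synchronized (lock) {
            callEnded = true;
            maybeRecord();
        }
    }

    private void maybeRecord() { // caller holds lock
        if (callEnded && activeStreams == 0 && !recorded) {
            recorded = true;
            recordCount++;       // record RETRY_DELAY_PER_CALL here
        }
    }
}
```

Either ordering of the last attempt ending and the call ending records the metric exactly once, which is the property the racy version lacked.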