
census: Fix retry stats data race #8422

Closed

Conversation

Member

@dapengzhang0 dapengzhang0 commented Aug 18, 2021

There is a data race in CensusStatsModule.CallAttemptsTracerFactory:

If the client call is cancelled while an active stream on the transport is not yet committed, then a noop substream is committed and the active stream is cancelled. Because the active stream's cancellation triggers the stream listener's closed() on the transport thread, closed() can be invoked concurrently with the call listener's onClose(). Therefore, CallAttemptsTracerFactory.attemptEnded() can be called concurrently with CallAttemptsTracerFactory.callEnded(), and there can be a data race on RETRY_DELAY_PER_CALL. See also the regression test added.

The same data race can happen in the hedging case: when one of the hedges is committed and completes the call, the other uncommitted hedges cancel themselves and trigger their stream listeners' closed() on the transport thread concurrently.

Fix the race by recording RETRY_DELAY_PER_CALL only once both of the following conditions are met (a sketch follows the list):

  • callEnded is true
  • the number of active streams is 0.
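
For illustration, here is a minimal, self-contained sketch of how the two conditions can be combined without a lock. This is a reconstruction, not the actual diff: attemptStarted(), recordFinishedCallOnce(), and updateRetryDelay() are hypothetical names, the real class uses AtomicIntegerFieldUpdaters rather than AtomicBoolean, and the delay bookkeeping is elided. The key point is the second counter (in the spirit of the activeStreams2 field added in the diff below), which is decremented only after the bookkeeping, so that whoever sees it reach zero also sees the bookkeeping's writes:

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Reconstruction for illustration only; not the actual CallAttemptsTracerFactory.
final class CallTracerSketch {
  private final AtomicLong retryDelayNanos = new AtomicLong();
  private final AtomicInteger activeStreams = new AtomicInteger();
  // Second counter: decremented only AFTER the delay bookkeeping, so any thread
  // that observes it hit zero also observes the retryDelayNanos writes
  // (the atomic operations provide the happens-before edge).
  private final AtomicInteger activeStreams2 = new AtomicInteger();
  private final AtomicBoolean callEnded = new AtomicBoolean();
  private final AtomicBoolean finishedCallRecorded = new AtomicBoolean();

  void attemptStarted() {
    activeStreams.incrementAndGet();
    activeStreams2.incrementAndGet();
  }

  // Runs on the transport thread from the stream listener's closed().
  void attemptEnded() {
    if (activeStreams.decrementAndGet() == 0) {
      updateRetryDelay(); // writes retryDelayNanos / lastInactiveTimeStamp (elided)
    }
    // Only after the bookkeeping: was this the last attempt of an already-ended call?
    if (activeStreams2.decrementAndGet() == 0 && callEnded.get()) {
      recordFinishedCallOnce();
    }
  }

  // Runs from the call listener's onClose().
  void callEnded() {
    if (!callEnded.compareAndSet(false, true)) {
      return; // callEnded() already ran
    }
    if (activeStreams2.get() == 0) {
      recordFinishedCallOnce();
    }
  }

  private void recordFinishedCallOnce() {
    // Both attemptEnded() and callEnded() may reach this point; the CAS makes sure
    // RETRY_DELAY_PER_CALL is recorded exactly once, and only when the call has
    // ended and no attempt is still active.
    if (finishedCallRecorded.compareAndSet(false, true)) {
      // record RETRY_DELAY_PER_CALL from retryDelayNanos here
    }
  }

  private void updateRetryDelay() {
    // Placeholder for the existing Stopwatch/lastInactiveTimeStamp bookkeeping.
  }
}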

@@ -439,7 +439,10 @@ void recordFinishedRpc() {
   private final AtomicLong retryDelayNanos = new AtomicLong();
   private final AtomicLong lastInactiveTimeStamp = new AtomicLong();
   private final AtomicInteger activeStreams = new AtomicInteger();
+  private final AtomicInteger activeStreams2 = new AtomicInteger();
Member

I don't understand. Why do we need another atomic variable?

Member Author

Because the first atomic variable

if (activeStreams.decrementAndGet() == 0) {
    // Race condition between two extremely close events does not matter because the difference
    // in the result would be very small.
    long lastInactiveTimeStamp =
        this.lastInactiveTimeStamp.getAndSet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
    retryDelayNanos.addAndGet(-lastInactiveTimeStamp);
}

is doing something after the atomic check, and that thing cannot be done atomically with the decrement. If another thread is using the same atomic variable, it will be broken.

Member

> is doing something after the atomic check, and that thing cannot be done atomically with the decrement

Why does it need to be? There's no critical section, so it seems "any time after decrementAndGet() == 0 must be safe."

> If another thread is using the same atomic variable, it will be broken.

What atomic variable? How would another thread be using it?

Member Author

It's not safe when callEnded() reports RETRY_DELAY_PER_CALL. To exaggerate the race, add a long sleep as follows:

// attemptEnded()
if (activeStreams.decrementAndGet() == 0) {
    Thread.sleep(10_000_000_000L);   // artificially long sleep to widen the race window
    long lastInactiveTimeStamp =
        this.lastInactiveTimeStamp.getAndSet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
    retryDelayNanos.addAndGet(-lastInactiveTimeStamp);   // write operation
    if (callEnded == 1) {
      recordFinishedCall();
    }
}

...

// callEnded()
if (callEndedUpdater != null) {
  if (callEndedUpdater.getAndSet(this, 1) != 0) {
    return;
  }
} else {
  if (callEnded != 0) {
    return;
  }
  callEnded = 1;
}
if (activeStreams.get() == 0) {  // another thread using the same atomic variable
  recordFinishedCall();   // read operation not safe
}

Member

A race on retryDelayNanos wouldn't have been detected by a race detector. The race detector noticed a race on stopwatch. But why would there be a race on stopwatch, since stopwatch.elapsed() is read-only? (After all, we could have just saved a System.nanoTime() ourselves and that wouldn't change over time.)

It looks like the stopwatch.stop() in callEnded() shouldn't be there, as it adds no value and introduces a write that could interfere with attemptEnded(). I agree that you've found an additional race though.

Concerning updating retryDelayNanos in attemptEnded(), I think we should just "not do that." Instead of taking the full RPC duration and subtracting out the time in each attempt, can we instead just add the individual time periods when there has been no attempt? That is, we update the atomic when a delay period ends instead of when it starts? It looks like that'd be as simple as:

// CallAttemptsTracerFactory.<init>
retryDelayStopwatch = module.stopwatchSupplier.get(); // initially stopped

// newClientStreamTracer
if (activeStreams.incrementAndGet() == 1) {
  retryDelayStopwatch.stop(); // does nothing for the first attempt
  retryDelayNanos = stopwatch.elapsed(); // just a plain volatile
}

// attemptEnded
if (activeStreams.decrementAndGet() == 0) {
  retryDelayStopwatch.start();
}

That has the advantage that the stopwatch won't be racy and will be exactly 0 if there were no retries.
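
To make the shape of that concrete, here is a self-contained sketch of the suggestion, assuming Guava's Stopwatch; the isRunning() guard and the elapsed(TimeUnit) call are additions needed for legal Guava usage, and the class/method names are hypothetical. As discussed below, this only holds together when attempts never overlap, i.e. for retry but not for hedging:

import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the "accumulate delay periods" idea; illustrative only.
final class RetryDelayTracker {
  private final AtomicInteger activeStreams = new AtomicInteger();
  private final Stopwatch retryDelayStopwatch = Stopwatch.createUnstarted();
  private volatile long retryDelayNanos; // plain volatile, written only on the 0 -> 1 transition

  // newClientStreamTracer(): a new attempt starts, ending any delay period.
  void attemptStarted() {
    if (activeStreams.incrementAndGet() == 1) {
      if (retryDelayStopwatch.isRunning()) { // false for the very first attempt
        retryDelayStopwatch.stop();
      }
      retryDelayNanos = retryDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
    }
  }

  // attemptEnded(): the last active attempt finished, so a delay period may begin.
  void attemptEnded() {
    if (activeStreams.decrementAndGet() == 0) {
      retryDelayStopwatch.start();
    }
  }

  // callEnded() would simply report the last published retryDelayNanos, which is
  // exactly 0 if there was never more than one attempt.
}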

Member Author

> A race on retryDelayNanos wouldn't have been detected by a race detector. The race detector noticed a race on stopwatch. But why would there be a race on stopwatch, since stopwatch.elapsed() is read-only? (After all, we could have just saved a System.nanoTime() ourselves and that wouldn't change over time.)

The read/write operations inside the if-block

if (activeStreams.decrementAndGet() == 0) { ...}

can happen concurrently with the write/read operations in callEnded(). So the race detector noticed stopwatch.elapsed() first.

> It looks like the stopwatch.stop() in callEnded() shouldn't be there, as it adds no value.

I agree.

> can we instead just add the individual time periods when there has been no attempt? That is, we update the atomic when a delay period ends instead of when it starts? It looks like that'd be as simple as:

// CallAttemptsTracerFactory.<init>
retryDelayStopwatch = module.stopwatchSupplier.get(); // initially stopped

// newClientStreamTracer
if (activeStreams.incrementAndGet() == 1) {
  retryDelayStopwatch.stop(); // does nothing for the first attempt
  retryDelayNanos = stopwatch.elapsed(); // just a plain volatile
}

// attemptEnded
if (activeStreams.decrementAndGet() == 0) {
  retryDelayStopwatch.start();
}

It works for retry but not for hedging: with hedging, one attempt can end while another starts concurrently, so the 1 → 0 and 0 → 1 transitions of activeStreams (and the stopwatch stop/start they trigger) can race with each other.

Member

> The read/write operations inside the if-block

Yeah, I was basically getting at the point that "there has to be a bug." And that bug was that we were doing a write with stopwatch.stop().

> It works for retry but not for hedging.

I just came here to mention that, as it just dawned on me. There are ways to fix that, but they wouldn't be trivial. I think the answer is "use a lock."

// newClientStreamTracer
synchronized (this) {
  if (activeStreams++ == 0) { // not volatile
    retryDelayStopwatch.stop(); // does nothing for the first attempt
    retryDelayNanos = stopwatch.elapsed();
  }
}

// attemptEnded
synchronized (this) {
  if (--activeStreams == 0) {
    retryDelayStopwatch.start();
  }
}

//------------ or -----------

// attemptEnded
synchronized (this) {
  if (activeStreams.decrementAndGet() == 0) {
    long lastInactiveTimeStamp =
            this.lastInactiveTimeStamp.getAndSet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
    retryDelayNanos.addAndGet(-lastInactiveTimeStamp);
  }
}

I'd be willing to go further moving "anything that is used for control flow" under the lock. So attemptsPerCall and callEnded.
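
As a sketch of what that could look like (illustrative field and method names, not the actual follow-up change), all control-flow state and the delay bookkeeping sit under one lock:

import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;

// Sketch only: every field used for control flow is guarded by a single lock.
final class LockedCallTracer {
  private final Object lock = new Object();
  private final Stopwatch stopwatch = Stopwatch.createStarted(); // whole-call stopwatch

  @GuardedBy("lock") private int activeStreams;
  @GuardedBy("lock") private int attemptsPerCall;
  @GuardedBy("lock") private boolean callEnded;
  @GuardedBy("lock") private boolean finishedCallRecorded;
  @GuardedBy("lock") private long retryDelayNanos;
  @GuardedBy("lock") private long lastInactiveTimeStampNanos;

  // newClientStreamTracer(): a new attempt starts; if it is the only active one
  // and not the very first attempt, the delay period that just ended is added.
  void attemptStarted() {
    synchronized (lock) {
      if (activeStreams++ == 0 && attemptsPerCall > 0) {
        retryDelayNanos += stopwatch.elapsed(TimeUnit.NANOSECONDS) - lastInactiveTimeStampNanos;
      }
      attemptsPerCall++;
    }
  }

  void attemptEnded() {
    synchronized (lock) {
      if (--activeStreams == 0) {
        lastInactiveTimeStampNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
      }
      if (callEnded && activeStreams == 0 && !finishedCallRecorded) {
        finishedCallRecorded = true;
        recordFinishedCall();
      }
    }
  }

  void callEnded() {
    synchronized (lock) {
      if (callEnded) {
        return;
      }
      callEnded = true;
      if (activeStreams == 0 && !finishedCallRecorded) {
        finishedCallRecorded = true;
        recordFinishedCall();
      }
    }
  }

  @GuardedBy("lock")
  private void recordFinishedCall() {
    // record RETRY_DELAY_PER_CALL from retryDelayNanos, etc. (elided)
  }
}

For simplicity the recording happens while holding the lock; in a performance-sensitive path one could capture the values under the lock and record after releasing it.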

Member Author

> I think the answer is "use a lock."

That's fair. The original atomic counter activeStreams was already too complex. I didn't use a lock in the original implementation only because I had a feeling that CensusStatsModule is performance sensitive (I saw it's using AtomicReferenceFieldUpdaters). Even if it's viable to fix without a lock, I'd prefer using a lock.

Member Author

> I'd be willing to go further moving "anything that is used for control flow" under the lock.

I was thinking about that too, for fixing the isReady()-halfClose() race. Using a lock seems like the only possible solution in that case.

Member

> I was thinking about that too, for fixing the isReady()-halfClose() race. Using a lock seems like the only possible solution in that case.

The simplest possible solution is to make MessageFramer.closed volatile. Better though, I think, is to just stop using framer().isClosed() in the isReady() check. AbstractStream (or the subclasses) can just keep their own volatile variable that is set when halfClose()/close() is called. In fact, AbstractClientStream already has such a variable: AbstractClientStream.TransportState.outboundClosed.
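
A simplified illustration of that idea (not the actual AbstractStream/AbstractClientStream code): the stream keeps its own volatile flag that halfClose()/close() sets and isReady() reads, so isReady() no longer touches the framer's non-volatile state:

// Simplified illustration only; not the real grpc-java stream classes.
abstract class StreamSketch {
  // Set once the outbound side is closed; volatile so a concurrent isReady()
  // on another thread observes it without locking.
  private volatile boolean outboundClosed;

  public void halfClose() {
    outboundClosed = true;
    // ... flush remaining messages and close the framer ...
  }

  public boolean isReady() {
    // Consult the volatile flag instead of framer().isClosed(), whose backing
    // field is not volatile and races with a concurrent halfClose().
    return !outboundClosed && transportIsReady();
  }

  // Whether the transport currently has capacity to accept more messages.
  abstract boolean transportIsReady();
}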

@dapengzhang0
Member Author

Closing in favor of #8459

@github-actions github-actions bot locked as resolved and limited conversation to collaborators Nov 30, 2021
@dapengzhang0 dapengzhang0 deleted the fix-retry-stats-data-race branch January 16, 2022 18:28