Skip to content

Commit

Permalink
core: fix bug RetriableStream cancel() racing with start() (grpc#8386)
Browse files Browse the repository at this point in the history
There is a bug in the scenario of the following sequence of events:

- `call.start()` 
- received retryable status and about to retry
- The retry attempt Substream is created but not yet `drain()` 
- `call.cancel()`

But `stream.cancel()` cannot be called prior to `stream.start()`, otherwise retry will cause a failure with IllegalStateException. The current RetriableStream code must be fixed to not cancel a stream until `start()` is called in `drain()`.
  • Loading branch information
dapengzhang0 committed Aug 6, 2021
1 parent c3b26d3 commit 551d51e
Showing 1 changed file with 29 additions and 14 deletions.
43 changes: 29 additions & 14 deletions core/src/main/java/io/grpc/internal/RetriableStream.java
Expand Up @@ -104,6 +104,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
@GuardedBy("lock")
private FutureCanceller scheduledHedging;
private long nextBackoffIntervalNanos;
private Status cancellationStatus;

RetriableStream(
MethodDescriptor<ReqT, ?> method, Metadata headers,
Expand Down Expand Up @@ -244,14 +245,16 @@ private void drain(Substream substream) {
int index = 0;
int chunk = 0x80;
List<BufferEntry> list = null;
boolean streamStarted = false;

while (true) {
State savedState;

synchronized (lock) {
savedState = state;
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
// committed but not me
if (savedState.winningSubstream != null && savedState.winningSubstream != substream
&& streamStarted) {
// committed but not me, to be cancelled
break;
}
if (index == savedState.buffer.size()) { // I'm drained
Expand All @@ -275,17 +278,22 @@ private void drain(Substream substream) {

for (BufferEntry bufferEntry : list) {
savedState = state;
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
// committed but not me
if (savedState.winningSubstream != null && savedState.winningSubstream != substream
&& streamStarted) {
// committed but not me, to be cancelled
break;
}
if (savedState.cancelled) {
if (savedState.cancelled && streamStarted) {
checkState(
savedState.winningSubstream == substream,
"substream should be CANCELLED_BECAUSE_COMMITTED already");
substream.stream.cancel(cancellationStatus);
return;
}
bufferEntry.runWith(substream);
if (bufferEntry instanceof RetriableStream.StartEntry) {
streamStarted = true;
}
}
}

Expand All @@ -299,6 +307,13 @@ private void drain(Substream substream) {
@Nullable
abstract Status prestart();

class StartEntry implements BufferEntry {
@Override
public void runWith(Substream substream) {
substream.stream.start(new Sublistener(substream));
}
}

/** Starts the first PRC attempt. */
@Override
public final void start(ClientStreamListener listener) {
Expand All @@ -311,13 +326,6 @@ public final void start(ClientStreamListener listener) {
return;
}

class StartEntry implements BufferEntry {
@Override
public void runWith(Substream substream) {
substream.stream.start(new Sublistener(substream));
}
}

synchronized (lock) {
state.buffer.add(new StartEntry());
}
Expand Down Expand Up @@ -450,11 +458,18 @@ public final void cancel(Status reason) {
return;
}

state.winningSubstream.stream.cancel(reason);
Substream winningSubstreamToCancel = null;
synchronized (lock) {
// This is not required, but causes a short-circuit in the draining process.
if (state.drainedSubstreams.contains(state.winningSubstream)) {
winningSubstreamToCancel = state.winningSubstream;
} else { // the winningSubstream will be cancelled while draining
cancellationStatus = reason;
}
state = state.cancelled();
}
if (winningSubstreamToCancel != null) {
winningSubstreamToCancel.stream.cancel(reason);
}
}

private void delayOrExecute(BufferEntry bufferEntry) {
Expand Down

0 comments on commit 551d51e

Please sign in to comment.