Skip to content

Commit

Permalink
core: fix retry flow control issue (grpc#8401)
Browse files Browse the repository at this point in the history
There has been an issue about flow control when retry is enabled.

Currently we call `masterListener.onReady()` whenever `substreamListener.onReady()` is called.

The user's `onReady()` implementation might do

```
while(observer.isReady()) {
  // send one more message.
}
```

However, currently if the `RetriableStream` is still draining, `isReady()` is false, and user's `onReady()` exits immediately. And because `substreamListener.onReady()` is already called, it may not be called again after drained.

This PR fixes the issue by

- Use a SerializeExecutor to call all `masterListener` callbacks.
- Once `RetriableStream` is drained, check `isReady()` and if so call `onReady()`.
- Once `substreamListener.onReady()` is called, check `isReady()` and only if so we call `masterListener.onReady()`.
  • Loading branch information
dapengzhang0 committed Aug 12, 2021
1 parent 2c7f56a commit 969a7b5
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 18 deletions.
100 changes: 87 additions & 13 deletions core/src/main/java/io/grpc/internal/RetriableStream.java
Expand Up @@ -30,8 +30,10 @@
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import java.io.InputStream;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -64,6 +66,16 @@ abstract class RetriableStream<ReqT> implements ClientStream {

private final MethodDescriptor<ReqT, ?> method;
private final Executor callExecutor;
private final Executor listenerSerializeExecutor = new SynchronizationContext(
new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
throw Status.fromThrowable(e)
.withDescription("Uncaught exception in the SynchronizationContext. Re-thrown.")
.asRuntimeException();
}
}
);
private final ScheduledExecutorService scheduledExecutorService;
// Must not modify it.
private final Metadata headers;
Expand Down Expand Up @@ -105,6 +117,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
private FutureCanceller scheduledHedging;
private long nextBackoffIntervalNanos;
private Status cancellationStatus;
private boolean isClosed;

RetriableStream(
MethodDescriptor<ReqT, ?> method, Metadata headers,
Expand Down Expand Up @@ -247,6 +260,7 @@ private void drain(Substream substream) {
int chunk = 0x80;
List<BufferEntry> list = null;
boolean streamStarted = false;
Runnable onReadyRunnable = null;

while (true) {
State savedState;
Expand All @@ -264,7 +278,18 @@ private void drain(Substream substream) {
}
if (index == savedState.buffer.size()) { // I'm drained
state = savedState.substreamDrained(substream);
return;
if (!isReady()) {
return;
}
onReadyRunnable = new Runnable() {
@Override
public void run() {
if (!isClosed) {
masterListener.onReady();
}
}
};
break;
}

if (substream.closed) {
Expand Down Expand Up @@ -299,6 +324,11 @@ private void drain(Substream substream) {
}
}

if (onReadyRunnable != null) {
listenerSerializeExecutor.execute(onReadyRunnable);
return;
}

substream.stream.cancel(
state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
}
Expand Down Expand Up @@ -450,14 +480,22 @@ public void run() {
}

@Override
public final void cancel(Status reason) {
public final void cancel(final Status reason) {
Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
noopSubstream.stream = new NoopClientStream();
Runnable runnable = commit(noopSubstream);

if (runnable != null) {
masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata());
runnable.run();
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata());

}
});
return;
}

Expand Down Expand Up @@ -771,18 +809,25 @@ private final class Sublistener implements ClientStreamListener {
}

@Override
public void headersRead(Metadata headers) {
public void headersRead(final Metadata headers) {
commitAndRun(substream);
if (state.winningSubstream == substream) {
masterListener.headersRead(headers);
if (throttle != null) {
throttle.onSuccess();
}
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
masterListener.headersRead(headers);
}
});
}
}

@Override
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
public void closed(
final Status status, final RpcProgress rpcProgress, final Metadata trailers) {
synchronized (lock) {
state = state.substreamClosed(substream);
closedSubstreamsInsight.append(status.getCode());
Expand All @@ -793,7 +838,14 @@ public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
if (substream.bufferLimitExceeded) {
commitAndRun(substream);
if (state.winningSubstream == substream) {
masterListener.closed(status, rpcProgress, trailers);
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(status, rpcProgress, trailers);
}
});
}
return;
}
Expand Down Expand Up @@ -900,7 +952,14 @@ public void run() {

commitAndRun(substream);
if (state.winningSubstream == substream) {
masterListener.closed(status, rpcProgress, trailers);
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(status, rpcProgress, trailers);
}
});
}
}

Expand Down Expand Up @@ -970,22 +1029,37 @@ private Integer getPushbackMills(Metadata trailer) {
}

@Override
public void messagesAvailable(MessageProducer producer) {
public void messagesAvailable(final MessageProducer producer) {
State savedState = state;
checkState(
savedState.winningSubstream != null, "Headers should be received prior to messages.");
if (savedState.winningSubstream != substream) {
return;
}
masterListener.messagesAvailable(producer);
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
masterListener.messagesAvailable(producer);
}
});
}

@Override
public void onReady() {
// FIXME(#7089): hedging case is broken.
// TODO(zdapeng): optimization: if the substream is not drained yet, delay onReady() once
// drained and if is still ready.
masterListener.onReady();
if (!isReady()) {
return;
}
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
if (!isClosed) {
masterListener.onReady();
}
}
});
}
}

Expand Down

0 comments on commit 969a7b5

Please sign in to comment.