Skip to content

Commit

Permalink
core: fix retry flow control issue
Browse files Browse the repository at this point in the history
  • Loading branch information
dapengzhang0 committed Aug 10, 2021
1 parent 96a5c25 commit e43b829
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 18 deletions.
79 changes: 66 additions & 13 deletions core/src/main/java/io/grpc/internal/RetriableStream.java
Expand Up @@ -64,6 +64,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {

private final MethodDescriptor<ReqT, ?> method;
private final Executor callExecutor;
private final Executor listenerSerializeExecutor = new SerializeReentrantCallsDirectExecutor();
private final ScheduledExecutorService scheduledExecutorService;
// Must not modify it.
private final Metadata headers;
Expand Down Expand Up @@ -246,6 +247,7 @@ private void drain(Substream substream) {
int chunk = 0x80;
List<BufferEntry> list = null;
boolean streamStarted = false;
Runnable onReadyRunnable = null;

while (true) {
State savedState;
Expand All @@ -263,7 +265,15 @@ private void drain(Substream substream) {
}
if (index == savedState.buffer.size()) { // I'm drained
state = savedState.substreamDrained(substream);
return;
onReadyRunnable = new Runnable() {
@Override
public void run() {
if (isReady()) {
masterListener.onReady();
}
}
};
break;
}

if (substream.closed) {
Expand Down Expand Up @@ -298,6 +308,11 @@ private void drain(Substream substream) {
}
}

if (onReadyRunnable != null) {
listenerSerializeExecutor.execute(onReadyRunnable);
return;
}

substream.stream.cancel(
state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
}
Expand Down Expand Up @@ -449,14 +464,21 @@ public void run() {
}

@Override
public final void cancel(Status reason) {
public final void cancel(final Status reason) {
Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
noopSubstream.stream = new NoopClientStream();
Runnable runnable = commit(noopSubstream);

if (runnable != null) {
masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata());
runnable.run();
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata());

}
});
return;
}

Expand Down Expand Up @@ -770,18 +792,25 @@ private final class Sublistener implements ClientStreamListener {
}

@Override
public void headersRead(Metadata headers) {
public void headersRead(final Metadata headers) {
commitAndRun(substream);
if (state.winningSubstream == substream) {
masterListener.headersRead(headers);
if (throttle != null) {
throttle.onSuccess();
}
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
masterListener.headersRead(headers);
}
});
}
}

@Override
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
public void closed(
final Status status, final RpcProgress rpcProgress, final Metadata trailers) {
synchronized (lock) {
state = state.substreamClosed(substream);
closedSubstreamsInsight.append(status.getCode());
Expand All @@ -792,7 +821,13 @@ public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
if (substream.bufferLimitExceeded) {
commitAndRun(substream);
if (state.winningSubstream == substream) {
masterListener.closed(status, rpcProgress, trailers);
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
masterListener.closed(status, rpcProgress, trailers);
}
});
}
return;
}
Expand Down Expand Up @@ -897,7 +932,13 @@ public void run() {

commitAndRun(substream);
if (state.winningSubstream == substream) {
masterListener.closed(status, rpcProgress, trailers);
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
masterListener.closed(status, rpcProgress, trailers);
}
});
}
}

Expand Down Expand Up @@ -967,22 +1008,34 @@ private Integer getPushbackMills(Metadata trailer) {
}

@Override
public void messagesAvailable(MessageProducer producer) {
public void messagesAvailable(final MessageProducer producer) {
State savedState = state;
checkState(
savedState.winningSubstream != null, "Headers should be received prior to messages.");
if (savedState.winningSubstream != substream) {
return;
}
masterListener.messagesAvailable(producer);
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
masterListener.messagesAvailable(producer);
}
});
}

@Override
public void onReady() {
// FIXME(#7089): hedging case is broken.
// TODO(zdapeng): optimization: if the substream is not drained yet, delay onReady() once
// drained and if is still ready.
masterListener.onReady();
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
if (isReady()) {
masterListener.onReady();
}
}
});
}
}

Expand Down

0 comments on commit e43b829

Please sign in to comment.