Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: fix retry flow control issue #8401

Merged
merged 4 commits into from Aug 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
100 changes: 87 additions & 13 deletions core/src/main/java/io/grpc/internal/RetriableStream.java
Expand Up @@ -30,8 +30,10 @@
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import java.io.InputStream;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -64,6 +66,16 @@ abstract class RetriableStream<ReqT> implements ClientStream {

private final MethodDescriptor<ReqT, ?> method;
private final Executor callExecutor;
private final Executor listenerSerializeExecutor = new SynchronizationContext(
new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
throw Status.fromThrowable(e)
.withDescription("Uncaught exception in the SynchronizationContext. Re-thrown.")
.asRuntimeException();
}
}
);
private final ScheduledExecutorService scheduledExecutorService;
// Must not modify it.
private final Metadata headers;
Expand Down Expand Up @@ -105,6 +117,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
private FutureCanceller scheduledHedging;
private long nextBackoffIntervalNanos;
private Status cancellationStatus;
private boolean isClosed;

RetriableStream(
MethodDescriptor<ReqT, ?> method, Metadata headers,
Expand Down Expand Up @@ -246,6 +259,7 @@ private void drain(Substream substream) {
int chunk = 0x80;
List<BufferEntry> list = null;
boolean streamStarted = false;
Runnable onReadyRunnable = null;

while (true) {
State savedState;
Expand All @@ -263,7 +277,18 @@ private void drain(Substream substream) {
}
if (index == savedState.buffer.size()) { // I'm drained
state = savedState.substreamDrained(substream);
return;
if (!isReady()) {
return;
}
onReadyRunnable = new Runnable() {
@Override
public void run() {
if (!isClosed) {
masterListener.onReady();
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
}
}
};
break;
}

if (substream.closed) {
Expand Down Expand Up @@ -298,6 +323,11 @@ private void drain(Substream substream) {
}
}

if (onReadyRunnable != null) {
listenerSerializeExecutor.execute(onReadyRunnable);
return;
}

substream.stream.cancel(
state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
}
Expand Down Expand Up @@ -449,14 +479,22 @@ public void run() {
}

@Override
public final void cancel(Status reason) {
public final void cancel(final Status reason) {
Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
noopSubstream.stream = new NoopClientStream();
Runnable runnable = commit(noopSubstream);

if (runnable != null) {
masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata());
runnable.run();
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata());

}
});
return;
}

Expand Down Expand Up @@ -770,18 +808,25 @@ private final class Sublistener implements ClientStreamListener {
}

@Override
public void headersRead(Metadata headers) {
public void headersRead(final Metadata headers) {
commitAndRun(substream);
if (state.winningSubstream == substream) {
masterListener.headersRead(headers);
if (throttle != null) {
throttle.onSuccess();
}
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
masterListener.headersRead(headers);
}
});
}
}

@Override
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
public void closed(
final Status status, final RpcProgress rpcProgress, final Metadata trailers) {
synchronized (lock) {
state = state.substreamClosed(substream);
closedSubstreamsInsight.append(status.getCode());
Expand All @@ -792,7 +837,14 @@ public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
if (substream.bufferLimitExceeded) {
commitAndRun(substream);
if (state.winningSubstream == substream) {
masterListener.closed(status, rpcProgress, trailers);
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(status, rpcProgress, trailers);
}
});
}
return;
}
Expand Down Expand Up @@ -897,7 +949,14 @@ public void run() {

commitAndRun(substream);
if (state.winningSubstream == substream) {
masterListener.closed(status, rpcProgress, trailers);
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(status, rpcProgress, trailers);
}
});
}
}

Expand Down Expand Up @@ -967,22 +1026,37 @@ private Integer getPushbackMills(Metadata trailer) {
}

@Override
public void messagesAvailable(MessageProducer producer) {
public void messagesAvailable(final MessageProducer producer) {
State savedState = state;
checkState(
savedState.winningSubstream != null, "Headers should be received prior to messages.");
if (savedState.winningSubstream != substream) {
return;
}
masterListener.messagesAvailable(producer);
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
masterListener.messagesAvailable(producer);
}
});
}

@Override
public void onReady() {
// FIXME(#7089): hedging case is broken.
// TODO(zdapeng): optimization: if the substream is not drained yet, delay onReady() once
// drained and if is still ready.
masterListener.onReady();
if (!isReady()) {
return;
}
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
if (!isClosed) {
masterListener.onReady();
}
}
});
}
}

Expand Down