Skip to content

Commit

Permalink
core: fix RetriableStream edge case bug introduced in grpc#8386
Browse files Browse the repository at this point in the history
  • Loading branch information
dapengzhang0 committed Aug 6, 2021
1 parent 3668f2e commit ed60b57
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 18 deletions.
38 changes: 20 additions & 18 deletions core/src/main/java/io/grpc/internal/RetriableStream.java
Expand Up @@ -252,10 +252,14 @@ private void drain(Substream substream) {

synchronized (lock) {
savedState = state;
if (savedState.winningSubstream != null && savedState.winningSubstream != substream
&& streamStarted) {
// committed but not me, to be cancelled
break;
if (streamStarted) {
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
// committed but not me, to be cancelled
break;
}
if (savedState.cancelled) {
break;
}
}
if (index == savedState.buffer.size()) { // I'm drained
state = savedState.substreamDrained(substream);
Expand All @@ -277,27 +281,25 @@ private void drain(Substream substream) {
}

for (BufferEntry bufferEntry : list) {
savedState = state;
if (savedState.winningSubstream != null && savedState.winningSubstream != substream
&& streamStarted) {
// committed but not me, to be cancelled
break;
}
if (savedState.cancelled && streamStarted) {
checkState(
savedState.winningSubstream == substream,
"substream should be CANCELLED_BECAUSE_COMMITTED already");
substream.stream.cancel(cancellationStatus);
return;
}
bufferEntry.runWith(substream);
if (bufferEntry instanceof RetriableStream.StartEntry) {
streamStarted = true;
}
if (streamStarted) {
savedState = state;
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
// committed but not me, to be cancelled
break;
}
if (savedState.cancelled) {
break;
}
}
}
}

substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED);
substream.stream.cancel(
state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
}

/**
Expand Down
128 changes: 128 additions & 0 deletions core/src/test/java/io/grpc/internal/RetriableStreamTest.java
Expand Up @@ -749,6 +749,91 @@ public void request(int numMessages) {
inOrder.verify(mockStream2, never()).writeMessage(any(InputStream.class));
}

@Test
public void cancelWhileDraining() {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
final Status cancelStatus = Status.CANCELLED.withDescription("cancelled while requesting");
ClientStream mockStream1 = mock(ClientStream.class);
ClientStream mockStream2 =
mock(
ClientStream.class,
delegatesTo(
new NoopClientStream() {
@Override
public void request(int numMessages) {
retriableStream.cancel(cancelStatus);
}
}));

InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2);
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
retriableStream.start(masterListener);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
retriableStream.request(3);
inOrder.verify(mockStream1).request(3);

// retry
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);

inOrder.verify(mockStream2).start(any(ClientStreamListener.class));
inOrder.verify(mockStream2).request(3);
inOrder.verify(retriableStreamRecorder).postCommit();
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
inOrder.verify(mockStream2).cancel(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
assertThat(statusCaptor.getValue().getDescription())
.isEqualTo("Stream thrown away because RetriableStream committed");
verify(masterListener).closed(
statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class));
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
assertThat(statusCaptor.getValue().getDescription()).isEqualTo("cancelled while requesting");
}

@Test
public void cancelWhileRetryStart() {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
final Status cancelStatus = Status.CANCELLED.withDescription("cancelled while retry start");
ClientStream mockStream1 = mock(ClientStream.class);
ClientStream mockStream2 =
mock(
ClientStream.class,
delegatesTo(
new NoopClientStream() {
@Override
public void start(ClientStreamListener listener) {
retriableStream.cancel(cancelStatus);
}
}));

InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2);
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
retriableStream.start(masterListener);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());

// retry
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);

inOrder.verify(mockStream2).start(any(ClientStreamListener.class));
inOrder.verify(retriableStreamRecorder).postCommit();
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
inOrder.verify(mockStream2).cancel(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
assertThat(statusCaptor.getValue().getDescription())
.isEqualTo("Stream thrown away because RetriableStream committed");
verify(masterListener).closed(
statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class));
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
assertThat(statusCaptor.getValue().getDescription()).isEqualTo("cancelled while retry start");
}

@Test
public void operationsAfterImmediateCommit() {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
Expand Down Expand Up @@ -916,6 +1001,49 @@ public void start(ClientStreamListener listener) {
verify(mockStream3).request(1);
}

@Test
public void commitAndCancelWhileDraining() {
ClientStream mockStream1 = mock(ClientStream.class);
ClientStream mockStream2 =
mock(
ClientStream.class,
delegatesTo(
new NoopClientStream() {
@Override
public void start(ClientStreamListener listener) {
// commit while draining
listener.headersRead(new Metadata());
// cancel while draining
retriableStream.cancel(
Status.CANCELLED.withDescription("cancelled while drained"));
}
}));

when(retriableStreamRecorder.newSubstream(anyInt()))
.thenReturn(mockStream1, mockStream2);

retriableStream.start(masterListener);

ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture());

ClientStreamListener listener1 = sublistenerCaptor1.getValue();

// retry
listener1.closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);

verify(mockStream2).start(any(ClientStreamListener.class));
verify(retriableStreamRecorder).postCommit();
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(mockStream2).cancel(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
assertThat(statusCaptor.getValue().getDescription()).isEqualTo("cancelled while drained");
}



@Test
public void perRpcBufferLimitExceeded() {
Expand Down

0 comments on commit ed60b57

Please sign in to comment.