From cbda32a3c1c8e253d8d78afef4b5f934f0db1826 Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Fri, 6 Aug 2021 18:32:55 -0700 Subject: [PATCH] core: fix RetriableStream edge case bug introduced in #8386 (#8393) While adding regression tests to #8386, I found a bug in an edge case: while a retry attempt is draining the last buffered entry, if it is committed in the meantime and we then cancel the call, the stream will never be cancelled. See the regression test case `commitAndCancelWhileDraining()`. --- .../io/grpc/internal/RetriableStream.java | 38 +++--- .../io/grpc/internal/RetriableStreamTest.java | 126 ++++++++++++++++++ 2 files changed, 146 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index 396c7cedfe2..d19a260049b 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -252,10 +252,14 @@ private void drain(Substream substream) { synchronized (lock) { savedState = state; - if (savedState.winningSubstream != null && savedState.winningSubstream != substream - && streamStarted) { - // committed but not me, to be cancelled - break; + if (streamStarted) { + if (savedState.winningSubstream != null && savedState.winningSubstream != substream) { + // committed but not me, to be cancelled + break; + } + if (savedState.cancelled) { + break; + } } if (index == savedState.buffer.size()) { // I'm drained state = savedState.substreamDrained(substream); @@ -277,27 +281,25 @@ private void drain(Substream substream) { } for (BufferEntry bufferEntry : list) { - savedState = state; - if (savedState.winningSubstream != null && savedState.winningSubstream != substream - && streamStarted) { - // committed but not me, to be cancelled - break; - } - if (savedState.cancelled && streamStarted) { - checkState( - savedState.winningSubstream == substream, - "substream should be 
CANCELLED_BECAUSE_COMMITTED already"); - substream.stream.cancel(cancellationStatus); - return; - } bufferEntry.runWith(substream); if (bufferEntry instanceof RetriableStream.StartEntry) { streamStarted = true; } + if (streamStarted) { + savedState = state; + if (savedState.winningSubstream != null && savedState.winningSubstream != substream) { + // committed but not me, to be cancelled + break; + } + if (savedState.cancelled) { + break; + } + } } } - substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED); + substream.stream.cancel( + state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED); } /** diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index 26c6fcf9b4e..95d2c2ba8b5 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -749,6 +749,91 @@ public void request(int numMessages) { inOrder.verify(mockStream2, never()).writeMessage(any(InputStream.class)); } + @Test + public void cancelWhileDraining() { + ArgumentCaptor sublistenerCaptor1 = + ArgumentCaptor.forClass(ClientStreamListener.class); + ClientStream mockStream1 = mock(ClientStream.class); + ClientStream mockStream2 = + mock( + ClientStream.class, + delegatesTo( + new NoopClientStream() { + @Override + public void request(int numMessages) { + retriableStream.cancel( + Status.CANCELLED.withDescription("cancelled while requesting")); + } + })); + + InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2); + doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); + retriableStream.start(masterListener); + inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + retriableStream.request(3); + inOrder.verify(mockStream1).request(3); + + // retry + doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); + sublistenerCaptor1.getValue().closed( + 
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); + fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + + inOrder.verify(mockStream2).start(any(ClientStreamListener.class)); + inOrder.verify(mockStream2).request(3); + inOrder.verify(retriableStreamRecorder).postCommit(); + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + inOrder.verify(mockStream2).cancel(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); + assertThat(statusCaptor.getValue().getDescription()) + .isEqualTo("Stream thrown away because RetriableStream committed"); + verify(masterListener).closed( + statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class)); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); + assertThat(statusCaptor.getValue().getDescription()).isEqualTo("cancelled while requesting"); + } + + @Test + public void cancelWhileRetryStart() { + ArgumentCaptor sublistenerCaptor1 = + ArgumentCaptor.forClass(ClientStreamListener.class); + ClientStream mockStream1 = mock(ClientStream.class); + ClientStream mockStream2 = + mock( + ClientStream.class, + delegatesTo( + new NoopClientStream() { + @Override + public void start(ClientStreamListener listener) { + retriableStream.cancel( + Status.CANCELLED.withDescription("cancelled while retry start")); + } + })); + + InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2); + doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); + retriableStream.start(masterListener); + inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + + // retry + doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); + sublistenerCaptor1.getValue().closed( + Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); + fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + + 
inOrder.verify(mockStream2).start(any(ClientStreamListener.class)); + inOrder.verify(retriableStreamRecorder).postCommit(); + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + inOrder.verify(mockStream2).cancel(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); + assertThat(statusCaptor.getValue().getDescription()) + .isEqualTo("Stream thrown away because RetriableStream committed"); + verify(masterListener).closed( + statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class)); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); + assertThat(statusCaptor.getValue().getDescription()).isEqualTo("cancelled while retry start"); + } + @Test public void operationsAfterImmediateCommit() { ArgumentCaptor sublistenerCaptor1 = @@ -916,6 +1001,47 @@ public void start(ClientStreamListener listener) { verify(mockStream3).request(1); } + @Test + public void commitAndCancelWhileDraining() { + ClientStream mockStream1 = mock(ClientStream.class); + ClientStream mockStream2 = + mock( + ClientStream.class, + delegatesTo( + new NoopClientStream() { + @Override + public void start(ClientStreamListener listener) { + // commit while draining + listener.headersRead(new Metadata()); + // cancel while draining + retriableStream.cancel( + Status.CANCELLED.withDescription("cancelled while drained")); + } + })); + + when(retriableStreamRecorder.newSubstream(anyInt())) + .thenReturn(mockStream1, mockStream2); + + retriableStream.start(masterListener); + + ArgumentCaptor sublistenerCaptor1 = + ArgumentCaptor.forClass(ClientStreamListener.class); + verify(mockStream1).start(sublistenerCaptor1.capture()); + + ClientStreamListener listener1 = sublistenerCaptor1.getValue(); + + // retry + listener1.closed( + Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); + fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + + 
verify(mockStream2).start(any(ClientStreamListener.class)); + verify(retriableStreamRecorder).postCommit(); + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + verify(mockStream2).cancel(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); + assertThat(statusCaptor.getValue().getDescription()).isEqualTo("cancelled while drained"); + } @Test public void perRpcBufferLimitExceeded() {