From 58a964ebb71a781633250b388d026cafe5be5cfa Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Thu, 5 Aug 2021 10:46:12 -0700 Subject: [PATCH 1/2] core: fix bug RetriableStream cancel() racing with start() --- .../io/grpc/internal/RetriableStream.java | 42 ++++++++++++------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index 23725788466..a2d0e6b9e83 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -104,6 +104,7 @@ abstract class RetriableStream implements ClientStream { @GuardedBy("lock") private FutureCanceller scheduledHedging; private long nextBackoffIntervalNanos; + private Status cancellationStatus; RetriableStream( MethodDescriptor method, Metadata headers, @@ -244,14 +245,16 @@ private void drain(Substream substream) { int index = 0; int chunk = 0x80; List list = null; + boolean streamStarted = false; while (true) { State savedState; synchronized (lock) { savedState = state; - if (savedState.winningSubstream != null && savedState.winningSubstream != substream) { - // committed but not me + if (savedState.winningSubstream != null && savedState.winningSubstream != substream + && streamStarted) { + // committed but not me, to be cancelled break; } if (index == savedState.buffer.size()) { // I'm drained @@ -275,17 +278,22 @@ private void drain(Substream substream) { for (BufferEntry bufferEntry : list) { savedState = state; - if (savedState.winningSubstream != null && savedState.winningSubstream != substream) { - // committed but not me + if (savedState.winningSubstream != null && savedState.winningSubstream != substream + && streamStarted) { + // committed but not me, to be cancelled break; } - if (savedState.cancelled) { + if (savedState.cancelled && streamStarted) { checkState( savedState.winningSubstream == substream, "substream should be CANCELLED_BECAUSE_COMMITTED already"); + substream.stream.cancel(cancellationStatus); return; } bufferEntry.runWith(substream); + if (bufferEntry instanceof RetriableStream.StartEntry) { + streamStarted = true; + } } } @@ -299,6 +307,13 @@ private void drain(Substream substream) { @Nullable abstract Status prestart(); + class StartEntry implements BufferEntry { + @Override + public void runWith(Substream substream) { + substream.stream.start(new Sublistener(substream)); + } + } + /** Starts the first PRC attempt. */ @Override public final void start(ClientStreamListener listener) { @@ -311,13 +326,6 @@ public final void start(ClientStreamListener listener) { return; } - class StartEntry implements BufferEntry { - @Override - public void runWith(Substream substream) { - substream.stream.start(new Sublistener(substream)); - } - } - synchronized (lock) { state.buffer.add(new StartEntry()); } @@ -440,6 +448,7 @@ public void run() { @Override public final void cancel(Status reason) { + cancellationStatus = reason; Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */); noopSubstream.stream = new NoopClientStream(); Runnable runnable = commit(noopSubstream); @@ -450,11 +459,16 @@ public final void cancel(Status reason) { return; } - state.winningSubstream.stream.cancel(reason); + Substream winningSubstreamToCancel = null; synchronized (lock) { - // This is not required, but causes a short-circuit in the draining process. + if (state.drainedSubstreams.contains(state.winningSubstream)) { + winningSubstreamToCancel = state.winningSubstream; + } // otherwise the winningSubstream will be cancelled while draining state = state.cancelled(); } + if (winningSubstreamToCancel != null) { + winningSubstreamToCancel.stream.cancel(reason); + } } private void delayOrExecute(BufferEntry bufferEntry) { From 878206061ccb770bb0a998f360b8edc4b39682b9 Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Thu, 5 Aug 2021 17:23:28 -0700 Subject: [PATCH 2/2] move cancellationStatus assignment to a more meaningful place --- core/src/main/java/io/grpc/internal/RetriableStream.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index a2d0e6b9e83..396c7cedfe2 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -448,7 +448,6 @@ public void run() { @Override public final void cancel(Status reason) { - cancellationStatus = reason; Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */); noopSubstream.stream = new NoopClientStream(); Runnable runnable = commit(noopSubstream); @@ -463,7 +462,9 @@ public final void cancel(Status reason) { synchronized (lock) { if (state.drainedSubstreams.contains(state.winningSubstream)) { winningSubstreamToCancel = state.winningSubstream; - } // otherwise the winningSubstream will be cancelled while draining + } else { // the winningSubstream will be cancelled while draining + cancellationStatus = reason; + } state = state.cancelled(); } if (winningSubstreamToCancel != null) {