From e43b829c3e2d478fec49f71a7bbb74e7012a25be Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Tue, 10 Aug 2021 16:07:37 -0700 Subject: [PATCH] core: fix retry flow control issue --- .../io/grpc/internal/RetriableStream.java | 79 ++++++++++++++--- .../io/grpc/internal/RetriableStreamTest.java | 88 +++++++++++++++++-- 2 files changed, 149 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index d19a260049b..f7c476699ec 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -64,6 +64,7 @@ abstract class RetriableStream implements ClientStream { private final MethodDescriptor method; private final Executor callExecutor; + private final Executor listenerSerializeExecutor = new SerializeReentrantCallsDirectExecutor(); private final ScheduledExecutorService scheduledExecutorService; // Must not modify it. private final Metadata headers; @@ -246,6 +247,7 @@ private void drain(Substream substream) { int chunk = 0x80; List list = null; boolean streamStarted = false; + Runnable onReadyRunnable = null; while (true) { State savedState; @@ -263,7 +265,15 @@ private void drain(Substream substream) { } if (index == savedState.buffer.size()) { // I'm drained state = savedState.substreamDrained(substream); - return; + onReadyRunnable = new Runnable() { + @Override + public void run() { + if (isReady()) { + masterListener.onReady(); + } + } + }; + break; } if (substream.closed) { @@ -298,6 +308,11 @@ private void drain(Substream substream) { } } + if (onReadyRunnable != null) { + listenerSerializeExecutor.execute(onReadyRunnable); + return; + } + substream.stream.cancel( state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED); } @@ -449,14 +464,21 @@ public void run() { } @Override - public final void cancel(Status reason) { + public final void cancel(final Status reason) { Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */); noopSubstream.stream = new NoopClientStream(); Runnable runnable = commit(noopSubstream); if (runnable != null) { - masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata()); runnable.run(); + listenerSerializeExecutor.execute( + new Runnable() { + @Override + public void run() { + masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata()); + + } + }); return; } @@ -770,18 +792,25 @@ private final class Sublistener implements ClientStreamListener { } @Override - public void headersRead(Metadata headers) { + public void headersRead(final Metadata headers) { commitAndRun(substream); if (state.winningSubstream == substream) { - masterListener.headersRead(headers); if (throttle != null) { throttle.onSuccess(); } + listenerSerializeExecutor.execute( + new Runnable() { + @Override + public void run() { + masterListener.headersRead(headers); + } + }); } } @Override - public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { + public void closed( + final Status status, final RpcProgress rpcProgress, final Metadata trailers) { synchronized (lock) { state = state.substreamClosed(substream); closedSubstreamsInsight.append(status.getCode()); @@ -792,7 +821,13 @@ public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { if (substream.bufferLimitExceeded) { commitAndRun(substream); if (state.winningSubstream == substream) { - masterListener.closed(status, rpcProgress, trailers); + listenerSerializeExecutor.execute( + new Runnable() { + @Override + public void run() { + masterListener.closed(status, rpcProgress, trailers); + } + }); } return; } @@ -897,7 +932,13 @@ public void run() { commitAndRun(substream); if (state.winningSubstream == substream) { - masterListener.closed(status, rpcProgress, trailers); + listenerSerializeExecutor.execute( + new Runnable() { + @Override + public void run() { + masterListener.closed(status, rpcProgress, trailers); + } + }); } } @@ -967,22 +1008,34 @@ private Integer getPushbackMills(Metadata trailer) { } @Override - public void messagesAvailable(MessageProducer producer) { + public void messagesAvailable(final MessageProducer producer) { State savedState = state; checkState( savedState.winningSubstream != null, "Headers should be received prior to messages."); if (savedState.winningSubstream != substream) { return; } - masterListener.messagesAvailable(producer); + listenerSerializeExecutor.execute( + new Runnable() { + @Override + public void run() { + masterListener.messagesAvailable(producer); + } + }); } @Override public void onReady() { // FIXME(#7089): hedging case is broken. - // TODO(zdapeng): optimization: if the substream is not drained yet, delay onReady() once - // drained and if is still ready. - masterListener.onReady(); + listenerSerializeExecutor.execute( + new Runnable() { + @Override + public void run() { + if (isReady()) { + masterListener.onReady(); + } + } + }); } } diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index 95d2c2ba8b5..28c4ef3f25b 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -255,6 +255,7 @@ public Void answer(InvocationOnMock in) { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); retriableStream.sendMessage("msg1"); @@ -307,6 +308,7 @@ public Void answer(InvocationOnMock in) { inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); inOrder.verify(mockStream2).request(456); inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); // send more messages @@ -355,6 +357,7 @@ public Void answer(InvocationOnMock in) { inOrder.verify(mockStream3).writeMessage(any(InputStream.class)); inOrder.verify(mockStream3).request(456); inOrder.verify(mockStream3, times(7)).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); InsightBuilder insight = new InsightBuilder(); @@ -636,6 +639,7 @@ public void retry_cancelWhileBackoff() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream1).start(sublistenerCaptor1.capture()); + verify(mockStream1).isReady(); // retry ClientStream mockStream2 = mock(ClientStream.class); @@ -655,7 +659,7 @@ public void retry_cancelWhileBackoff() { @Test public void operationsWhileDraining() { - ArgumentCaptor sublistenerCaptor1 = + final ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); final AtomicReference sublistenerCaptor2 = new AtomicReference<>(); @@ -668,10 +672,16 @@ public void operationsWhileDraining() { @Override public void request(int numMessages) { retriableStream.sendMessage("substream1 request " + numMessages); + sublistenerCaptor1.getValue().onReady(); if (numMessages > 1) { retriableStream.request(--numMessages); } } + + @Override + public boolean isReady() { + return true; + } })); final ClientStream mockStream2 = @@ -687,7 +697,7 @@ public void start(ClientStreamListener listener) { @Override public void request(int numMessages) { retriableStream.sendMessage("substream2 request " + numMessages); - + sublistenerCaptor2.get().onReady(); if (numMessages == 3) { sublistenerCaptor2.get().headersRead(new Metadata()); } @@ -698,9 +708,14 @@ public void request(int numMessages) { retriableStream.cancel(cancelStatus); } } + + @Override + public boolean isReady() { + return true; + } })); - InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2); + InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2, masterListener); doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); retriableStream.start(masterListener); @@ -715,6 +730,7 @@ public void request(int numMessages) { inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); // msg "substream1 request 2" inOrder.verify(mockStream1).request(1); inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); // msg "substream1 request 1" + inOrder.verify(masterListener).onReady(); // retry doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); @@ -742,8 +758,8 @@ public void request(int numMessages) { // msg "substream2 request 2" inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class)); inOrder.verify(mockStream2).request(100); - - verify(mockStream2).cancel(cancelStatus); + inOrder.verify(mockStream2).cancel(cancelStatus); + inOrder.verify(masterListener, never()).onReady(); // "substream2 request 1" will never be sent inOrder.verify(mockStream2, never()).writeMessage(any(InputStream.class)); @@ -1072,6 +1088,7 @@ public void perRpcBufferLimitExceededDuringBackoff() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream1).start(sublistenerCaptor1.capture()); + verify(mockStream1).isReady(); bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1); @@ -1088,6 +1105,7 @@ public void perRpcBufferLimitExceededDuringBackoff() { fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); verify(mockStream2).start(any(ClientStreamListener.class)); + verify(mockStream2).isReady(); // bufferLimitExceeded bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1); @@ -1151,6 +1169,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); @@ -1166,6 +1185,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); // retry2 @@ -1182,6 +1202,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); // retry3 @@ -1199,6 +1220,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { ArgumentCaptor sublistenerCaptor4 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream4).start(sublistenerCaptor4.capture()); + inOrder.verify(mockStream4).isReady(); inOrder.verifyNoMoreInteractions(); // retry4 @@ -1213,6 +1235,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { ArgumentCaptor sublistenerCaptor5 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream5).start(sublistenerCaptor5.capture()); + inOrder.verify(mockStream5).isReady(); inOrder.verifyNoMoreInteractions(); // retry5 @@ -1227,6 +1250,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { ArgumentCaptor sublistenerCaptor6 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream6).start(sublistenerCaptor6.capture()); + inOrder.verify(mockStream6).isReady(); inOrder.verifyNoMoreInteractions(); // can not retry any more @@ -1257,6 +1281,7 @@ public void pushback() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); @@ -1275,6 +1300,7 @@ public void pushback() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); // retry2 @@ -1292,6 +1318,7 @@ public void pushback() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); // retry3 @@ -1306,6 +1333,7 @@ public void pushback() { ArgumentCaptor sublistenerCaptor4 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream4).start(sublistenerCaptor4.capture()); + inOrder.verify(mockStream4).isReady(); inOrder.verifyNoMoreInteractions(); // retry4 @@ -1322,6 +1350,7 @@ public void pushback() { ArgumentCaptor sublistenerCaptor5 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream5).start(sublistenerCaptor5.capture()); + inOrder.verify(mockStream5).isReady(); inOrder.verifyNoMoreInteractions(); // retry5 @@ -1339,6 +1368,7 @@ public void pushback() { ArgumentCaptor sublistenerCaptor6 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream6).start(sublistenerCaptor6.capture()); + inOrder.verify(mockStream6).isReady(); inOrder.verifyNoMoreInteractions(); // can not retry any more even pushback is positive @@ -1596,6 +1626,7 @@ public void transparentRetry() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); // transparent retry @@ -1607,6 +1638,7 @@ public void transparentRetry() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); verify(retriableStreamRecorder, never()).postCommit(); assertEquals(0, fakeClock.numPendingTasks()); @@ -1622,6 +1654,7 @@ public void transparentRetry() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); verify(retriableStreamRecorder, never()).postCommit(); assertEquals(0, fakeClock.numPendingTasks()); @@ -1644,6 +1677,7 @@ public void normalRetry_thenNoTransparentRetry_butNormalRetry() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); // normal retry @@ -1657,6 +1691,7 @@ public void normalRetry_thenNoTransparentRetry_butNormalRetry() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); verify(retriableStreamRecorder, never()).postCommit(); assertEquals(0, fakeClock.numPendingTasks()); @@ -1673,6 +1708,7 @@ public void normalRetry_thenNoTransparentRetry_butNormalRetry() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); verify(retriableStreamRecorder, never()).postCommit(); } @@ -1694,6 +1730,7 @@ public void normalRetry_thenNoTransparentRetry_andNoMoreRetry() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); // normal retry @@ -1707,6 +1744,7 @@ public void normalRetry_thenNoTransparentRetry_andNoMoreRetry() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); verify(retriableStreamRecorder, never()).postCommit(); assertEquals(0, fakeClock.numPendingTasks()); @@ -1737,6 +1775,7 @@ method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_ ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); // transparent retry @@ -1749,6 +1788,7 @@ method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_ ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(retriableStreamRecorder).postCommit(); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); assertEquals(0, fakeClock.numPendingTasks()); } @@ -1767,6 +1807,7 @@ public void droppedShouldNeverRetry() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream1).start(sublistenerCaptor1.capture()); + verify(mockStream1).isReady(); // drop and verify no retry Status status = Status.fromCode(RETRIABLE_STATUS_CODE_1); @@ -1838,6 +1879,7 @@ public Void answer(InvocationOnMock in) { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); hedgingStream.sendMessage("msg1"); @@ -1879,6 +1921,8 @@ public Void answer(InvocationOnMock in) { inOrder.verify(mockStream2, times(2)).flush(); inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); inOrder.verify(mockStream2).request(456); + inOrder.verify(mockStream1).isReady(); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); // send more messages @@ -1916,6 +1960,9 @@ public Void answer(InvocationOnMock in) { inOrder.verify(mockStream3).writeMessage(any(InputStream.class)); inOrder.verify(mockStream3).request(456); inOrder.verify(mockStream3, times(2)).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream1).isReady(); + inOrder.verify(mockStream2).isReady(); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); // send one more message @@ -1958,6 +2005,9 @@ public Void answer(InvocationOnMock in) { inOrder.verify(mockStream4).writeMessage(any(InputStream.class)); inOrder.verify(mockStream4).request(456); inOrder.verify(mockStream4, times(4)).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream1).isReady(); + inOrder.verify(mockStream2).isReady(); + inOrder.verify(mockStream4).isReady(); inOrder.verifyNoMoreInteractions(); InsightBuilder insight = new InsightBuilder(); @@ -2008,6 +2058,7 @@ public void hedging_maxAttempts() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2015,6 +2066,7 @@ public void hedging_maxAttempts() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2022,6 +2074,7 @@ public void hedging_maxAttempts() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2029,6 +2082,7 @@ public void hedging_maxAttempts() { ArgumentCaptor sublistenerCaptor4 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream4).start(sublistenerCaptor4.capture()); + inOrder.verify(mockStream4).isReady(); inOrder.verifyNoMoreInteractions(); // a random one of the hedges fails @@ -2040,6 +2094,7 @@ public void hedging_maxAttempts() { ArgumentCaptor sublistenerCaptor5 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream5).start(sublistenerCaptor5.capture()); + inOrder.verify(mockStream5).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2047,6 +2102,7 @@ public void hedging_maxAttempts() { ArgumentCaptor sublistenerCaptor6 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream6).start(sublistenerCaptor6.capture()); + inOrder.verify(mockStream6).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2091,6 +2147,7 @@ public void hedging_receiveHeaders() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2098,6 +2155,7 @@ public void hedging_receiveHeaders() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2105,6 +2163,7 @@ public void hedging_receiveHeaders() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); // a random one of the hedges receives headers @@ -2142,6 +2201,7 @@ public void hedging_pushback_negative() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2149,6 +2209,7 @@ public void hedging_pushback_negative() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2156,6 +2217,7 @@ public void hedging_pushback_negative() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); // a random one of the hedges receives a negative pushback @@ -2187,6 +2249,7 @@ public void hedging_pushback_positive() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2194,6 +2257,7 @@ public void hedging_pushback_positive() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); @@ -2211,6 +2275,7 @@ public void hedging_pushback_positive() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); // hedge2 receives a pushback for HEDGING_DELAY_IN_SECONDS - 1 second @@ -2224,6 +2289,7 @@ public void hedging_pushback_positive() { ArgumentCaptor sublistenerCaptor4 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream4).start(sublistenerCaptor4.capture()); + inOrder.verify(mockStream4).isReady(); inOrder.verifyNoMoreInteractions(); // commit @@ -2253,6 +2319,7 @@ public void hedging_cancelled() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2260,6 +2327,8 @@ public void hedging_cancelled() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream1).isReady(); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); Status status = Status.CANCELLED.withDescription("cancelled"); @@ -2274,6 +2343,8 @@ public void hedging_cancelled() { assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription()); inOrder.verify(retriableStreamRecorder).postCommit(); + inOrder.verify(masterListener).closed( + any(Status.class), any(RpcProgress.class), any(Metadata.class)); inOrder.verifyNoMoreInteractions(); } @@ -2288,6 +2359,7 @@ public void hedging_perRpcBufferLimitExceeded() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream1).start(sublistenerCaptor1.capture()); + verify(mockStream1).isReady(); ClientStreamTracer bufferSizeTracer1 = bufferSizeTracer; bufferSizeTracer1.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1); @@ -2296,6 +2368,8 @@ public void hedging_perRpcBufferLimitExceeded() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream2).start(sublistenerCaptor2.capture()); + verify(mockStream1, times(2)).isReady(); + verify(mockStream2).isReady(); ClientStreamTracer bufferSizeTracer2 = bufferSizeTracer; bufferSizeTracer2.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1); @@ -2312,6 +2386,7 @@ public void hedging_perRpcBufferLimitExceeded() { verify(retriableStreamRecorder).postCommit(); verifyNoMoreInteractions(mockStream1); + verify(mockStream2).isReady(); verifyNoMoreInteractions(mockStream2); } @@ -2326,6 +2401,7 @@ public void hedging_channelBufferLimitExceeded() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream1).start(sublistenerCaptor1.capture()); + verify(mockStream1).isReady(); ClientStreamTracer bufferSizeTracer1 = bufferSizeTracer; bufferSizeTracer1.outboundWireSize(100); @@ -2334,6 +2410,8 @@ public void hedging_channelBufferLimitExceeded() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream2).start(sublistenerCaptor2.capture()); + verify(mockStream1, times(2)).isReady(); + verify(mockStream2).isReady(); ClientStreamTracer bufferSizeTracer2 = bufferSizeTracer; bufferSizeTracer2.outboundWireSize(100);